flink-sql cdc ????????sink?????????sink????????update??hive
hi,all: ??flink-sql cdc sink?sinkupdate??hive?? ??hive??join ??flink-sql??hive??MySQL??flink-sql cdchive
?????? ??????????StreamingFileSink??hive metadata??????????????????
hi??Rui Li?? commited??add partition /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-19/hour=07/part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1031 ---- ??: "user-zh"
?????? ??????????StreamingFileSink??hive metadata??????????????????
sql?? INSERT INTO rt_dwd.dwd_music_copyright_test SELECT url,md5,utime,title,singer,company,level, from_unixtime(cast(utime/1000 as int),'-MM-dd') ,from_unixtime(cast(utime/1000 as int),'HH') FROM music_source; ---- ??: "user-zh"
?????? ??????????StreamingFileSink??hive metadata??????????????????
hi, Rui Li: 2020-09-04 17:17:10,548 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=18} of table `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed 2020-09-04 17:17:10,716 INFO org.apache.flink.table.filesystem.MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=18} to metastore 2020-09-04 17:17:10,720 INFO org.apache.flink.table.filesystem.SuccessFileCommitPolicy[] - Committed partition {dt=2020-08-22, hour=18} with success file 2020-09-04 17:17:19,652 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Partition {dt=2020-08-22, hour=19} of table `hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed 2020-09-04 17:17:19,820 INFO org.apache.flink.table.filesystem.MetastoreCommitPolicy [] - Committed partition {dt=2020-08-22, hour=19} to metastore 2020-09-04 17:17:19,824 INFO org.apache.flink.table.filesystem.SuccessFileCommitPolicy[] - Committed partition {dt=2020-08-22, hour=19} with success file ??hdfs 2020-09-04 17:16:04,100 INFO org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper [] - creating real writer to write at hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140.inprogress.1631ac6c-a07c-4ad7-86ff-cf0d4375d1de 2020-09-04 17:16:04,126 INFO org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper [] - creating real writer to write at hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142.inprogress.2700eded-5ed0-4794-8ee9-21721c0c2ffd ---- ??: "Rui Li"
?????? ??????????StreamingFileSink??hive metadata??????????????????
??checkpointtm??info??warn 2020-09-04 17:17:59,520 INFO org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while invoking create of class ClientNamenodeProtocolTranslatorPB over uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. Trying to fail over immediately. java.io.IOException: java.lang.InterruptedException at org.apache.hadoop.ipc.Client.call(Client.java:1449) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.ipc.Client.call(Client.java:1401) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?] at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_144] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144] at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at com.sun.proxy.$Proxy27.create(Unknown Source) ~[?:?] at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0] at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.filesystem.SuccessFileCommitPolicy.commit(SuccessFileCommitPolicy.java:45) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.filesystem.stream.StreamingFileCommitter.commitPartitions(StreamingFileCommitter.java:167) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.filesystem.stream.StreamingFileCommitter.processElement(StreamingFileCommitter.java:144) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) [music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?] at
?????? ????StreamingFileSink??hive metadata??????????????????
?? ---- ??: "MuChen" <9329...@qq.com; :2020??9??7??(??) 11:01 ??:"user-zh"https://s1.ax1x.com/2020/09/07/wn1CFg.png checkpoint?? 2020-09-04 17:17:59 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) -- -- ??: "user-zh"
?????? ????StreamingFileSink??hive metadata??????????????????
hi,jingsong: ?? https://s1.ax1x.com/2020/09/07/wn1CFg.png checkpoint?? 2020-09-04 17:17:59 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ---- ??: "user-zh"
????StreamingFileSink??hive metadata??????????????????
hi, all?? 1. DataStream APIkafka??DataStream ds1?? 2. tableEnvhive catalog?? tableEnv.registerCatalog(catalogName, catalog); tableEnv.useCatalog(catalogName); 3. ??ds1??table Table sourcetable = tableEnv.fromDataStream(ds1); String souceTableName = "music_source"; tableEnv.createTemporaryView(souceTableName, sourcetable); 4. hive CREATE TABLE `dwd_music_copyright_test`( `url` string COMMENT 'url', `md5` string COMMENT 'md5', `utime` bigint COMMENT '', `title` string COMMENT '??', `singer` string COMMENT '??', `company` string COMMENT '', `level` int COMMENT '??.0??,1??acrcloud??,3??') PARTITIONED BY ( `dt` string, `hour` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test' TBLPROPERTIES ( 'connector'='HiveCatalog', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.partition-commit.trigger'='partition-time', 'sink.rolling-policy.check-interval'='30s', 'sink.rolling-policy.rollover-interval'='1min', 'sink.rolling-policy.file-size'='1MB'); 5. ??step3??dwd_music_copyright_test flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0 hive catalog??hour=02??hour=03?? show partitions rt_dwd.dwd_music_copyright_test; | dt=2020-08-29/hour=00 | | dt=2020-08-29/hour=01 | | dt=2020-08-29/hour=04 | | dt=2020-08-29/hour=05 | hdfs?? $ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/ 4.5 K 13.4 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00 2.0 K 6.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01 1.7 K 5.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02 1.3 K 3.8 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03 3.1 K 9.2 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04 ??add partition flink WebUI??checkpoint??StreamingFileCommitter?? ?? 1. exactly-once??sink??catalog 2. 3. EXACTLY_ONCE??kafkaisolation.level=read_committed??enable.auto.commit=false??EXACTLY_ONCE?? streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
flink ???? StreamingFileSink ??catalog??????????????
hi, all?? DataStream APIkafka??DataStream ds1?? tableEnvhive catalog?? tableEnv.registerCatalog(catalogName, catalog); tableEnv.useCatalog(catalogName); ??ds1??table Table sourcetable = tableEnv.fromDataStream(ds1); String souceTableName = "music_source"; tableEnv.createTemporaryView(souceTableName, sourcetable); hive CREATE TABLE `dwd_music_copyright_test`( `url` string COMMENT 'url', `md5` string COMMENT 'md5', `utime` bigint COMMENT '', `title` string COMMENT '??', `singer` string COMMENT '??', `company` string COMMENT '', `level` int COMMENT '??.0??,1??acrcloud??,3??') PARTITIONED BY ( `dt` string, `hour` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test' TBLPROPERTIES ( 'connector'='HiveCatalog', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.partition-commit.trigger'='partition-time', 'sink.rolling-policy.check-interval'='30s', 'sink.rolling-policy.rollover-interval'='1min', 'sink.rolling-policy.file-size'='1MB'); ??step3??dwd_music_copyright_test flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0 hive catalog??hour=02??hour=03?? show partitions rt_dwd.dwd_music_copyright_test; | dt=2020-08-29/hour=00 | | dt=2020-08-29/hour=01 | | dt=2020-08-29/hour=04 | | dt=2020-08-29/hour=05 | hdfs?? $ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/ 4.5 K 13.4 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00 2.0 K 6.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01 1.7 K 5.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02 1.3 K 3.8 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03 3.1 K 9.2 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04 ??add partition flink WebUI??checkpoint??StreamingFileCommitter?? ?? exactly-once??sink??catalog EXACTLY_ONCE??kafkaisolation.level=read_committed??enable.auto.commit=false??EXACTLY_ONCE?? streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
?????? flink????yarn??HA??????????????HA??????????????????????state
hi,jiliang1993: ??yarn??yarn.resourcemanager.am.max-attempts yarn.application-attempt-failures-validity-interval??attempts10??10attempts??110??attempts??attemptsmin(yarnyarn.resourcemanager.am.max-attempts,flinkyarn.application-attempts)yarn?? Best, MuChen. ---- ??:"jiliang1993"https://blog.csdn.net/cndotaci/article/details/106870413 gt; ??flinkyarn??2??6yarn?? gt; gt; gt; gt; 1. gt; gt; 2. HAstate?? gt; gt; flink??1.10.0 gt; gt; flink-conf.yaml?? gt; $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost gt; jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m gt; taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1 gt; parallelism.default: 1 high-availability: zookeeper gt; high-availability.storageDir: hdfs:///flink/ha/ gt; high-availability.zookeeper.quorum: gt; uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1 gt; state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir: gt; hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60 gt; state.backend.incremental: true jobmanager.execution.failover-strategy: gt; region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/ gt; historyserver.web.port: 8082 historyserver.archive.fs.dir: gt; hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1 gt; # HA yarn.application-attempts: 2 gt; ssh??jm??kill gt; [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 gt; YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner gt; 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager gt; [root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48 gt; ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information gt; unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412 gt; YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48 gt; ~]# kill -9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785 gt; YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527 gt; PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager gt; [root@uhadoop-op3raf-task48 ~]# kill -9 21926 [root@uhadoop-op3raf-task48 gt; ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279 gt; Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager gt; [root@uhadoop-op3raf-task48 ~]# kill -9 23318
?????? flink????yarn??HA??????????????HA??????????????????????state
hi ?? Best, MuChen. ---- ??:""https://blog.csdn.net/cndotaci/article/details/106870413 ??flinkyarn??2??6yarn?? 1. 2. HAstate?? flink??1.10.0 flink-conf.yaml?? $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1 parallelism.default: 1 high-availability: zookeeper high-availability.storageDir: hdfs:///flink/ha/ high-availability.zookeeper.quorum: uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1 state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir: hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60 state.backend.incremental: true jobmanager.execution.failover-strategy: region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/ historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1 # HA yarn.application-attempts: 2 ssh??jm??kill [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412 YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 21926 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279 Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 23318
flink????yarn??HA??????????????HA??????????????????????state
hi??all?? ??https://blog.csdn.net/cndotaci/article/details/106870413??flinkyarn??2??6yarn?? 1. 2. HAstate?? flink??1.10.0 flink-conf.yaml?? $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1 parallelism.default: 1 high-availability: zookeeper high-availability.storageDir: hdfs:///flink/ha/ high-availability.zookeeper.quorum: uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1 state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir: hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60 state.backend.incremental: true jobmanager.execution.failover-strategy: region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/ historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1 # HA yarn.application-attempts: 2 ssh??jm??kill [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412 YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 21926 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279 Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill -9 23318
?????? flinksql????????????????????
?? ??hive table??dag??hive??hive3??subtask?? https://s1.ax1x.com/2020/06/30/N4qxNq.png subtaskrunning19??SUCCESS?? ---- ??:"Rui Li"https://s1.ax1x.com/2020/06/29/Nf2dIA.png gt; gt; INFO15:34?? gt; 2020-06-29 14:53:20,260 INFOamp;nbsp; gt; org.apache.flink.api.common.io.LocatableInputSplitAssigneramp;nbsp;amp;nbsp;amp;nbsp; gt; - Assigning remote split to host uhadoop-op3raf-core12 2020-06-29 gt; 14:53:22,845 INFOamp;nbsp; gt; org.apache.flink.runtime.executiongraph.ExecutionGraphamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, gt; PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5 gt; 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. 2020-06-29 gt; 15:34:52,982 INFOamp;nbsp; gt; org.apache.flink.runtime.entrypoint.ClusterEntrypointamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Shutting YarnSessionClusterEntrypoint down with application status gt; SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFOamp;nbsp; gt; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp; gt; - Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFOamp;nbsp; gt; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp; gt; - Removing cache directory gt; /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 2020-06-29 gt; 15:34:53,073 INFOamp;nbsp; gt; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp; gt; - http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29 gt; 15:34:53,074 INFOamp;nbsp; gt; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp; gt; - Shut down complete. 2020-06-29 15:34:53,074 INFOamp;nbsp; gt; org.apache.flink.yarn.YarnResourceManageramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Shut down cluster because application is in SUCCEEDED, diagnostics null. gt; 2020-06-29 15:34:53,076 INFOamp;nbsp; gt; org.apache.flink.yarn.YarnResourceManageramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Unregister application from the YARN Resource Manager with final status gt; SUCCEEDED. 2020-06-29 15:34:53,088 INFOamp;nbsp; gt; org.apache.hadoop.yarn.client.api.impl.AMRMClientImplamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Waiting for application to be successfully unregistered. 2020-06-29 gt; 15:34:53,306 INFOamp;nbsp; gt; org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentamp;nbsp; gt; - Closing components. 2020-06-29 15:34:53,308 INFOamp;nbsp; gt; org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessamp;nbsp; gt; - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309 gt; INFOamp;nbsp; gt; org.apache.flink.runtime.dispatcher.StandaloneDispatcheramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1 :38817/user/dispatcher. gt; 2020-06-29 15:34:53,310 INFOamp;nbsp; gt; org.apache.flink.runtime.dispatcher.StandaloneDispatcheramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Stopping all currently running jobs of dispatcher gt; akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 gt; 15:34:53,311 INFOamp;nbsp; gt; org.apache.flink.runtime.jobmaster.JobMasteramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; - Stopping the JobMaster for job default: insert into gt; rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 INFOamp;nbsp; gt; org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImplamp;nbsp; - gt; Interrupted while waiting for queue gt; java.lang.InterruptedExceptionamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; at gt; java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; at gt; java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; gt; at gt;
?????? flinksql????????????????????
$ grep -v \# sql-client-defaults.yaml |grep -v ^$ catalogs:- name: myhive type: hive hive-conf-dir: /home/fsql/hive/conf default-database: default execution: planner: blink type: streaming time-characteristic: event-time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 100 parallelism: 4 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: myhive current-database: default restart-strategy: type: fixed-delay deployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 ---- ??:"zhisheng"https://s1.ax1x.com/2020/06/29/Nf2dIA.png INFO15:34?? 2020-06-29 14:53:20,260 INFOnbsp; org.apache.flink.api.common.io.LocatableInputSplitAssignernbsp;nbsp;nbsp; - Assigning remote split to host uhadoop-op3raf-core12 2020-06-29 14:53:22,845 INFOnbsp; org.apache.flink.runtime.executiongraph.ExecutionGraphnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. 2020-06-29 15:34:52,982 INFOnbsp; org.apache.flink.runtime.entrypoint.ClusterEntrypointnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; - Shutting YarnSessionClusterEntrypoint down with application status SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFOnbsp; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp; - Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFOnbsp; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp; - Removing cache directory /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 2020-06-29 15:34:53,073 INFOnbsp; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp; - http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29 15:34:53,074 INFOnbsp; org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp; - Shut down complete. 2020-06-29 15:34:53,074 INFOnbsp; org.apache.flink.yarn.YarnResourceManagernbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; - Shut down cluster because application is in SUCCEEDED, diagnostics null. 2020-06-29 15:34:53,076 INFOnbsp; org.apache.flink.yarn.YarnResourceManagernbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; - Unregister application from the YARN Resource Manager with final status SUCCEEDED. 2020-06-29 15:34:53,088 INFOnbsp; org.apache.hadoop.yarn.client.api.impl.AMRMClientImplnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; - Waiting for application to be successfully unregistered. 2020-06-29 15:34:53,306 INFOnbsp; org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentnbsp; - Closing components. 2020-06-29 15:34:53,308 INFOnbsp; org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessnbsp; - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309 INFOnbsp; org.apache.flink.runtime.dispatcher.StandaloneDispatchernbsp;nbsp;nbsp;nbsp;nbsp; - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 15:34:53,310 INFOnbsp; org.apache.flink.runtime.dispatcher.StandaloneDispatchernbsp;nbsp;nbsp;nbsp;nbsp; - Stopping all currently running jobs of dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 15:34:53,311 INFOnbsp; org.apache.flink.runtime.jobmaster.JobMasternbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; - Stopping the JobMaster for job default: insert into rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 INFOnbsp; org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImplnbsp; - Interrupted while waiting for queue java.lang.InterruptedExceptionnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287) 2020-06-29 15:34:53,324 INFOnbsp; org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxynbsp; - Opening proxy : uhadoop-op3raf-core12:2 nbsp; ps:amp;nbsp; 1. kafka 2. flink1.10.0 ??SUCCEEDED ??
flinksql????????????????????
hi, yarn-session??bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 21 sql-clientsql?? kafkahive??joinmysql succeeded??https://s1.ax1x.com/2020/06/29/Nf2dIA.png INFO15:34?? 2020-06-29 14:53:20,260 INFO org.apache.flink.api.common.io.LocatableInputSplitAssigner- Assigning remote split to host uhadoop-op3raf-core12 2020-06-29 14:53:22,845 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. 2020-06-29 15:34:52,982 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnSessionClusterEntrypoint down with application status SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Removing cache directory /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 2020-06-29 15:34:53,073 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29 15:34:53,074 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Shut down complete. 2020-06-29 15:34:53,074 INFO org.apache.flink.yarn.YarnResourceManager - Shut down cluster because application is in SUCCEEDED, diagnostics null. 2020-06-29 15:34:53,076 INFO org.apache.flink.yarn.YarnResourceManager - Unregister application from the YARN Resource Manager with final status SUCCEEDED. 2020-06-29 15:34:53,088 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered. 2020-06-29 15:34:53,306 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent - Closing components. 2020-06-29 15:34:53,308 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 15:34:53,310 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping all currently running jobs of dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 15:34:53,311 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job default: insert into rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 INFO org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287) 2020-06-29 15:34:53,324 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : uhadoop-op3raf-core12:2 ps: 1. kafka 2. flink1.10.0 ??SUCCEEDED ?? sql?? # -- ??5??vid??vid_group -- ??55mysql insert into rt_app.app_video_cover_abtest_test select begin_time, vid, vid_group, max(dv), max(click), max(vv), max(effectivevv) from( select t1.begin_time begin_time, t1.u_vid vid, t1.u_vid_group vid_group, dv, click, vv, if(effectivevv is null,0,effectivevv) effectivevv from ( -- dv??click??vv select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time, cast(u_vid as bigint) u_vid,u_vid_group, sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and u_c_module='M011',1,0)) dv, sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and u_c_module='M011',1,0)) click,sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0)) vv FROM rt_ods.ods_applog_vidsplit where u_vid is not null and trim(u_vid)<''and u_vid_group is not null and trim(u_vid_group) not in ('','-1')and ( (concat(u_mod,'-',u_ac) in
??????flinksql????hbase??????????
Hi,Roc Marshal: Best, MuChen. ---- ??:"Roc Marshal"http://uhadoop-op3raf-core24:42976 sql-client: bin/sql-client.sh embedded hbaseflinksql?? # CREATE TABLE hbase_video_pic_title_q70 ( key string, cf1 ROW
flinksql????hbase??????????
Hi, All: ??flinksqlhbase hadoop??masterflink?? yarn-session: bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 21 # ?? [admin@uhadoop-op3raf-master2 flink10]$ 2020-06-23 09:30:56,402 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed 2020-06-23 09:30:56,515 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed JobManager Web Interface: http://uhadoop-op3raf-core24:42976 sql-client: bin/sql-client.sh embedded hbaseflinksql?? # CREATE TABLE hbase_video_pic_title_q70 ( key string, cf1 ROW
?????? ???? yarn-session.sh??????????????????????????????
hi,Yang Wang: HDFSstaging($HOME/.flink/application_id)??*flink-conf.yaml*??-qu??-nm??-jm??-tm ??-n??-n ?? Best, MuChen ---- ??:"Yang Wang"https://imgchr.com/i/NJIn4x ??Flink session cluster ??root.default ?? flink??1.10.0 flink-conf.yaml?? [fsql@10-42-63-116 conf]$ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m taskmanager.memory.process.size: 2048m taskmanager.numberOfTaskSlots: 10 parallelism.default: 1 jobmanager.execution.failover-strategy: region
???? yarn-session.sh??????????????????????????????
hi,all: hadoop??A. Abin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli /dev/null 21 yarn-session?? root.flinkfsql-cli. yarn https://imgchr.com/i/NJIn4x ??Flink session cluster ??root.default ?? flink??1.10.0 flink-conf.yaml?? [fsql@10-42-63-116 conf]$ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m taskmanager.memory.process.size: 2048m taskmanager.numberOfTaskSlots: 10 parallelism.default: 1 jobmanager.execution.failover-strategy: region
sql-client????????????????SUCCEEDED????
hi, yarn-session:bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli /dev/null 21 sql-clientsql?? kafkahive??joinmysql sql?? # -- ??5??vid??vid_group -- ??55mysql insert into rt_app.app_video_cover_abtest_test select begin_time, vid, vid_group, max(dv), max(click), max(vv), max(effectivevv) from( select t1.begin_time begin_time, t1.u_vid vid, t1.u_vid_group vid_group, dv, click, vv, if(effectivevv is null,0,effectivevv) effectivevv from ( -- dv??click??vv select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time, cast(u_vid as bigint) u_vid,u_vid_group, sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and u_c_module='M011',1,0)) dv, sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and u_c_module='M011',1,0)) click,sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0)) vv FROM rt_ods.ods_applog_vidsplit where u_vid is not null and trim(u_vid)<''and u_vid_group is not null and trim(u_vid_group) not in ('','-1')and ( (concat(u_mod,'-',u_ac) in ('emptylog-video_display','emptylog-video_click') and u_c_module='M011') or (concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011') ) group by TUMBLE(bjdt, INTERVAL '5' MINUTE),cast(u_vid as bigint),u_vid_group ) t1 left join ( -- effectivevv selectbegin_time,u_vid, u_vid_group,count(1) effectivevv from (select begin_time, u_vid, u_vid_group, u_diu, u_playid, m_pt, q70fromdw.video_pic_title_q70 a join( select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time, cast(u_vid as bigint) u_vid, u_vid_group, u_diu, u_playid, max(u_playtime) m_pt FROM rt_ods.ods_applog_vidsplit where u_vid is not null and trim(u_vid)<'' and u_vid_group is not null and trim(u_vid_group) not in ('','-1') and concat(u_mod,'-',u_ac)='emptylog-video_play_speed' and u_f_module='M011' and u_playtime0 group by TUMBLE(bjdt, INTERVAL '5' MINUTE), cast(u_vid as bigint), u_vid_group, u_diu, u_playid) bon a.vid=b.u_vidgroup by begin_time, u_vid, u_vid_group, u_diu, u_playid, m_pt, q70 ) temp where m_pt=q70 group by begin_time,u_vid,u_vid_group ) t2 on t1.begin_time=t2.begin_time and t1.u_vid=t2.u_vid and t1.u_vid_group=t2.u_vid_group )t3 group by begin_time, vid, vid_group ; succeeded??https://s1.ax1x.com/2020/06/18/NnyX24.png INFO??: 2020-06-17 21:27:07,968 INFO org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:201 4) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287) ps: 1. kafka 2. flink1.10.0 ??SUCCEEDED ??
sql-client????????????????SUCCEEDED????
hi, yarn-session:bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli /dev/null 21 sql-clientsql?? kafkahive??joinmysql ??succeeded?? ps:kafka ??SUCCEEDED ??