flink-sql CDC: can the result, including updates, be written to Hive?

2020-09-15 by MuChen
hi, all:
  With a flink-sql CDC pipeline, can the sink write update changes to Hive?

The job joins against a Hive table: flink-sql reads the Hive table and the MySQL changelog (via flink-sql CDC), and the joined result should end up in Hive.
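For context, a minimal sketch of what such a pipeline could look like on Flink 1.11 with the flink-cdc-connectors MySQL source. The connection settings, field list and table names below are placeholders rather than the poster's actual job, and the key caveat is that the 1.11 filesystem/Hive streaming sink is append-only, so the CDC changelog cannot be written to Hive directly as updates.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcToHiveSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Hypothetical MySQL CDC source (flink-cdc-connectors); all connection values are placeholders.
        tEnv.executeSql(
            "CREATE TABLE music_source_cdc (" +
            "  url STRING, md5 STRING, utime BIGINT, title STRING," +
            "  singer STRING, company STRING, `level` INT" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'mysql-host'," +
            "  'port' = '3306'," +
            "  'username' = 'user'," +
            "  'password' = 'pass'," +
            "  'database-name' = 'music'," +
            "  'table-name' = 'copyright'" +
            ")");

        // In Flink 1.11 the Hive streaming sink only accepts an append stream, so an
        // INSERT that consumes this changelog fails once UPDATE/DELETE rows appear;
        // the changelog has to be reduced to inserts (or sent to an upsert sink) first.
        // tEnv.executeSql("INSERT INTO hive_catalog.rt_dwd.dwd_music_copyright_test SELECT ... FROM music_source_cdc");
    }
}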

Re: Writing to Hive with StreamingFileSink: some partitions are not added to the Hive metastore

2020-09-08 by MuChen
hi, Rui Li:
The file below was committed, but no ADD PARTITION was ever issued for its partition:
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-19/hour=07/part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1031
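As a stopgap while the root cause is found, the partition that already exists on HDFS can be registered by hand. A sketch, assuming Flink 1.11's Hive dialect accepts ADD PARTITION (otherwise the same ALTER TABLE statement can be run from the Hive CLI); the catalog name and hive-conf dir follow this thread and may differ in your setup.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class AddMissingPartitionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // The hive-conf dir is a placeholder; use the same one the streaming job registers.
        tEnv.registerCatalog("hive_catalog", new HiveCatalog("hive_catalog", "rt_dwd", "/home/fsql/hive/conf"));
        tEnv.useCatalog("hive_catalog");

        // ADD PARTITION is Hive-dialect DDL, pointing at the directory that was already written.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
            "ALTER TABLE rt_dwd.dwd_music_copyright_test " +
            "ADD IF NOT EXISTS PARTITION (dt='2020-08-19', hour='07')");
    }
}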




Re: Writing to Hive with StreamingFileSink: some partitions are not added to the Hive metastore

2020-09-08 by MuChen
The INSERT statement is:

INSERT INTO rt_dwd.dwd_music_copyright_test
SELECT url,md5,utime,title,singer,company,level,
  from_unixtime(cast(utime/1000 as int),'yyyy-MM-dd')
  ,from_unixtime(cast(utime/1000 as int),'HH') FROM music_source;





Re: Writing to Hive with StreamingFileSink: some partitions are not added to the Hive metastore

2020-09-08 by MuChen
hi, Rui Li:

2020-09-04 17:17:10,548 INFO  
org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Partition 
{dt=2020-08-22, hour=18} of table 
`hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed
2020-09-04 17:17:10,716 INFO  
org.apache.flink.table.filesystem.MetastoreCommitPolicy  [] - Committed 
partition {dt=2020-08-22, hour=18} to metastore
2020-09-04 17:17:10,720 INFO  
org.apache.flink.table.filesystem.SuccessFileCommitPolicy[] - Committed 
partition {dt=2020-08-22, hour=18} with success file
2020-09-04 17:17:19,652 INFO  
org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Partition 
{dt=2020-08-22, hour=19} of table 
`hive_catalog`.`rt_dwd`.`dwd_music_copyright_test` is ready to be committed
2020-09-04 17:17:19,820 INFO  
org.apache.flink.table.filesystem.MetastoreCommitPolicy  [] - Committed 
partition {dt=2020-08-22, hour=19} to metastore
2020-09-04 17:17:19,824 INFO  
org.apache.flink.table.filesystem.SuccessFileCommitPolicy[] - Committed 
partition {dt=2020-08-22, hour=19} with success file






The corresponding HDFS write logs:
2020-09-04 17:16:04,100 INFO  
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper [] - 
creating real writer to write at 
hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-22/hour=07/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1140.inprogress.1631ac6c-a07c-4ad7-86ff-cf0d4375d1de

2020-09-04 17:16:04,126 INFO  
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper [] - 
creating real writer to write at 
hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-22/hour=19/.part-b7d8f3c6-f1f3-40d4-a269-1ccf2c9a7720-0-1142.inprogress.2700eded-5ed0-4794-8ee9-21721c0c2ffd



Re: Writing to Hive with StreamingFileSink: some partitions are not added to the Hive metastore

2020-09-07 by MuChen
Around the failed checkpoint the TaskManager log has no ERROR entries, only INFO/WARN like the following:
2020-09-04 17:17:59,520 INFO  org.apache.hadoop.io.retry.RetryInvocationHandler 
   [] - Exception while invoking create of class 
ClientNamenodeProtocolTranslatorPB over 
uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. Trying to 
fail over immediately.
java.io.IOException: java.lang.InterruptedException
at org.apache.hadoop.ipc.Client.call(Client.java:1449) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1401) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown 
Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_144]
at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_144]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy27.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.SuccessFileCommitPolicy.commit(SuccessFileCommitPolicy.java:45)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.commitPartitions(StreamingFileCommitter.java:167)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.processElement(StreamingFileCommitter.java:144)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 [music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 

Re: Writing to Hive with StreamingFileSink: some partitions are not added to the Hive metastore

2020-09-07 by MuChen





Re: Writing to Hive with StreamingFileSink: some partitions are not added to the Hive metastore

2020-09-06 by MuChen
hi, jingsong:

A screenshot is here:
https://s1.ax1x.com/2020/09/07/wn1CFg.png

The checkpoint failure exception:
2020-09-04 17:17:59
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)






Writing to Hive with StreamingFileSink: some partitions are not added to the Hive metastore

2020-09-06 by MuChen
hi, all:




1. A DataStream ds1 is built from Kafka with the DataStream API.
2. A Hive catalog is registered on tableEnv:
  tableEnv.registerCatalog(catalogName, catalog);
  tableEnv.useCatalog(catalogName);
3. ds1 is converted to a Table and registered as a temporary view:
  Table sourcetable = tableEnv.fromDataStream(ds1);
  String souceTableName = "music_source";
  tableEnv.createTemporaryView(souceTableName, sourcetable);
4. The target Hive table is created:
CREATE TABLE `dwd_music_copyright_test`(
  `url` string COMMENT 'url',
  `md5` string COMMENT 'md5',
  `utime` bigint COMMENT '',
  `title` string COMMENT '??',
  `singer` string COMMENT '??',
  `company` string COMMENT '',
  `level` int COMMENT '??.0??,1??acrcloud??,3??')
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
TBLPROPERTIES (
  'connector'='HiveCatalog',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.partition-commit.trigger'='partition-time',
  'sink.rolling-policy.check-interval'='30s',
  'sink.rolling-policy.rollover-interval'='1min',
  'sink.rolling-policy.file-size'='1MB');


5. The view registered in step 3 is inserted into dwd_music_copyright_test.
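Pieced together, the job is roughly the sketch below. It is a reconstruction for reference only: the Kafka properties, the message format and the parsing step are assumptions, since they are not shown in the thread.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHivePipelineSketch {

    // POJO matching the Hive columns; field types are guesses based on the DDL above.
    public static class MusicRecord {
        public String url;
        public String md5;
        public Long utime;
        public String title;
        public String singer;
        public String company;
        public Integer level;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // 1. Kafka source (topic, brokers and the tab-separated format are placeholders).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "music_copyright");
        DataStream<MusicRecord> ds1 = env
                .addSource(new FlinkKafkaConsumer<>("music_source_topic", new SimpleStringSchema(), props))
                .map(line -> {
                    String[] f = line.split("\t");
                    MusicRecord r = new MusicRecord();
                    r.url = f[0]; r.md5 = f[1]; r.utime = Long.parseLong(f[2]);
                    r.title = f[3]; r.singer = f[4]; r.company = f[5]; r.level = Integer.parseInt(f[6]);
                    return r;
                })
                .returns(MusicRecord.class);

        // 2. Register and use the Hive catalog.
        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(catalogName, "rt_dwd", "/home/fsql/hive/conf");
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        // 3. Expose the stream as a temporary view.
        Table sourcetable = tableEnv.fromDataStream(ds1);
        String souceTableName = "music_source";
        tableEnv.createTemporaryView(souceTableName, sourcetable);

        // 4./5. The Hive table already exists; executeSql submits the streaming INSERT
        //       (no env.execute() is needed for it in 1.11).
        tableEnv.executeSql(
            "INSERT INTO rt_dwd.dwd_music_copyright_test " +
            "SELECT url, md5, utime, title, singer, company, `level`, " +
            "  from_unixtime(cast(utime/1000 as int),'yyyy-MM-dd'), " +
            "  from_unixtime(cast(utime/1000 as int),'HH') " +
            "FROM music_source");
    }
}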



flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0

After the job runs for a while, partitions hour=02 and hour=03 are missing from the Hive catalog:
show partitions rt_dwd.dwd_music_copyright_test;
| dt=2020-08-29/hour=00 |
| dt=2020-08-29/hour=01 |
| dt=2020-08-29/hour=04 |
| dt=2020-08-29/hour=05 |
but the data for those hours does exist on HDFS:
$ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
4.5 K   13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
2.0 K   6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
1.7 K   5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
1.3 K   3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
3.1 K   9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04


So the files were written, but ADD PARTITION was never executed for hour=02 and hour=03.


The Flink Web UI shows checkpoint failures on the StreamingFileCommitter operator.
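One way to narrow this down is to diff what the metastore knows against what is actually on HDFS. A rough diagnostic sketch; the catalog name, hive-conf dir and paths are taken from this thread and may need adjusting.

import java.util.List;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PartitionDiffSketch {
    public static void main(String[] args) throws Exception {
        // Partitions the Hive metastore knows about.
        HiveCatalog catalog = new HiveCatalog("hive_catalog", "rt_dwd", "/home/fsql/hive/conf");
        catalog.open();
        List<CatalogPartitionSpec> committed =
                catalog.listPartitions(new ObjectPath("rt_dwd", "dwd_music_copyright_test"));
        committed.forEach(p -> System.out.println("in metastore: " + p.getPartitionSpec()));
        catalog.close();

        // Hour directories that actually exist on HDFS for one day.
        Path day = new Path("hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29");
        FileSystem fs = day.getFileSystem(new Configuration());
        for (FileStatus hour : fs.listStatus(day)) {
            System.out.println("on hdfs: " + hour.getPath().getName());
        }
        // Any hour present on HDFS but missing from the metastore is a partition whose
        // metastore commit never happened.
    }
}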









Questions:


1. Why, with exactly-once checkpointing enabled, does the sink fail to add some partitions to the catalog?
2.
3. Does EXACTLY_ONCE checkpointing require the Kafka source to use isolation.level=read_committed and enable.auto.commit=false? EXACTLY_ONCE is currently enabled with:
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
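The "Exceeded checkpoint tolerable failure threshold" error seen earlier in the thread means a single failed checkpoint (here triggered by the HDFS namenode failover) fails the whole job, because the default tolerance is 0. Independent of the partition question, the job can be made to ride out transient checkpoint failures; a sketch of the relevant knobs, with example values only:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once checkpoints every minute (interval is an example value).
        streamEnv.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig cc = streamEnv.getCheckpointConfig();
        // Tolerate a few failed checkpoints (e.g. during a namenode failover)
        // before failing the job; the default is 0.
        cc.setTolerableCheckpointFailureNumber(3);
        // Give slow partition commits more time before a checkpoint is declared failed.
        cc.setCheckpointTimeout(10 * 60 * 1000);
        cc.setMinPauseBetweenCheckpoints(30_000);
    }
}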

Flink writes files with StreamingFileSink, but some partitions are not added to the catalog

2020-09-04 by MuChen
hi, all:




A DataStream ds1 is built from Kafka with the DataStream API.

A Hive catalog is registered on tableEnv:
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);

ds1 is converted to a Table and registered as a temporary view:
Table sourcetable = tableEnv.fromDataStream(ds1);
String souceTableName = "music_source";
tableEnv.createTemporaryView(souceTableName, sourcetable);

The target Hive table is created:
CREATE TABLE `dwd_music_copyright_test`(
  `url` string COMMENT 'url',
  `md5` string COMMENT 'md5',
  `utime` bigint COMMENT '',
  `title` string COMMENT '??',
  `singer` string COMMENT '??',
  `company` string COMMENT '',
  `level` int COMMENT '??.0??,1??acrcloud??,3??')
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
TBLPROPERTIES (
  'connector'='HiveCatalog',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.partition-commit.trigger'='partition-time',
  'sink.rolling-policy.check-interval'='30s',
  'sink.rolling-policy.rollover-interval'='1min',
  'sink.rolling-policy.file-size'='1MB');
The view registered above is inserted into dwd_music_copyright_test.


flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0 


After the job runs for a while, partitions hour=02 and hour=03 are missing from the Hive catalog:
show partitions rt_dwd.dwd_music_copyright_test;
| dt=2020-08-29/hour=00 |
| dt=2020-08-29/hour=01 |
| dt=2020-08-29/hour=04 |
| dt=2020-08-29/hour=05 |
but the data for those hours does exist on HDFS:
$ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
4.5 K   13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
2.0 K   6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
1.7 K   5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
1.3 K   3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
3.1 K   9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04

So the files were written, but ADD PARTITION was never executed for hour=02 and hour=03.

The Flink Web UI shows checkpoint failures on the StreamingFileCommitter operator.





Questions:

Why, with exactly-once checkpointing enabled, does the sink fail to add some partitions to the catalog?



Does EXACTLY_ONCE checkpointing require the Kafka source to use isolation.level=read_committed and enable.auto.commit=false? EXACTLY_ONCE is currently enabled with:
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);

Re: Flink on YARN with HA: the JobManager can be killed more times than the configured attempts; how is state recovered once HA gives up?

2020-07-01 by MuChen
hi, jiliang1993:

The restart count is governed by YARN's yarn.resourcemanager.am.max-attempts together with yarn.application-attempt-failures-validity-interval: attempt failures are only counted inside the validity window (10 seconds by default), so kills that are spread out keep resetting the counter and the JobManager can come back more often than the configured attempts. Within one window the effective limit is min(YARN yarn.resourcemanager.am.max-attempts, Flink yarn.application-attempts), after which YARN stops restarting the application.



Best,
MuChen.



Re: Flink on YARN with HA: the JobManager can be killed more times than the configured attempts; how is state recovered once HA gives up?

2020-07-01 by MuChen
hi,


Best,
MuChen.





Flink on YARN with HA: the JobManager can be killed more times than the configured attempts; how is state recovered once HA gives up?

2020-07-01 by MuChen
hi, all:

Following https://blog.csdn.net/cndotaci/article/details/106870413, HA is enabled for Flink on YARN with yarn.application-attempts: 2, yet after killing the JobManager 6 times YARN still brings the job back.

1. Why can the JobManager be restarted more times than the configured number of attempts?
2. Once HA finally stops restarting the job, how can it be recovered from its state?

Flink version: 1.10.0

flink-conf.yaml:
$ grep -v ^# flink-conf.yaml | grep -v ^$
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1
state.checkpoints.dir: hdfs:///flink/checkpoint
state.savepoints.dir: hdfs:///flink/flink-savepoints
state.checkpoints.num-retained: 60
state.backend.incremental: true
jobmanager.execution.failover-strategy: region
jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs:///flink/flink-jobs/
historyserver.archive.fs.refresh-interval: 1
# HA
yarn.application-attempts: 2
Then the JobManager process is repeatedly killed over ssh on the node where it runs:
[root@uhadoop-op3raf-task48 ~]# jps
34785 YarnTaskExecutorRunner
16853 YarnTaskExecutorRunner
17527 PrestoServer
33289 YarnTaskExecutorRunner
18026 YarnJobClusterEntrypoint
20283 Jps
39599 NodeManager
[root@uhadoop-op3raf-task48 ~]# kill -9 18026
[root@uhadoop-op3raf-task48 ~]# jps
34785 YarnTaskExecutorRunner
16853 -- process information unavailable
17527 PrestoServer
21383 Jps
33289 YarnTaskExecutorRunner
20412 YarnJobClusterEntrypoint
39599 NodeManager
[root@uhadoop-op3raf-task48 ~]# kill -9 20412
[root@uhadoop-op3raf-task48 ~]# jps
34785 YarnTaskExecutorRunner
21926 YarnJobClusterEntrypoint
23207 Jps
17527 PrestoServer
33289 YarnTaskExecutorRunner
39599 NodeManager
[root@uhadoop-op3raf-task48 ~]# kill -9 21926
[root@uhadoop-op3raf-task48 ~]# jps
34785 YarnTaskExecutorRunner
23318 YarnJobClusterEntrypoint
26279 Jps
17527 PrestoServer
33289 YarnTaskExecutorRunner
39599 NodeManager
[root@uhadoop-op3raf-task48 ~]# kill -9 23318

Re: A flinksql job ends by itself after running for a while

2020-06-29 by MuChen

The DAG of the job, which reads a Hive table, is shown here:
https://s1.ax1x.com/2020/06/30/N4qxNq.png

Some subtasks were still RUNNING when the job was marked SUCCESS.





Re: A flinksql job ends by itself after running for a while

2020-06-29 by MuChen

The sql-client-defaults.yaml configuration:
$ grep -v \# sql-client-defaults.yaml | grep -v ^$
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /home/fsql/hive/conf
    default-database: default
execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 100
  parallelism: 4
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
    type: fixed-delay
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0





A flinksql job ends by itself after running for a while

2020-06-29 by MuChen
hi,

A yarn-session is started first:
bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1 &

A SQL job is then submitted through sql-client. It reads Kafka, joins a Hive dimension table and writes the result to MySQL.

After running for a while the job ends with status SUCCEEDED: https://s1.ax1x.com/2020/06/29/Nf2dIA.png

The INFO log around 15:34:
2020-06-29 14:53:20,260 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner - Assigning remote split to host uhadoop-op3raf-core12
2020-06-29 14:53:22,845 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED.
2020-06-29 15:34:52,982 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnSessionClusterEntrypoint down with application status SUCCEEDED. Diagnostics null.
2020-06-29 15:34:52,984 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest endpoint.
2020-06-29 15:34:53,072 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui
2020-06-29 15:34:53,073 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://uhadoop-op3raf-core1:44664 lost leadership
2020-06-29 15:34:53,074 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
2020-06-29 15:34:53,074 INFO  org.apache.flink.yarn.YarnResourceManager - Shut down cluster because application is in SUCCEEDED, diagnostics null.
2020-06-29 15:34:53,076 INFO  org.apache.flink.yarn.YarnResourceManager - Unregister application from the YARN Resource Manager with final status SUCCEEDED.
2020-06-29 15:34:53,088 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
2020-06-29 15:34:53,306 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent - Closing components.
2020-06-29 15:34:53,308 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Stopping SessionDispatcherLeaderProcess.
2020-06-29 15:34:53,309 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
2020-06-29 15:34:53,310 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping all currently running jobs of dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
2020-06-29 15:34:53,311 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job default: insert into rt_app.app_video_cover_abtest_test ...
2020-06-29 15:34:53,322 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
2020-06-29 15:34:53,324 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : uhadoop-op3raf-core12:2

 
ps:
1. The source is Kafka.
2. Flink version is 1.10.0.
Why does the job end with status SUCCEEDED?




The SQL:
-- Aggregate dv / click / vv / effectivevv per vid and vid_group over 5-minute windows
-- and write the result to MySQL every 5 minutes.
insert into rt_app.app_video_cover_abtest_test
select
  begin_time,
  vid,
  vid_group,
  max(dv),
  max(click),
  max(vv),
  max(effectivevv)
from (
  select
    t1.begin_time begin_time,
    t1.u_vid vid,
    t1.u_vid_group vid_group,
    dv,
    click,
    vv,
    if(effectivevv is null,0,effectivevv) effectivevv
  from
  (
    -- dv, click, vv
    select
      CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time,
      cast(u_vid as bigint) u_vid,
      u_vid_group,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and u_c_module='M011',1,0)) dv,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and u_c_module='M011',1,0)) click,
      sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0)) vv
    FROM rt_ods.ods_applog_vidsplit
    where u_vid is not null and trim(u_vid)<>''
      and u_vid_group is not null and trim(u_vid_group) not in ('','-1')
      and (  (concat(u_mod,'-',u_ac) in

Re: Errors when using an HBase table from flinksql

2020-06-22 by MuChen
Hi, Roc Marshal:

Best,
MuChen.





Errors when using an HBase table from flinksql

2020-06-22 by MuChen
Hi, All:

Using an HBase table from flinksql fails.

Flink is deployed on the Hadoop master node.

yarn-session is started with:
bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1 &
# Errors printed during startup:
[admin@uhadoop-op3raf-master2 flink10]$
2020-06-23 09:30:56,402 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
2020-06-23 09:30:56,515 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
JobManager Web Interface: http://uhadoop-op3raf-core24:42976
sql-client is started with:
bin/sql-client.sh embedded

The HBase table is declared in flinksql as:
# CREATE TABLE hbase_video_pic_title_q70 ( key string, cf1 ROW
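For reference, a complete declaration using the legacy (1.10-style) HBase connector properties might look like the sketch below; the column family layout, field types and the ZooKeeper quorum are guesses, since the original CREATE TABLE is cut off above. It is wrapped in the 1.11 executeSql API only for illustration; in the 1.10 sql-client the bare DDL is what gets executed.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HbaseSourceDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Legacy HBase connector properties; cf1's qualifiers and the quorum are assumptions.
        tEnv.executeSql(
            "CREATE TABLE hbase_video_pic_title_q70 (" +
            "  key STRING," +
            "  cf1 ROW<vid BIGINT, q70 DOUBLE>" +
            ") WITH (" +
            "  'connector.type' = 'hbase'," +
            "  'connector.version' = '1.4.3'," +
            "  'connector.table-name' = 'video_pic_title_q70'," +
            "  'connector.zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181'," +
            "  'connector.zookeeper.znode.parent' = '/hbase'" +
            ")");

        // Reading it back: SELECT key, cf1.vid, cf1.q70 FROM hbase_video_pic_title_q70
    }
}

Note that the shaded curator "Authentication failed" errors printed when the session starts appear to come from Flink's own ZooKeeper (HA) client rather than from the HBase connector.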

Re: The queue specified to yarn-session.sh does not take effect

2020-06-22 by MuChen
hi, Yang Wang:

I checked the flink-conf.yaml in the HDFS staging directory ($HOME/.flink/application_id) and compared it with the -qu, -nm, -jm and -tm options passed on the command line.

As for -n, it indeed no longer takes effect.

Thanks.


Best,
MuChen



The queue specified to yarn-session.sh does not take effect

2020-06-22 by MuChen
hi, all:

There is a Hadoop cluster A; a yarn-session is started on it with:
bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 &
so the session should be named fsql-cli and run in queue root.flink.

On the YARN UI, however (https://imgchr.com/i/NJIn4x), the Flink session cluster is running in queue root.default.

Why does the queue given with -qu not take effect?

Flink version: 1.10.0
flink-conf.yaml:
[fsql@10-42-63-116 conf]$ grep -v ^# flink-conf.yaml |grep -v ^$
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
jobmanager.execution.failover-strategy: region

A job submitted through sql-client ends with status SUCCEEDED

2020-06-18 by MuChen
hi,

A yarn-session is started first:
bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 &

A SQL job is then submitted through sql-client; it reads Kafka, joins a Hive dimension table and writes the result to MySQL.

The SQL:
sql??
#  -- 
??5??vid??vid_group 
-- ??55mysql insert into 
rt_app.app_video_cover_abtest_test  select  begin_time,  vid,  vid_group,  
max(dv),  max(click),  max(vv),  max(effectivevv) from(  select   t1.begin_time 
begin_time,   t1.u_vid vid,   t1.u_vid_group vid_group,   dv,   click,   vv,   
if(effectivevv is null,0,effectivevv) effectivevv  from  (   -- dv??click??vv   
select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time,   
 cast(u_vid as bigint) u_vid,u_vid_group,
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and 
u_c_module='M011',1,0)) dv,
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and 
u_c_module='M011',1,0)) click,sum(if(concat(u_mod,'-',u_ac)='top-hits' and 
u_f_module='M011',1,0)) vv   FROM rt_ods.ods_applog_vidsplit   where u_vid is 
not null and trim(u_vid)<''and u_vid_group is not null and 
trim(u_vid_group) not in ('','-1')and (  (concat(u_mod,'-',u_ac) in 
('emptylog-video_display','emptylog-video_click')  and u_c_module='M011')  or  
(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011') )   group by 
TUMBLE(bjdt, INTERVAL '5' MINUTE),cast(u_vid as bigint),u_vid_group  ) 
t1  left join  (   -- effectivevv   selectbegin_time,u_vid,
u_vid_group,count(1) effectivevv   from   (select  begin_time,  u_vid,  
u_vid_group,  u_diu,  u_playid,  m_pt,  q70fromdw.video_pic_title_q70 a 
   join( select   CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS 
STRING) begin_time,  cast(u_vid as bigint) u_vid,  u_vid_group,  u_diu,  
u_playid,  max(u_playtime) m_pt FROM rt_ods.ods_applog_vidsplit where 
u_vid is not null and trim(u_vid)<''  and u_vid_group is not null and 
trim(u_vid_group) not in ('','-1')  and 
concat(u_mod,'-',u_ac)='emptylog-video_play_speed'  and u_f_module='M011'  and 
u_playtime0 group by   TUMBLE(bjdt, INTERVAL '5' MINUTE),  cast(u_vid 
as bigint),  u_vid_group,  u_diu,  u_playid) bon a.vid=b.u_vidgroup 
by   begin_time,  u_vid,  u_vid_group,  u_diu,  u_playid,  m_pt,  q70   ) temp  
 where m_pt=q70   group by begin_time,u_vid,u_vid_group  ) t2  
on t1.begin_time=t2.begin_time   and t1.u_vid=t2.u_vid   and 
t1.u_vid_group=t2.u_vid_group )t3   group by begin_time,  vid,  vid_group ; 
After running for a while the job ends with status SUCCEEDED: https://s1.ax1x.com/2020/06/18/NnyX24.png

The INFO log at that time:
2020-06-17 21:27:07,968 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
ps:
1. The source is Kafka.
2. Flink version is 1.10.0.
Why does the job end with status SUCCEEDED?

A job submitted through sql-client ends with status SUCCEEDED

2020-06-18 by MuChen
hi,

A yarn-session is started: bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 &
A SQL job is then submitted through sql-client; it reads Kafka, joins a Hive dimension table and writes the result to MySQL.

After running for a while the job ends with status SUCCEEDED.

ps: the source is Kafka; why does the job end with status SUCCEEDED?