JM???????????????????????????? checkpoint ??????

18:08:07.615 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 116 @ 1595239687615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:07.628 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 116 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 13 ms).
18:08:08.615 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 117 @ 1595239688615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:08.626 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 117 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 11 ms).
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink 
(acd456ff6f2f9f59ee89b126503c20f0) switched from state RUNNING to CANCELLING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from RUNNING to 
CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) 
(87d4c7af7d5fb5f81bae48aae77de473) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) 
(7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) 
(9035e059e465b8c520edf37ec734b43e) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) 
(e6ff47b0da505b2aa4d775d7821b8356) switched from RUNNING to CANCELING.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) 
(e6ff47b0da505b2aa4d775d7821b8356) switched from CANCELING to CANCELED.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) 
(9035e059e465b8c520edf37ec734b43e) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) 
(7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) 
(87d4c7af7d5fb5f81bae48aae77de473) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from CANCELING to 
CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from CANCELING to 
CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from CANCELING to 
CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-416] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from CANCELING to 
CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink 
(acd456ff6f2f9f59ee89b126503c20f0) switched from state CANCELLING to CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Stopping 
checkpoint coordinator for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  
o.a.f.runtime.checkpoint.StandaloneCompletedCheckpointStore  - Shutting 
down
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  - Checkpoint with 
ID 117 at 
'file:/opt/flink/flink-1.7.2/checkpoints/acd456ff6f2f9f59ee89b126503c20f0/chk-117'
 not discarded.
18:08:09.382 [flink-akka.actor.default-dispatcher-427] INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
acd456ff6f2f9f59ee89b126503c20f0 reached globally terminal state CANCELED.
18:08:09.384 [flink-akka.actor.default-dispatcher-416] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the JobMaster for 
job ty-bi-flink(acd456ff6f2f9f59ee89b126503c20f0).
18:08:09.385 [flink-akka.actor.default-dispatcher-416] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager 
connection 7f7791cdc957a13cfaf639062c495fb9: JobManager is shutting down..
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
SlotPool.
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping SlotPool.
18:08:09.385 [flink-akka.actor.default-dispatcher-416] INFO  
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect 
job manager 
[email protected]://flink@rcx51101:6123/user/jobmanager_4
 for job acd456ff6f2f9f59ee89b126503c20f0 from the resource manager.
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobManagerRunner  - JobManagerRunner 
already shutdown.
18:08:33.384 [flink-rest-server-netty-worker-thread-4] WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Configuring 
the job submission via query parameters is deprecated. Please migrate to 
submitting a JSON request instead.
18:08:34.205 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting job 
6dbecb3e4f536c2c92ca7931cba54fd2 (ty-bi-flink).
18:08:34.205 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint 
for org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/jobmanager_5 .
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Initializing job 
ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Using restart strategy 
NoRestartStrategy for ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint 
for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at 
akka://flink/user/c8a89ca4-afcc-41c0-b121-bbfe4354e502 .
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via 
failover strategy: full graph restart
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Running initialization on 
master for job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Successfully ran 
initialization on master in 0 ms.
18:08:34.207 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Using application-defined 
state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend 
(checkpoints: 'file:/opt/flink/flink-1.7.2/checkpoints', savepoints: 'null', 
asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=UNDEFINED}
18:08:34.207 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Configuring 
application-defined state backend with job/cluster config
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobManagerRunner  - JobManager runner 
for job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2) was granted leadership 
with session id 00000000-0000-0000-0000-000000000000 at 
akka.tcp://flink@rcx51101:6123/user/jobmanager_5.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution of job 
ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2)
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink 
(6dbecb3e4f536c2c92ca7931cba54fd2) switched from state CREATED to RUNNING.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from CREATED to 
SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from CREATED to 
SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from CREATED to 
SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from CREATED to 
SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) 
(4127cdcd8ad7bd2011b7f8a8330663b9) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) 
(c0e50cbfbab0b29973cc517056f3f561) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) 
(989517f5535736062e6ce870e30742ee) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) 
(eaf47a632d3e735f1341e1d6d4ec7b7f) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Connecting to 
ResourceManager 
akka.tcp://flink@rcx51101:6123/user/resourcemanager(00000000000000000000000000000000)
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot 
request, no ResourceManager connected. Adding as pending request 
[SlotRequestId{092c50cc73f659cbca805205e07b239c}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot 
request, no ResourceManager connected. Adding as pending request 
[SlotRequestId{16e0d6c68cbbf62c056758903c129661}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot 
request, no ResourceManager connected. Adding as pending request 
[SlotRequestId{f727b58cc9c5abe1627216c5973f98b5}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot 
request, no ResourceManager connected. Adding as pending request 
[SlotRequestId{bb8a60407b3fa9329ccc1ae8454bf239}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Resolved ResourceManager 
address, beginning registration
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Registration at 
ResourceManager attempt 1 (timeout=100ms)
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering 
job manager 
[email protected]://flink@rcx51101:6123/user/jobmanager_5
 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered 
job manager 
[email protected]://flink@rcx51101:6123/user/jobmanager_5
 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:34.209 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully 
registered at ResourceManager, leader id: 00000000000000000000000000000000.
18:08:34.209 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new 
slot [SlotRequestId{bb8a60407b3fa9329ccc1ae8454bf239}] and profile 
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new 
slot [SlotRequestId{f727b58cc9c5abe1627216c5973f98b5}] and profile 
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request 
slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, 
directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 
6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id 
AllocationID{db118025945481bba66b8ffa734e4202}.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new 
slot [SlotRequestId{16e0d6c68cbbf62c056758903c129661}] and profile 
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new 
slot [SlotRequestId{092c50cc73f659cbca805205e07b239c}] and profile 
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, 
nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request 
slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, 
directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 
6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id 
AllocationID{d5147bcc731a51f09bdb32e366d93b02}.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request 
slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, 
directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 
6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id 
AllocationID{14001529dd0d04ebbd169241cb59f918}.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  
o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request 
slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, 
directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 
6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id 
AllocationID{bb02373b91c626c6fde666512d5b62ed}.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from SCHEDULED to 
DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
Source: Custom Source (1/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from SCHEDULED to 
DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
Source: Custom Source (3/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from SCHEDULED to 
DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
Source: Custom Source (2/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from SCHEDULED to 
DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
Source: Custom Source (4/4) (attempt #0) to rcx51102
18:08:34.220 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) 
(eaf47a632d3e735f1341e1d6d4ec7b7f) switched from SCHEDULED to DEPLOYING.
18:08:34.220 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) (attempt #0) to 
rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) 
(989517f5535736062e6ce870e30742ee) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) (attempt #0) to 
rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) 
(c0e50cbfbab0b29973cc517056f3f561) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) (attempt #0) to 
rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) 
(4127cdcd8ad7bd2011b7f8a8330663b9) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) (attempt #0) to 
rcx51102
18:08:34.506 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 
triggering task Source: Custom Source (1/4) of job 
6dbecb3e4f536c2c92ca7931cba54fd2 is not in state RUNNING but DEPLOYING instead. 
Aborting checkpoint.
18:08:35.036 [flink-akka.actor.default-dispatcher-430] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) 
(989517f5535736062e6ce870e30742ee) switched from DEPLOYING to RUNNING.
18:08:35.037 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) 
(c0e50cbfbab0b29973cc517056f3f561) switched from DEPLOYING to RUNNING.
18:08:35.057 [flink-akka.actor.default-dispatcher-430] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) 
(eaf47a632d3e735f1341e1d6d4ec7b7f) switched from DEPLOYING to RUNNING.
18:08:35.058 [flink-akka.actor.default-dispatcher-430] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) 
(4127cdcd8ad7bd2011b7f8a8330663b9) switched from DEPLOYING to RUNNING.
18:08:35.069 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from DEPLOYING to 
RUNNING.
18:08:35.070 [flink-akka.actor.default-dispatcher-430] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from DEPLOYING to 
RUNNING.
18:08:35.076 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from DEPLOYING to 
RUNNING.
18:08:35.076 [flink-akka.actor.default-dispatcher-430] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from DEPLOYING to 
RUNNING.
18:08:35.506 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 1 @ 1595239715506 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:35.530 [flink-akka.actor.default-dispatcher-430] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 1 for job 6dbecb3e4f536c2c92ca7931cba54fd2 (74134 bytes in 24 ms).







------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]&gt;;
????????:&nbsp;2020??7??17??(??????) ????10:58
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re: state??????checkpoint??????



Hi

1 ???????????????????????????????????????? JM log ???????????? checkpoint ??????
2. ?????????????????????????????? key ???? state ???????????????? state 
??????????????state ?????????????? key
??????keyby ?? key??

Best,
Congxian


sun <[email protected]&gt; ??2020??7??17?????? ????5:22??????

&gt; ??????counts ?????? ???????????????????? List<String&amp;gt; list =
&gt; Lists.newArrayList(counts.get()) ;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
for(String ss : list){
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 System.out.println("!!!" + ss);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 log.info("!!!" + ss);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
}????????????????????????????????????????????????????
&gt; @Slf4j
&gt; public class FlatMapTestState extends RichFlatMapFunction<String,
&gt; Test222&amp;gt; {
&gt;
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; private transient ListState<String&amp;gt; counts;
&gt;
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; @Override
&gt;&nbsp;&nbsp;&nbsp;&nbsp; public void open(Configuration parameters) throws 
Exception {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StateTtlConfig ttlConfig = 
StateTtlConfig
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 .newBuilder(Time.minutes(30))
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
&gt;
&gt; .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 .build();
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
ListStateDescriptor<String&amp;gt; lastUserLogin = new
&gt; ListStateDescriptor<&amp;gt;("lastUserLogin", String.class);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
lastUserLogin.enableTimeToLive(ttlConfig);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; counts = 
getRuntimeContext().getListState(lastUserLogin);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; @Override
&gt;&nbsp;&nbsp;&nbsp;&nbsp; public void flatMap(String s, 
Collector<Test222&amp;gt; collector) throws
&gt; Exception {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
Test222 message = JSONUtil.toObject(s, new
&gt; TypeReference<Test222&amp;gt;() {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; });
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
System.out.println(DateUtil.toLongDateString(new Date()));
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
log.info(DateUtil.toLongDateString(new Date()));
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
counts.add(message.getId());
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
List<String&amp;gt; list = Lists.newArrayList(counts.get()) ;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
for(String ss : list){
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 System.out.println("!!!" + ss);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 log.info("!!!" + ss);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 log.info(DateUtil.toLongDateString(new Date()));
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
System.out.println(DateUtil.toLongDateString(new Date()));
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt; }
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt; ??????:
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "user-zh"
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 <
&gt; [email protected]&amp;gt;;
&gt; ????????:&amp;nbsp;2020??7??16??(??????) ????8:16
&gt; ??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
&gt;
&gt; ????:&amp;nbsp;Re: state??????checkpoint??????
&gt;
&gt;
&gt;
&gt; Hi
&gt;
&gt; 1 counts ??????????????????????????????????????????????????????????
&gt; 2 ???????????? counts ??????????????????
&gt; 3. ?????????????? checkpoint ???????????????????? JM log ??????
&gt; 4. ?????????????????????????????????????????? state-process-api[1] 
?????????????????????????????? restore ??????????
&gt;
&gt; [1]
&gt;
&gt; 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; sun <[email protected]&amp;gt; ??2020??7??16?????? ????6:16??????
&gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; 
????????env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
&gt; &amp;gt; //????????????????
&gt; &amp;gt; env.setRestartStrategy(RestartStrategies.noRestart());
&gt; &amp;gt; env.getCheckpointConfig().setCheckpointTimeout(500);
&gt; &amp;gt;
&gt; &amp;gt;
&gt; 
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
&gt; &amp;gt;
&gt; &amp;gt;
&gt; 
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
&gt; &amp;gt; env.setStateBackend(new
&gt; &amp;gt; RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
&gt; &amp;gt;&amp;nbsp;&amp;nbsp; ??????????????private transient 
ListState<String&amp;amp;gt; counts;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; @Override
&gt; &amp;gt; public void open(Configuration parameters) throws Exception {
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; StateTtlConfig ttlConfig 
= StateTtlConfig
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .newBuilder(Time.minutes(30))
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
&gt; &amp;gt;
&gt; &amp;gt; 
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .build();
&gt; &amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
ListStateDescriptor<String&amp;amp;gt;
&gt; lastUserLogin = new
&gt; &amp;gt; ListStateDescriptor<&amp;amp;gt;("lastUserLogin", String.class);
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
lastUserLogin.enableTimeToLive(ttlConfig);
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; counts =
&gt; getRuntimeContext().getListState(lastUserLogin);
&gt; &amp;gt; }
&gt; &amp;gt; ????????task managers ????????&amp;nbsp; counts&amp;nbsp; 
??????????????????

回复