??????flink1.9.1????hdfs??????????keberos????????????????????
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/abc.keytab
security.kerberos.login.principal: abc/[email protected]



2020-04-14 11:14:20,650 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 288 @ 1586834050629 for job 45e07f380ba6607cb93b01bbbcd45729. 
2020-04-14 11:14:40,936 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 288 for job 45e07f380ba6607cb93b01bbbcd45729 (3705 bytes in 30224 
ms). 2020-04-14 11:19:25,501 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 289 @ 1586834350629 for job 45e07f380ba6607cb93b01bbbcd45729. 
2020-04-14 11:19:35,579 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Decline 
checkpoint 289 by task 7efc00b038a3f6a89892f83851ec2fde of job 
45e07f380ba6607cb93b01bbbcd45729 at container_1585822587175_2427_01_000004 @ 
host68 (dataPort=11329). 2020-04-14 11:19:35,580 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding 
checkpoint 289 of job 45e07f380ba6607cb93b01bbbcd45729. java.lang.Exception: 
Could not materialize checkpoint 289 for operator Source: canaltest_sourceKafka 
-> SourceConversion(table=[Unregistered_DataStream_1], fields=[_TOPIC, 
_MESSAGEKEY, _MESSAGE, _PARTITION, _OFFSET]) -> SinkConversionToTuple2 -> 
Map -> SourceConversion(table=[Unregistered_DataStream_4], fields=[_TOPIC, 
_MESSAGEKEY, _MESSAGE, _PARTITION, _OFFSET, PROCTIME]) -> 
Correlate(invocation=[PARSE_CANAL($cor0._MESSAGE)], 
correlate=[table(PARSE_CANAL($cor0._MESSAGE))], 
select=[_TOPIC,_MESSAGEKEY,_MESSAGE,_PARTITION,_OFFSET,PROCTIME,table_name,op_type,op_ts,current_ts,pos,primary_keys,data,before_data],
 rowType=[RecordType(VARCHAR(2147483647) _TOPIC, VARCHAR(2147483647) 
_MESSAGEKEY, VARCHAR(2147483647) _MESSAGE, INTEGER _PARTITION, BIGINT _OFFSET, 
TIME ATTRIBUTE(PROCTIME) PROCTIME, VARCHAR(2147483647) table_name, 
VARCHAR(2147483647) op_type, VARCHAR(2147483647) op_ts, VARCHAR(2147483647) 
current_ts, VARCHAR(2147483647) pos, VARCHAR(2147483647) primary_keys, 
VARCHAR(2147483647) data, VARCHAR(2147483647) before_data)], joinType=[INNER]) 
-> Calc(select=[op_type, primary_keys, data, before_data]) -> 
SinkConversionToTuple2 -> Sink: HBase (1/3).     at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:748) Caused by: 
java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
and close the file system output stream to null in order to obtain the stream 
state handle    at java.util.concurrent.FutureTask.report(FutureTask.java:122)  
at java.util.concurrent.FutureTask.get(FutureTask.java:192)     at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
     at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init&gt;(OperatorSnapshotFinalizer.java:53)
      at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
        ... 3 more Caused by: java.io.IOException: Could not flush and close 
the file system output stream to null in order to obtain the stream state 
handle   at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334)
      at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
      at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
      at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
     ... 5 more Caused by: java.io.IOException: Could not open output stream 
for state backend       at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
   at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
  at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
      ... 10 more Caused by: java.io.IOException: Failed on local exception: 
java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed 
[Caused by GSSException: No valid credentials provided (Mechanism level: Failed 
to find any Kerberos tgt)]; Host Details : local host is: "host68/10.45.47.68"; 
destination host is: "host66":8020;          at 
org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)      at 
org.apache.hadoop.ipc.Client.call(Client.java:1474)  at 
org.apache.hadoop.ipc.Client.call(Client.java:1401)  at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
   at com.sun.proxy.$Proxy18.create(Unknown Source)        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
     at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)         at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)     at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
      at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy19.create(Unknown Source)        at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721)
         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657)        
 at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582)         at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
        at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)  at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)  at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)  at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
  at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
   at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
      at 
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
         at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
   ... 12 more Caused by: java.io.IOException: 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]   at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:682)       
at java.security.AccessController.doPrivileged(Native Method)   at 
javax.security.auth.Subject.doAs(Subject.java:422)   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
         at 
org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:645)
         at 
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:732)      at 
org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370)         at 
org.apache.hadoop.ipc.Client.getConnection(Client.java:1523)         at 
org.apache.hadoop.ipc.Client.call(Client.java:1440)  ... 38 more Caused by: 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]        at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
        at 
org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:413)    
     at 
org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:555)    
     at org.apache.hadoop.ipc.Client$Connection.access$1800(Client.java:370)    
     at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:724)       at 
org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:720)       at 
java.security.AccessController.doPrivileged(Native Method)   at 
javax.security.auth.Subject.doAs(Subject.java:422)   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
         at 
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:719)      
... 41 more Caused by: GSSException: No valid credentials provided (Mechanism 
level: Failed to find any Kerberos tgt)   at 
sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
   at 
sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
        at 
sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
         at 
sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)   
     at 
sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)     at 
sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)     at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
        ... 50 more

回复