代码如下:
package com.didichuxing.iov.func

import com.didichuxing.iov.util.JedisTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, 
RichAsyncFunction}
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis

import java.util.concurrent.{CompletableFuture, ExecutorService, Executors}
import java.util.function.{Consumer, Supplier}
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable

/**
 * Flink async function that resolves each input element with a single Redis MGET.
 *
 * The blocking Jedis call is executed on a dedicated fixed-size thread pool created in
 * [[open]]; the fetched values are then handed to `process`, which is responsible for
 * completing the result future. `getKey` and `process` are not defined in this class and
 * are expected to be provided through [[RedisMgetAsyncBase]] / concrete subclasses.
 *
 * Every failure path handled here (submission failure, callback failure, timeout)
 * completes the result future with a single `None` so the stream keeps flowing.
 *
 * @tparam IN  type of the incoming stream element
 * @tparam OUT type of the enriched value emitted (wrapped in `Option`)
 */
abstract class RedisMgetAsyncFunc[IN, OUT] extends RichAsyncFunction[IN, Option[OUT]]
  with RedisMgetAsyncBase[IN, OUT] {

  // Transient + lazy so the logger is never part of the serialized function and is
  // re-created on first use after the function has been shipped to a task.
  @transient private lazy val logger = LoggerFactory.getLogger(this.getClass)

  /** Thread pool that runs the blocking Redis calls; created in [[open]]. */
  var service: ExecutorService = _

  // Diagnostic timestamps (epoch millis). They are shared by all in-flight requests and
  // written from pool threads, so under concurrency the values logged together may belong
  // to different requests. @volatile only guarantees cross-thread visibility.
  @volatile var start_ts: Long = 0L
  @volatile var connect_complete_ts: Long = 0L
  @volatile var research_complete_ts: Long = 0L
  @volatile var timeout_ts: Long = 0L
  @volatile var end_ts: Long = 0L

  /** Creates the thread pool used for the blocking Redis lookups. */
  override def open(parameters: Configuration): Unit = {
    service = Executors.newFixedThreadPool(20)
  }

  /** Shuts the thread pool down; safe to call even if [[open]] never ran. */
  override def close(): Unit = {
    if (service != null) {
      service.shutdown()
    }
  }

  /**
   * Submits the MGET for `input` to the thread pool and forwards the values to `process`.
   *
   * @param input        element to look up
   * @param resultFuture future to complete with the lookup outcome
   */
  override def asyncInvoke(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = {
    try {
      CompletableFuture
        // Issue the query asynchronously on the dedicated pool.
        .supplyAsync(new Supplier[mutable.Buffer[String]] {
          override def get(): mutable.Buffer[String] = queryRedis(input)
        }, service)
        // Callback invoked with the query result.
        .thenAccept(new Consumer[mutable.Buffer[String]] {
          override def accept(gpsCellResults: mutable.Buffer[String]): Unit = {
            research_complete_ts = System.currentTimeMillis()
            try {
              process(input, gpsCellResults, resultFuture)
            } catch {
              // Without this, the exception would vanish inside the discarded
              // CompletableFuture and the element would only be released by the timeout.
              case exception: Exception =>
                logger.error("mget异步查询 失败:", exception)
                logger.error(timingSnapshot(input))
                // If process already completed the future before throwing, this extra
                // completion is expected to be ignored by Flink — TODO confirm for the
                // Flink version in use.
                resultFuture.complete(Iterable(None))
            } finally {
              end_ts = System.currentTimeMillis()
            }
          }
        })
    } catch {
      // Synchronous failures only, e.g. the pool rejecting the task.
      case exception: Exception =>
        logger.error("mget异步查询 失败:", exception)
        logger.error(timingSnapshot(input))
        resultFuture.complete(Iterable(None))
    }
  }

  /**
   * Called by Flink when the async request exceeded its timeout. Logs the timing
   * snapshot and emits `None` instead of delegating to the default implementation,
   * so a slow lookup does not fail the job.
   *
   * @param input        element whose lookup timed out
   * @param resultFuture future to complete for that element
   */
  override def timeout(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = {
    timeout_ts = System.currentTimeMillis()
    logger.error(timingSnapshot(input))
    resultFuture.complete(Iterable(None))
    // super.timeout(input, resultFuture) is intentionally not called.
  }

  /**
   * Runs the blocking MGET for `input` and always releases the Jedis connection.
   *
   * @return the values returned by MGET, or an empty buffer if anything failed
   */
  private def queryRedis(input: IN): mutable.Buffer[String] = {
    start_ts = System.currentTimeMillis()
    var jedis: Jedis = null
    try {
      jedis = JedisTool.getJedis()
      connect_complete_ts = System.currentTimeMillis()
      val gpsCellKeys = getKey(input)
      jedis.mget(gpsCellKeys: _*).asScala
    } catch {
      case exception: Exception =>
        // Pass the throwable itself so the original stack trace is logged intact.
        logger.error("mget异步查询 redis获取连接失败:", exception)
        logger.error(timingSnapshot(input))
        mutable.Buffer.empty[String]
    } finally {
      // jedis is still null when acquiring the connection failed; nothing to close then.
      if (jedis != null) {
        try {
          jedis.close()
        } catch {
          case exception: Exception =>
            logger.error("mget异步查询 redis关闭失败:", exception)
            logger.error(timingSnapshot(input))
        }
      }
    }
  }

  /** Builds the single-line timing summary that accompanies every error log. */
  private def timingSnapshot(input: IN): String =
    s"mget异步查询 开始:${start_ts},redis连接获取成功:${connect_complete_ts}" +
      s",查询成功,开始结果过滤:${research_complete_ts},异步方法回调完成:${end_ts}" +
      s",异步查询超时,超时时间:${timeout_ts},输入参数:${input}"
}


回复