代码如下: package com.didichuxing.iov.func import com.didichuxing.iov.util.JedisTool import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction} import org.slf4j.LoggerFactory import redis.clients.jedis.Jedis
import java.util.concurrent.{CompletableFuture, ExecutorService, Executors} import java.util.function.{Consumer, Supplier} import scala.collection.JavaConverters.asScalaBufferConverter import scala.collection.mutable abstract class RedisMgetAsyncFunc[IN, OUT] extends RichAsyncFunction[IN, Option[OUT]] with RedisMgetAsyncBase[IN, OUT] { private val logger = LoggerFactory.getLogger(this.getClass) var service: ExecutorService = _ var start_ts: Long = 0l var connect_complete_ts: Long = 0l var research_complete_ts: Long = 0l var timeout_ts: Long = 0l var end_ts: Long = 0l override def open(parameters: Configuration): Unit = { service = Executors.newFixedThreadPool(20) } override def close(): Unit = { service.shutdown() } override def asyncInvoke(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = { try { CompletableFuture //异步发送查询请求 .supplyAsync(new Supplier[mutable.Buffer[String]] { override def get(): mutable.Buffer[String] = { start_ts = System.currentTimeMillis() var jedis: Jedis = null try { jedis = JedisTool.getJedis() connect_complete_ts = System.currentTimeMillis() val gpsCellKeys = getKey(input) jedis.mget(gpsCellKeys: _*).asScala } catch { case exception: Exception => { logger.error("mget异步查询 redis获取连接失败:" + exception.fillInStackTrace()) logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input) mutable.Buffer() } } finally { try { jedis.close() } catch { case exception: Exception => { logger.error("mget异步查询 redis关闭失败:" + exception.fillInStackTrace()) logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input) } } } } }, service) //查询结果回调 .thenAccept(new Consumer[mutable.Buffer[String]] { override def accept(gpsCellResults: mutable.Buffer[String]): Unit = { research_complete_ts = System.currentTimeMillis() process(input, gpsCellResults, resultFuture) end_ts = System.currentTimeMillis() } }) } catch { case exception: Exception => { logger.error("mget异步查询 失败:" + exception.fillInStackTrace()) logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input) } resultFuture.complete(Iterable(None)) } } override def timeout(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = { timeout_ts = System.currentTimeMillis() logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input) resultFuture.complete(Iterable(None)) // super.timeout(input, resultFuture) } }