代码如下:
package com.didichuxing.iov.func
import com.didichuxing.iov.util.JedisTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture,
RichAsyncFunction}
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis
import java.util.concurrent.{CompletableFuture, ExecutorService, Executors}
import java.util.function.{Consumer, Supplier}
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable
/**
 * Flink async function that fetches several Redis values with one MGET on a dedicated
 * thread pool and forwards the raw values to `process`.
 *
 * `getKey` and `process` are not defined in this class; they are presumably supplied by
 * [[RedisMgetAsyncBase]] and/or the concrete subclass.
 *
 * Every failure path (pool rejection, exception thrown by `process`, timeout) completes
 * the record with `Iterable(None)`. A failed Redis lookup instead hands an empty buffer
 * to `process`, which then decides what to emit.
 *
 * @tparam IN  type of the incoming stream element
 * @tparam OUT type of the enriched value, emitted wrapped in an `Option`
 */
abstract class RedisMgetAsyncFunc[IN, OUT] extends RichAsyncFunction[IN, Option[OUT]]
  with RedisMgetAsyncBase[IN, OUT] {

  private val logger = LoggerFactory.getLogger(this.getClass)

  // Pool that runs the blocking Jedis calls; created in open(), shut down in close().
  var service: ExecutorService = _

  // Diagnostic timestamps (epoch millis) used only in log output. They are instance fields
  // shared by every in-flight request and are written from pool threads, so a log line can
  // mix values belonging to different requests. @volatile only guarantees visibility
  // between the pool threads and the thread that runs timeout().
  @volatile var start_ts: Long = 0L
  @volatile var connect_complete_ts: Long = 0L
  @volatile var research_complete_ts: Long = 0L
  @volatile var timeout_ts: Long = 0L
  @volatile var end_ts: Long = 0L

  /** Creates the fixed-size pool (20 threads) on which the Redis lookups are executed. */
  override def open(parameters: Configuration): Unit = {
    service = Executors.newFixedThreadPool(20)
  }

  /** Shuts the lookup pool down; safe to call even when open() never assigned the pool. */
  override def close(): Unit = {
    if (service != null) {
      service.shutdown()
    }
  }

  /** Builds the timing summary that accompanies every error log of this function. */
  private def timingSummary(input: IN): String =
    "mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + connect_complete_ts +
      ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + end_ts +
      ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input

  /**
   * Submits the MGET for `input` to the pool and passes the returned values to `process`.
   *
   * @param input        element whose keys (from `getKey`) are looked up
   * @param resultFuture future handed to `process`; completed with `Iterable(None)` when
   *                     the task cannot be submitted or `process` throws
   */
  override def asyncInvoke(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = {
    try {
      CompletableFuture
        // Send the lookup request asynchronously.
        .supplyAsync(new Supplier[mutable.Buffer[String]] {
          override def get(): mutable.Buffer[String] = {
            start_ts = System.currentTimeMillis()
            var jedis: Jedis = null
            try {
              jedis = JedisTool.getJedis()
              connect_complete_ts = System.currentTimeMillis()
              val gpsCellKeys = getKey(input)
              jedis.mget(gpsCellKeys: _*).asScala
            } catch {
              case exception: Exception =>
                // Pass the throwable itself so the original stack trace is logged.
                logger.error("mget异步查询 redis获取连接失败:", exception)
                logger.error(timingSummary(input))
                // A failed lookup is reported to process() as "no values".
                mutable.Buffer.empty[String]
            } finally {
              // jedis is still null when getJedis() itself failed; closing it then would
              // only raise a NullPointerException and log a misleading close failure.
              if (jedis != null) {
                try {
                  jedis.close()
                } catch {
                  case exception: Exception =>
                    logger.error("mget异步查询 redis关闭失败:", exception)
                    logger.error(timingSummary(input))
                }
              }
            }
          }
        }, service)
        // Callback invoked with the lookup result.
        .thenAccept(new Consumer[mutable.Buffer[String]] {
          override def accept(gpsCellResults: mutable.Buffer[String]): Unit = {
            research_complete_ts = System.currentTimeMillis()
            try {
              process(input, gpsCellResults, resultFuture)
              end_ts = System.currentTimeMillis()
            } catch {
              case exception: Exception =>
                // Without this handler the exception would be swallowed by the
                // CompletableFuture and the record would only resolve via timeout().
                logger.error("mget异步查询 失败:", exception)
                logger.error(timingSummary(input))
                resultFuture.complete(Iterable(None))
            }
          }
        })
    } catch {
      // Reached when the task cannot even be submitted to the pool.
      case exception: Exception =>
        logger.error("mget异步查询 失败:", exception)
        logger.error(timingSummary(input))
        resultFuture.complete(Iterable(None))
    }
  }

  /**
   * Called when the lookup for `input` timed out: logs the timing summary and completes
   * the record with `None`. The superclass handler is deliberately not invoked.
   */
  override def timeout(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit = {
    timeout_ts = System.currentTimeMillis()
    logger.error(timingSummary(input))
    resultFuture.complete(Iterable(None))
    //super.timeout(input, resultFuture)
  }
}