退订

2024-03-13 文章
退订




flink异步IO超时时,如何释放连接池资源

2023-07-20 文章
代码如下:
package com.didichuxing.iov.func

import com.didichuxing.iov.util.JedisTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, 
RichAsyncFunction}
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis

import java.util.concurrent.{CompletableFuture, ExecutorService, Executors}
import java.util.function.{Consumer, Supplier}
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable

abstract class RedisMgetAsyncFunc[IN, OUT] extends RichAsyncFunction[IN, 
Option[OUT]] with RedisMgetAsyncBase[IN, OUT] {
 private val logger = LoggerFactory.getLogger(this.getClass)
 var service: ExecutorService = _
 var start_ts: Long = 0l
 var connect_complete_ts: Long = 0l
 var research_complete_ts: Long = 0l
 var timeout_ts: Long = 0l
 var end_ts: Long = 0l

 override def open(parameters: Configuration): Unit = {
   service = Executors.newFixedThreadPool(20)
 }

 override def close(): Unit = {
   service.shutdown()
 }

 override def asyncInvoke(input: IN, resultFuture: ResultFuture[Option[OUT]]): 
Unit = {
   try {
 CompletableFuture
   //异步发送查询请求
   .supplyAsync(new Supplier[mutable.Buffer[String]] {
 override def get(): mutable.Buffer[String] = {
   start_ts = System.currentTimeMillis()
   var jedis: Jedis = null
   try {
 jedis = JedisTool.getJedis()
 connect_complete_ts = System.currentTimeMillis()
 val gpsCellKeys = getKey(input)
 jedis.mget(gpsCellKeys: _*).asScala
   }
   catch {
 case exception: Exception => {
   logger.error("mget异步查询 redis获取连接失败:" + 
exception.fillInStackTrace())
   logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + 
connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + 
end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input)
   mutable.Buffer()
 }
   } finally {
 try {
   jedis.close()
 }
 catch {
   case exception: Exception => {
 logger.error("mget异步查询 redis关闭失败:" + 
exception.fillInStackTrace())
 logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + 
connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + 
end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input)
   }
 }
   }
 }
   }, service)
   //查询结果回调
   .thenAccept(new Consumer[mutable.Buffer[String]] {
 override def accept(gpsCellResults: mutable.Buffer[String]): Unit = {
   research_complete_ts = System.currentTimeMillis()
   process(input, gpsCellResults, resultFuture)
   end_ts = System.currentTimeMillis()
 }
   })
   }
   catch {
 case exception: Exception => {
   logger.error("mget异步查询 失败:" + exception.fillInStackTrace())
   logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + 
connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + 
end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input)
 }
   resultFuture.complete(Iterable(None))
   }
 }

 override def timeout(input: IN, resultFuture: ResultFuture[Option[OUT]]): Unit 
= {
   timeout_ts = System.currentTimeMillis()
   logger.error("mget异步查询 开始:" + start_ts + ",redis连接获取成功:" + 
connect_complete_ts + ",查询成功,开始结果过滤:" + research_complete_ts + ",异步方法回调完成:" + 
end_ts + ",异步查询超时,超时时间:" + timeout_ts + ",输入参数:" + input)
   resultFuture.complete(Iterable(None))
   //super.timeout(input, resultFuture)
 }
}




Re:​异步IO算子无法完成checkpoint

2021-10-11 文章
图片上传到附件中了
















在 2021-10-12 10:33:12,"李一飞"  写道:

异步IO算子无法完成checkpoint,帮忙看下是什么原因  




 

​异步IO算子无法完成checkpoint

2021-10-11 文章
异步IO算子无法完成checkpoint,帮忙看下是什么原因  

Flink写clickhouse怎么实现精准一次性

2021-05-07 文章
请问   Flink写clickhouse怎么实现精准一次性,有啥好办法呀

Re:请问在使用processfunction 中的processelement()和onTimer()需要考虑并发问题吗?

2021-04-22 文章
这两方法是同步的方式执行的,同时只能执行一个
在 2021-04-22 15:35:07,"x2009438"  写道:
>如题,谢谢各位。
>
>
>发自我的iPhone


Re:Re: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-22 文章
明白了,谢谢~
在 2021-04-21 19:58:23,"Peihui He"  写道:
>fetch.min.bytes
>fetch.wait.max.ms
>还可以用着两个参数控制下的
>
>熊云昆  于2021年4月21日周三 下午7:10写道:
>
>> 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条
>>
>>
>> | |
>> 熊云昆
>> |
>> |
>> 邮箱:xiongyun...@163.com
>> |
>>
>> 签名由 网易邮箱大师 定制
>>
>> 在2021年04月20日 18:19,李一飞 写道:
>> flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
>> 最好分流、批场景回答一下,谢谢!


Re:回复:flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-22 文章
谢谢
在 2021-04-21 19:10:17,"熊云昆"  写道:
>有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条
>
>
>| |
>熊云昆
>|
>|
>邮箱:xiongyun...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2021年04月20日 18:19,李一飞 写道:
>flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
>最好分流、批场景回答一下,谢谢!


flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-20 文章
flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
最好分流、批场景回答一下,谢谢!