????Pyflink????kerberos??????hdfs????????????

2021-02-03 文章 ??????




Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
 org.apache.hadoop.security.AccessControlException: SIMPLE authentication is 
not enabled. Available:[TOKEN, KERBEROS]


kerberos??flink??

??????flinkSQL??ValueStateDescriptor????????StateTtlConfig

2021-02-03 文章 ???????L
streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));




----
??: "stgztsw"http://apache-flink.147419.n8.nabble.com/

flinkSQL的ValueStateDescriptor没有设置StateTtlConfig

2021-02-03 文章 stgztsw
目前用的是flink1.10的版本,发现flinkSQL的场景(rocksdb),除了streamingJoinOperator以外,ValueStateDescriptor都没有设置StateTtlConfig,这样的话会不会导致groupby之类的聚合操作的状态永远不会被清理,而导致越来越大?好像也没有任何配置来调整?这是不是不太合理?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

StreamAPi Window 在使用 .evictor(TimeEvictor.of(Time.seconds(0))).sum().print 报NullPoint

2021-02-03 文章 HunterXHunter
代码如下: evictor设置的在窗口触发前清理所有数据,按理进入sum是没有数据,但是调试的时候发现,sum经过计算会输出 null 进入
print,导致报 Nullpoint。不知道是bug还是我的问题;

class A {
String word;
Long time;

public A(String word, Long time) {
this.word = word;
this.time = time;
}
};
streamEnv.fromElements(new A("a", 1L))
.assignTimestampsAndWatermarks(
WatermarkStrategy.
forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner(((element,
recordTimestamp) -> element.time))
)
.keyBy(x -> x.time)
.map(x -> new Tuple2<>(x.word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy((KeySelector, String>) o -> o.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0)))
.sum(1)
.print();
streamEnv.execute();



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink window evictor(TimeEvictor.of(Time.seconds(0))) 会出现 NullPoint问题

2021-02-03 文章 HunterXHunter
代码如下:
stream
.keyBy((KeySelector, String>) o -> o.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(100)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(2)))
.evictor(TimeEvictor.of(Time.seconds(0)))
.sum(1)
.print();

当 数据在窗口计算前被全部清除时,sum结果会是一个null,会传入print,导致 nullpoint



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.11 session cluster相关问题

2021-02-03 文章 zilong xiao
捞一下自己

zilong xiao  于2021年2月2日周二 上午10:35写道:

> 请问社区大佬,1.11版本的session
> cluster模式不支持在启动时指定启动taskmanager个数了吗?好像只能动态申请资源了?在1.4版本可以用-n,现在该参数已移除,为什么要这么做呢?我理解在启动一个session
> cluster的同时申请好TM个数也是一种常见场景吧?
>
> 求社区大佬指点
>


?????? ??????????????????????????????????

2021-02-03 文章 op





----
??: 
   "user-zh"



Re:回复: 如何在程序里面判断作业是否是重启了

2021-02-03 文章 zapjone



下游数据做好幂等操作,就不怕重复操作了。。














在 2021-02-04 11:26:56,"op" <520075...@qq.com> 写道:
>重启可能会导致数据重发,想加个告警
>
>
>
>
>--原始邮件--
>发件人:   
> "user-zh" 
>   
>发送时间:2021年2月4日(星期四) 中午11:11
>收件人:"user-zh"
>主题:Re: 如何在程序里面判断作业是否是重启了
>
>
>
>业务上的需求是什么?
>
>Best,
>tison.
>
>
>op <520075...@qq.com 于2021年2月4日周四 上午11:04写道:
>
> 大家好:
> nbsp;
> 
>nbsp;我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢?


?????? ??????????????????????????????????

2021-02-03 文章 op
??




----
??: 
   "user-zh"



pyflink1.12 定义源表后, 执行表关联的query效率较慢?

2021-02-03 文章 肖越
不知道大家有没有遇到这种情况,请求大佬帮忙分析一下。


我在flink中定义了两张源表,分别对应于 Mysql 数据库中的表格,
表 a 有6934行数据;表 b 有11415574行数据;
在关联操作后,进行常规的SELECT  WHERE等操作,最后查找符合条件的250条数据。
最后是print() 查找结果操作,每次单机执行都会跑10分钟!


相比于,pyflink1.11 的connector read.query()操作慢了好多,
请问pyflink1.12中是什么操作增加了执行时间,是将query这部分操作放到flink执行了么?
是否有其他的改善方式?



Re: 如何在程序里面判断作业是否是重启了

2021-02-03 文章 tison
业务上的需求是什么?

Best,
tison.


op <520075...@qq.com> 于2021年2月4日周四 上午11:04写道:

> 大家好:
> 
> 我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢?


Flink window evictor(TimeEvictor.of(Time.seconds(0))) 会出现 NullPoint问题

2021-02-03 文章 HunterXHunter
当程序使用 
evictor(TimeEvictor.of(Time.seconds(0)))
来清除 窗口触发前数据时,当数据全部被清除了,在print时会报Null point

Caused by: java.lang.NullPointerException
at
org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73)
at
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81)


为什么没数据也会传一个 null 到 Sink?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


??????????????????????????????????

2021-02-03 文章 op

 
??RestartStrategiesRestart??

Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

2021-02-03 文章 陈康
谢谢、我试试



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

2021-02-03 文章 Xingbo Huang
Hi,

你其实可以在open方法里面进行加载的,这样只会加载一次,在eval方法中加载将会导致多次加载。

Best,
Xingbo

陈康 <844256...@qq.com> 于2021年2月4日周四 上午9:25写道:

> 感谢回复、之前是在__init__方法中加载Keras模型、经钉钉群大佬指教在eval中使用再加载、问题解决了,谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-02-03 文章 yang nick
flink window  doesn't support update stream.

HongHuangNeu  于2021年2月4日周四 上午9:24写道:

> 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming
> update and delete changes,有没有什么替代方案?输入是来自于流式去重,就是
>
> SELECT [column_list]
> FROM (
>SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>ORDER BY time_attr [asc|desc]) AS rownum
>FROM table_name)
> WHERE rownum = 1
>
> 这样的语句
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

2021-02-03 文章 陈康
感谢回复、之前是在__init__方法中加载Keras模型、经钉钉群大佬指教在eval中使用再加载、问题解决了,谢谢!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-02-03 文章 HongHuangNeu
如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming
update and delete changes,有没有什么替代方案?输入是来自于流式去重,就是

SELECT [column_list]
FROM (
   SELECT [column_list],
 ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
   ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

这样的语句



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: PyFlink How to set timeout for UDF

2021-02-03 文章 Dian Fu
先理解一下你的需求:是说Python UDF的实现,处理一条数据的时间可能非常长,如果很长时间没有执行完,希望作业停止?

> 在 2021年2月3日,下午1:04,苗红宾  写道:
> 
> Hi:
> 
> Hope you are doing well!
> 
> My UDF always running in a long time, so I'm wondering, how to set timeout 
> for UDF in Pyflink, in order to auto-stop the execution when it running in a 
> long time.
> 
> Many Thanks!
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 



如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-02-03 文章 HongHuangNeu
如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming
update and delete changes,有没有什么替代方案?输入是来自于流式去重,就是

SELECT [column_list]
FROM (
   SELECT [column_list],
 ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
   ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

这样的语句



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

2021-02-03 文章 Dian Fu
可以发一下你的__init__方法吗?应该是在__init__方法里有不能pickle的对象。

> 在 2021年2月3日,下午6:01,陈康 <844256...@qq.com> 写道:
> 
>  
> https://blog.csdn.net/weixin_44904816/article/details/108744530
> 看到一篇博客说:“PyFlink以后还可以支持 Tensorflow、Keras”.好吧..
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 请问,flink支持StreamFileSink在将pending文件转为finished的时候做一些操作吗

2021-02-03 文章 Paul Lam
如果使用 RollOnCheckpoint 的文件滚动策略,可以开发一个 UDF 实现 CheckpointListener 接口,在 
notifyCheckpointComplete 函数里面发消息。不过要注意这个消息可能会重复。

Best,
Paul Lam

> 2021年2月3日 17:36,上官 <17635713...@163.com> 写道:
> 
> 各位大神,我的工作需要Flink将DataStream中的数据写入到HDFS上,我需要在flink将写入文件变为下游可读的时候,发送一个消息到消息队列,请问Flink支持这种操作吗



Re: HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

2021-02-03 文章 陈康
 
https://blog.csdn.net/weixin_44904816/article/details/108744530
看到一篇博客说:“PyFlink以后还可以支持 Tensorflow、Keras”.好吧..



--
Sent from: http://apache-flink.147419.n8.nabble.com/


请问,flink支持StreamFileSink在将pending文件转为finished的时候做一些操作吗

2021-02-03 文章 上官
各位大神,我的工作需要Flink将DataStream中的数据写入到HDFS上,我需要在flink将写入文件变为下游可读的时候,发送一个消息到消息队列,请问Flink支持这种操作吗

HELP!!!! PyFlink UDF加载Keras模型,并注册函数出现的问题

2021-02-03 文章 陈康
在pyflink自定义UDF加载Keras模型并注册UDF时、报错:TypeError: can't pickle _thread.lock
objects、有大佬遇到过吗?谢谢!(插入的图不知看不看的到)

class myKerasMLP(ScalarFunction):
def __init__(self):
...

def open(self, function_context):
...

def eval(self, x, y):
...

def load_model(self):
"""
加载模型,如果 redis 里存在模型,则优先从 redis 加载,否则初始化一个新模型
:return:
"""
import redis
import pickle
import logging

logging.info('载入模型!')
r = redis.StrictRedis(**self.redis_params)
model = None

try:
# redis加载model json
model = model_from_json(r.get(self.model_name))
# redis加载model权重
weights = pickle.loads(r.get(self.weights))
# # 设置权重
model.set_weights(weights)
model.summary()
except TypeError:
logging.info('Redis 内没有指定名称的模型,因此初始化一个新模型')
except (redis.exceptions.RedisError, TypeError, Exception):
logging.warning('Redis 出现异常,因此初始化一个新模型')
finally:
print("MLP model", model)
return model

myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.FLOAT(),
DataTypes.FLOAT()],
 result_type=DataTypes.FLOAT())
print('UDF 模型加载完成!')
t_env.create_temporary_system_function('train_and_predict', myKerasMLP)
print('UDF 注册成功')
---
_
Layer (type) Output Shape  Param #   
=
dense_1 (Dense)  (None, 8) 72
_
dense_2 (Dense)  (None, 10)90
_
dense_3 (Dense)  (None, 1) 11
=
Total params: 173
Trainable params: 173
Non-trainable params: 0
_
MLP model 
UDF 模型加载完成!


 
 




--
Sent from: http://apache-flink.147419.n8.nabble.com/