Re: Flink SQL TTL not taking effect

2021-06-09 Thread HunterXHunter
I suggest disabling state.backend.incremental. I've run into this myself: with incremental checkpointing enabled, the checkpoint size kept growing; after turning it off, everything went back to normal.
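A minimal sketch of that workaround in code, assuming the RocksDB state backend on Flink 1.12 (the flink-conf.yaml equivalent is state.backend.incremental: false):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the second constructor argument is enableIncrementalCheckpointing
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", false));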





Re: [Problem analysis] Flink job under endless backpressure

2021-06-07 Thread HunterXHunter
With the beginning and end of the story cut off, there's no way to tell what the problem is, let alone answer it. Best to post your code, screenshots, etc. so that people can help analyze it.




Re: Troubleshooting very slow Flink checkpoints

2021-06-03 Thread HunterXHunter
Usually it means job A is backpressured: during a checkpoint, the barrier is only processed after all the backpressured data ahead of it has been processed.




Re: The /checkpoint/shared/ directory of a Flink SQL job keeps growing although the source volume hasn't spiked; how can this be controlled?

2021-06-01 Thread HunterXHunter
Will it just keep growing, then? I've been running for 4 days and the checkpoint size keeps increasing with no sign of leveling off. Do I need to tune the compaction configuration?
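If the TTL'd state lives in RocksDB, one compaction-related knob worth checking is the TTL compaction filter; a minimal sketch (the state name, type, and one-day TTL are placeholders):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))
        // drop expired entries while RocksDB compacts, refreshing the current
        // timestamp from Flink every 1000 processed entries
        .cleanupInRocksdbCompactFilter(1000)
        .build();
ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("myState", String.class);
desc.enableTimeToLive(ttlConfig);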




Re: The /checkpoint/shared/ directory of a Flink SQL job keeps growing although the source volume hasn't spiked; how can this be controlled?

2021-05-31 Thread HunterXHunter
The problem I ran into was exactly this: with incremental checkpoints enabled, checkpoints grew larger and larger; after disabling them, checkpoints went back to normal.





Re: The /checkpoint/shared/ directory of a Flink SQL job keeps growing although the source volume hasn't spiked; how can this be controlled?

2021-05-31 Thread HunterXHunter
Disable incremental checkpoints.





Re: Dimension table processing time

2021-05-20 Thread HunterXHunter
Read the MySQL change data via CDC and connect the two streams with connect(); that avoids the problem of the computation only firing once the watermark advances. When the fact-table side finds no match, I suggest buffering the record and re-firing it with an onTimer callback, as sketched below.
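A minimal sketch of that pattern (Fact, Dim and Joined are hypothetical POJOs, and the 5-second retry delay is arbitrary):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class DimJoinFunction extends KeyedCoProcessFunction<String, Fact, Dim, Joined> {
    private transient ValueState<Dim> dimState;      // latest image of the dimension row
    private transient ListState<Fact> pendingFacts;  // fact records waiting for the dimension

    @Override
    public void open(Configuration parameters) {
        dimState = getRuntimeContext().getState(new ValueStateDescriptor<>("dim", Dim.class));
        pendingFacts = getRuntimeContext().getListState(new ListStateDescriptor<>("pending", Fact.class));
    }

    @Override
    public void processElement1(Fact fact, Context ctx, Collector<Joined> out) throws Exception {
        Dim dim = dimState.value();
        if (dim != null) {
            out.collect(new Joined(fact, dim));
        } else {
            // no dimension row yet: buffer and retry via a processing-time timer,
            // so no watermark is needed to make progress
            pendingFacts.add(fact);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 5000L);
        }
    }

    @Override
    public void processElement2(Dim dim, Context ctx, Collector<Joined> out) throws Exception {
        dimState.update(dim); // CDC insert/update of the MySQL row
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Joined> out) throws Exception {
        Dim dim = dimState.value();
        if (dim != null) {
            for (Fact f : pendingFacts.get()) {
                out.collect(new Joined(f, dim));
            }
            pendingFacts.clear();
        }
    }
}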




Re: Which connectors can give me a stream that has a primary key but no update/delete messages?

2021-05-18 Thread HunterXHunter
Try changing the format from json to debezium-json.
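For example (a hypothetical CDC-backed topic; the only point here is the 'format' option):

CREATE TABLE orders (
  order_id BIGINT,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'  -- instead of 'format' = 'json'
);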





Re: Question about the field list when defining a Flink SQL source table

2021-05-17 Thread HunterXHunter
You don't need to declare all of the fields.
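For example, if the Kafka messages are JSON with many fields, the source table can declare only the ones it needs (a hypothetical table):

CREATE TABLE src (
  user_id BIGINT,   -- other JSON fields in the message are simply ignored
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);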





Re: Re: How do you convert a changelog stream into an append-only stream in Flink SQL?

2021-05-17 Thread HunterXHunter
tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info"));
tableEnv
        .toRetractStream(tableEnv.from("order_info"), Row.class)
        .filter((FilterFunction<Tuple2<Boolean, Row>>) booleanRowTuple2 -> booleanRowTuple2.f0)
        .map((MapFunction<Tuple2<Boolean, Row>, Row>) booleanRowTuple2 -> booleanRowTuple2.f1)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((element, recordTimestamp) -> System.currentTimeMillis()))
        .keyBy((KeySelector<Row, String>) row -> row.getField("consignee").toString())
        .window(TumblingEventTimeWindows.of(Time.seconds(100)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
        .process(new ProcessWindowFunction<Row, Tuple2<TimeWindow, Long>, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Row> elements,
                                Collector<Tuple2<TimeWindow, Long>> out) throws Exception {
                Long count = 0L;
                for (Row element : elements) {
                    count += 1;
                }
                out.collect(new Tuple2<>(context.window(), count));
            }
        })
        .print();
streamEnv.execute();





Re: Exception: Could not perform checkpoint

2021-05-17 Thread HunterXHunter
Looking at the source, the failure is inside the catch block (executeCheckpointing presumably threw first, but a NullPointerException inside the catch block itself was not caught, which made the program exit):

if (LOG.isDebugEnabled()) {
    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
            owner.getName(),
            checkpointMetaData.getCheckpointId(),
            checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
            checkpointMetrics.getSyncDurationMillis());
}

The failing expression is checkpointMetrics.getAlignmentDurationNanos() / 1_000_000.
I suggest turning DEBUG logging off and seeing whether the problem goes away.





Re: keyBy after union

2021-05-16 Thread HunterXHunter
Nobody has reported a bug in this area, so the community generally won't verify it on its own. You need to post your code and describe the problem as clearly as you can; otherwise it's unlikely anyone will write code to reproduce the issue themselves.




Re: Pre-checking SQL with the Table API

2021-04-29 Thread HunterXHunter
tableEnv.explainSql("");
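A sketch of using it as a pre-check (the statement and table names are placeholders):

try {
    // explainSql parses and validates the statement without running it;
    // invalid SQL throws an exception here instead of at submission time
    String plan = tableEnv.explainSql("insert into sink_tbl select * from src_tbl");
    System.out.println(plan);
} catch (Exception e) {
    e.printStackTrace(); // surface the validation error
}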





Re: Flink backpressure question

2021-04-28 Thread HunterXHunter
There must be bad records or some other error in the pipeline; backpressure by itself does not cause data loss.




Problem: compiling flink-parquet on Flink 1.13 fails with duplicate class: org.apache.flink.formats.parquet.generated.SimpleRecord

2021-04-26 Thread HunterXHunter
Investigating, I found the problem is this build plugin:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/test/resources/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-test-sources/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

Comparing against Flink 1.12 (which compiles fine), the difference is the output directory; 1.12 uses:

<outputDirectory>${project.basedir}/src/test/java/</outputDirectory>

When I change Flink 1.13/master to that value, the build passes.
Has anyone run into the same problem? Or is it something in my environment??







Re: DataStream API vs. Flink SQL question

2021-04-26 Thread HunterXHunter
Have you actually tried it?





Re: Question: with state.backend.incremental enabled, the Checkpointed Data Size keeps growing

2021-04-22 Thread HunterXHunter
Not solved; all I could do was turn it off.





Re: Setting up a lookup table source

2021-04-21 Thread HunterXHunter
In principle, as long as you implement LookupTableSource, you can override eval in the TableFunction however you like; it doesn't matter where the data comes from or how you read it.
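A minimal sketch of such a TableFunction (the external store and helper method are hypothetical):

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class MyLookupFunction extends TableFunction<Row> {
    // eval is resolved by reflection; read from whatever system you like here
    public void eval(Object key) {
        String value = fetchFromExternalStore(key); // hypothetical helper
        if (value != null) {
            collect(Row.of(key, value));
        }
    }

    private String fetchFromExternalStore(Object key) {
        return null; // e.g. a Redis/HBase/HTTP call, stubbed out for the sketch
    }
}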




Re: Flink 1.12.2 sql-cli fails writing to Hive with is_generic

2021-04-21 Thread HunterXHunter
I set the watermark in the DDL, but on the job page the watermark never updates.





Re: With a retract stream as input, a GROUP BY time window hits "GroupWindowAggregate doesn't support consuming update and delete changes"

2021-04-19 Thread HunterXHunter
Try toAppendStream instead.





Re: Confusion about watermarks when using an interval join in Flink SQL

2021-04-15 Thread HunterXHunter
Question 1: "Interval Join doesn't support consuming update and delete changes" means that input A or B is an update stream.
Question 2: the interval join caches A's and B's rows in a buffer rather than in TTL-managed state; the buffer is only cleaned up in onEventTime once the watermark passes the lower bound.
On latency: there is no state-TTL-like setting; the interval join's cleanup is not governed by state TTL.
On out-of-order data: if a row of B is older than the watermark it won't be matched; everything is driven by the watermark.
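For reference, the shape of query being discussed (a standard interval join over hypothetical tables A and B):

SELECT *
FROM A, B
WHERE A.id = B.id
  AND B.rowtime BETWEEN A.rowtime - INTERVAL '10' MINUTE
                    AND A.rowtime + INTERVAL '5' SECOND;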

All of the above is my own reading of the source code. Hope it helps.




Re: Flink CEP event queries: how do you update CEP rules dynamically while the job is running?

2021-04-15 Thread HunterXHunter
You probably need to modify the source code; see:
https://blog.csdn.net/young_0609/article/details/110407781





Re: Flink 1.12.2 sql-cli fails writing to Hive with is_generic

2021-04-15 Thread HunterXHunter
With process-time there is data, though; so far I've never managed to get any data written out with partition-time.





Re: Flink CEP produces no output after keyBy

2021-04-15 Thread HunterXHunter
This turned out not to be a bug but a misunderstanding on my part. Thanks.





Re: Confusion about watermarks when using an interval join in Flink SQL

2021-04-15 Thread HunterXHunter
1: "Interval Join doesn't support consuming update and delete changes" occurs because A or B is an update stream.

2: an interval join keeps its transient data in a buffer; once the watermark passes the boundary time, the buffer is cleaned up and can no longer be joined against. So state TTL cannot control A's cached data.
On latency: consequently, if the watermark doesn't advance, A's data is never cleaned up, since it is not governed by state TTL.

On out-of-order data: if an old row of B is below the watermark, it won't join.

Again, just my personal understanding.




Re: BUG: window computation not triggered after converting a DataStream to a Table

2021-04-15 Thread HunterXHunter
Does anyone know about this bug?





Re: Keyed tumbling window never fires because the watermark is not generated or not computed

2021-04-12 Thread HunterXHunter
In 1.12 event time is the default; you don't need to set the time characteristic.





Re: Hive DDL TBLPROPERTIES not taking effect

2021-04-08 Thread HunterXHunter
Hi,
1: I did use partition-time, with the checkpoint interval set to 60s. But the watermark is never generated or updated, so my data can never be committed. I'd like to know why the watermark isn't generated; with process-time everything works.
2: Writing to Hive suffers from the small-files problem, so I used the file sink to merge files and control file size; but plain file writes don't update the Hive metastore, so Hive can't query the data.

I'd like to know how to solve the Hive small-files problem. Is T+1 offline compaction really the only option?
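One thing that may be worth trying is the filesystem SQL connector's built-in compaction (available since Flink 1.12; whether it fits a Hive-managed table is another question). A sketch with placeholder columns and path:

CREATE TABLE fs_sink (
  user_id BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///warehouse/fs_sink',
  'format' = 'parquet',
  -- merge small files within each checkpoint before committing them
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB'
);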




Re: How do you specify the submission queue when the flink-sql client runs with execution.target: yarn-per-job?

2021-04-02 Thread HunterXHunter
flink run -yD yarn.application.queue=x
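For the SQL client itself, which has no -yD flag, the same key can presumably be set in flink-conf.yaml before starting the session:

# flink-conf.yaml
yarn.application.queue: my_queue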





Re: Flink 1.12.2 sql-cli fails writing to Hive with is_generic

2021-04-02 Thread HunterXHunter
"Important: when using the FileSink in streaming mode, checkpointing must be enabled; writes are finalized on each checkpoint. If checkpointing is disabled, part files stay in the 'in-progress' or 'pending' state forever and cannot be safely read by downstream systems."

I found this in the official docs, so a checkpoint is required. But even after I triggered a manual savepoint, there was a _SUCCESS file yet still no data.
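A minimal way to turn checkpointing on for such a job in code (for a SQL client session, the flink-conf.yaml equivalent would be execution.checkpointing.interval: 60s):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// part files are rolled and committed on each checkpoint
env.enableCheckpointing(60_000L);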




Re: Flink 1.12.2 sql-cli fails writing to Hive with is_generic

2021-04-01 Thread HunterXHunter
Looking at the HDFS files:
the partition only ever contains a single file like the one below, and no _SUCCESS file is generated:
.part-40a2c94d-0437-4666-8d43-31c908aaa02e-0-0.inprogress.73dcc10b-44f4-47e3-abac-0c14bd59f9c9





Re: Flink 1.12.2 sql-cli fails writing to Hive with is_generic

2021-04-01 Thread HunterXHunter
Hi, that problem has been solved.
I'm now following the official example:


SET table.sql-dialect=default;

create table  flink_kafka(
sys_time bigint,
rt AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'xx',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '',
'properties.group.id' = 'test-sql',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);

SET table.sql-dialect=hive;

CREATE TABLE hive_table (
  sys_time bigint
) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);


INSERT INTO hive_table SELECT sys_time, DATE_FORMAT(rt, 'yyyy-MM-dd') as dt,
DATE_FORMAT(rt, 'HH') as hr FROM flink_kafka;

But no data ever gets written to Hive. The job reports no errors;
select * from flink_kafka; does return data,
yet hive_table stays empty.
I've sent data for various time ranges, so the watermark should have passed the partition times, but hive_table still has no data.






Flink 1.12.2 sql-cli fails writing to Hive with is_generic

2021-04-01 Thread HunterXHunter
With the HiveCatalog configured,
the SQL CLI can also see the Hive databases and tables.
I create a Kafka table:

create table test.test_kafka(
word VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'xx',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'xx',
'properties.group.id' = 'test',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
In Hive the table is indeed visible:
hive > DESCRIBE FORMATTED test_kafka
   ...
is_generic  true
   .

But when I run Flink SQL > select * from test.test_kafka;
it fails with:
org.apache.flink.table.api.ValidationException: Unsupported options found
for connector 'kafka'.
Unsupported options:
is_generic
Supported options:
connector
format
json.fail-on-missing-field
json.ignore-parse-errors






Re: Flink 1.11: after a tumbling window, take the first row ordered by event time descending for aggregation

2021-03-17 Thread HunterXHunter
GroupWindowAggregate does not support an update or delete data source.





Re: How to use processing time in an interval join

2021-03-17 Thread HunterXHunter
If you don't define a watermark, it seems to use processing time.





Re: Submitting two SQL jobs, one of them doesn't take effect

2021-03-11 Thread HunterXHunter
StatementSet inserts = tableEnv.createStatementSet();
inserts.addInsertSql("insert into xxx select * from xxx"); // topic1 -> topic2 job
inserts.addInsertSql("insert into xxx select * from xxx"); // topic2 -> Postgres job
inserts.execute();





Question: with state.backend.incremental enabled, the Checkpointed Data Size keeps growing

2021-03-11 Thread HunterXHunter
1: With state.backend.incremental enabled, the Checkpointed Data Size keeps growing.
I checkpoint every 10 minutes and it grows about 2 MB each time; after two days it reached 400 MB, although the actual state should only be about 20 MB (a single window computation; a savepoint is also only ~20 MB). TTL is configured.
2: With state.backend.incremental disabled, every checkpoint is around 20 MB and stops growing.

As I understand it, with state.backend.incremental enabled the Checkpointed Data Size should be the size of the increment, which ought to be just a few KB. Why does it keep growing?

Why is this happening?? Is it a bug??




Re: How to load a UDF from a remote jar in Flink SQL

2021-03-10 Thread HunterXHunter
Try createTemporarySystemFunction.
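A sketch of registering a UDF that way; note that fetching and class-loading the remote jar yourself (here via URLClassLoader, with a hypothetical URL and class name) is my assumption, not something Flink SQL does for you:

import java.net.URL;
import java.net.URLClassLoader;
import org.apache.flink.table.functions.UserDefinedFunction;

URLClassLoader loader = new URLClassLoader(
        new URL[]{new URL("http://repo.example.com/udfs/my-udf.jar")}, // hypothetical jar
        Thread.currentThread().getContextClassLoader());
Class<? extends UserDefinedFunction> udfClass =
        (Class<? extends UserDefinedFunction>) loader.loadClass("com.example.MyUdf");
tableEnv.createTemporarySystemFunction("my_udf", udfClass);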





Re: How to define a temporal table

2021-03-10 Thread HunterXHunter
Please fix the formatting; as posted it's too garbled to read.





Re: BUG: window computation not triggered after converting a DataStream to a Table

2021-03-10 Thread HunterXHunter
Tried 1.12.2; same problem.





Re: BUG: window computation not triggered after converting a DataStream to a Table

2021-03-10 Thread HunterXHunter
From what I can tell, though, it only seems to happen when the DataStream has a keyBy or a setParallelism applied.





Re: BUG: window computation not triggered after converting a DataStream to a Table

2021-03-10 Thread HunterXHunter
1.12.1





Re: BUG: window computation not triggered after converting a DataStream to a Table

2021-03-09 Thread HunterXHunter
Tried once more:
changing the parallelism doesn't help either, e.g.
.setParallelism(9)






Re: BUG: window computation not triggered after converting a DataStream to a Table

2021-03-09 Thread HunterXHunter
Verified once more: even if I drop the group by rowtime and instead keyBy the DataStream on rowtime, the problem still occurs.
For example:

tableEnv.createTemporaryView("test3",
        tableEnv.sqlQuery("select msg, rowtime from test"));
SingleOutputStreamOperator<Tuple2<String, Long>> r =
        tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
                .filter(x -> x.f0)
                .keyBy(x -> x.f1)
                // (map from Row to Tuple2<String, Long> elided)
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1));

The result: the window still never fires.





Re: How to integrate Flink jobs into your own system for easier management

2021-03-09 Thread HunterXHunter
You can take a look at this for reference:
https://github.com/zhp8341/flink-streaming-platform-web





BUG: window computation not triggered after converting a DataStream to a Table

2021-03-08 Thread HunterXHunter
1: When the DataStream was converted from a table that contains a group by rowtime, the window never fires.

For example:
tableEnv.createTemporaryView("test3",
        tableEnv.sqlQuery("select msg, rowtime from test group by msg, rowtime"));

// obtain the DataStream and define watermark generation
SingleOutputStreamOperator<Tuple2<String, Long>> r =
        tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
                .filter(x -> x.f0)
                // (map from Row to Tuple2<String, Long> elided)
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTimestamp) -> element.f1));

Following the official docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html

// convert the stream back to a Table, designating the rowtime attribute
tableEnv.createTemporaryView("test5",
        r,
        $("msg"),
        $("rowtime").rowtime());

String sql5 = "select " +
        "msg," +
        "count(1) cnt" +
        " from test5 " +
        " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
        "";
tableEnv.executeSql("insert into printlnRetractSink " + sql5);

Result: the window never fires.
Debugging the source at org.apache.flink.table.runtime.operators.window.WindowOperator, the watermark returned here is always -9223372036854775808:

public long getCurrentWatermark() {
    return internalTimerService.currentWatermark();
}

Looking at the running job, watermarks are generated normally, and InternalTimerServiceImpl.advanceWatermark does assign currentWatermark; yet internalTimerService.currentWatermark() still returns -9223372036854775808.

When the statement
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg, rowtime from test group by msg, rowtime"));
is changed to
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg, rowtime from test"));
the result is correct.

So is this a bug??










Re: How to use async I/O to join a dimension table in Flink SQL?

2021-03-03 Thread HunterXHunter
Define a lookup source table.
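A sketch of the SQL side (a JDBC lookup table joined with FOR SYSTEM_TIME AS OF; whether the lookup actually runs asynchronously depends on the connector, e.g. the HBase connector exposes a 'lookup.async' option):

CREATE TABLE dim_users (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/db',
  'table-name' = 'users',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);

SELECT o.order_id, u.user_name
FROM orders AS o
JOIN dim_users FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;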





Re: UDF invoked repeatedly

2021-03-02 Thread HunterXHunter
Why would four invocations be acceptable? Intuitively, executing it only once would be optimal.




Re: About Flink window state

2021-02-07 Thread HunterXHunter
The code you pasted is a mess...
You need to set the ValueState's lifecycle yourself in the RichJoinFunction; it is not destroyed together with the window. A window only cleans up the state it registered itself. If you get a chance, read the window source code; the state-cleanup logic is in there.
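A minimal sketch of giving such a ValueState its own lifecycle via state TTL (the names and the one-hour TTL are placeholders):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttl = StateTtlConfig
        .newBuilder(Time.hours(1)) // entries expire one hour after they are written
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .build();
ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("joinState", String.class);
desc.enableTimeToLive(ttl);
// in open() of the rich function: state = getRuntimeContext().getState(desc);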




Re: getCurrentWatermark

2021-02-04 Thread HunterXHunter
currentMaxTimestamp is only the maximum timestamp within the current stream, not necessarily the maximum overall.
With late data, or with multiple streams, lastEmittedWatermark is not necessarily smaller than currentMaxTimestamp.




Re: flink sql

2021-02-04 Thread HunterXHunter
I've actually built this...
I added a SQL syntax along these lines:

"select " +
"msg," +
"count(1) cnt" +
" from test" +
" where msg = 'hello' " +
" group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
" EMIT \n" +
"  WITH DELAY '10' SECOND BEFORE WATERMARK,\n" +
"  WITHOUT DELAY AFTER WATERMARK";

The window computation then fires every 10 seconds.
Modeled on Alibaba Cloud's EMIT syntax.





Re: StreamAPI Window with .evictor(TimeEvictor.of(Time.seconds(0))).sum().print throws a NullPointerException

2021-02-04 Thread HunterXHunter
I think this may be a bug (though it could also be deliberate design):
At EvictingWindowOperator.emitWindowContents():

userFunction.process(triggerContext.key, triggerContext.window,
        processContext, projectedContents, timestampedCollector);

when the evicted window contents (projectedContents) have size 0, execution reaches ReduceApplyWindowFunction:

public void apply(K k, W window, Iterable<T> input, Collector<R> out) throws Exception {
    T curr = null;
    for (T val : input) {
        if (curr == null) {
            curr = val;
        } else {
            curr = reduceFunction.reduce(curr, val);
        }
    }
    wrappedFunction.apply(k, window, Collections.singletonList(curr), out);
}

Here wrappedFunction.apply(k, window, Collections.singletonList(curr), out) produces a Collections.singletonList(null) result.
I think a check is needed here: if the input is empty, it should not emit a null result.






StreamAPI Window with .evictor(TimeEvictor.of(Time.seconds(0))).sum().print throws a NullPointerException

2021-02-03 Thread HunterXHunter
Code below. The evictor is configured to remove all data before the window fires, so sum should receive nothing; but while debugging I found that sum still emits a null into print, which triggers the NullPointerException. Not sure whether it's a bug or my mistake:

class A {
    String word;
    Long time;

    public A(String word, Long time) {
        this.word = word;
        this.time = time;
    }
}

streamEnv.fromElements(new A("a", 1L))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<A>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((element, recordTimestamp) -> element.time))
        .keyBy(x -> x.time)
        .map(x -> new Tuple2<>(x.word, 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy((KeySelector<Tuple2<String, Long>, String>) o -> o.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .evictor(TimeEvictor.of(Time.seconds(0)))
        .sum(1)
        .print();
streamEnv.execute();





Re: Flink window evictor(TimeEvictor.of(Time.seconds(0))) hits a NullPointerException

2021-02-03 Thread HunterXHunter
The code:
stream
    .keyBy((KeySelector<Tuple2<String, Long>, String>) o -> o.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(100)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(2)))
    .evictor(TimeEvictor.of(Time.seconds(0)))
    .sum(1)
    .print();

When all the data is evicted before the window computation, the sum result is a null, which is passed into print and causes the NullPointerException.





Flink window evictor(TimeEvictor.of(Time.seconds(0))) hits a NullPointerException

2021-02-03 Thread HunterXHunter
When the program uses
evictor(TimeEvictor.of(Time.seconds(0)))
to evict the data before the window fires, and all of it has been evicted, print throws a NullPointerException:

Caused by: java.lang.NullPointerException
at
org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73)
at
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81)


Why is a null passed on to the sink even though there is no data?





Re: Is there a way to get the partition and offset from the Flink SQL Kafka connector?

2021-01-20 Thread HunterXHunter
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);





Flink DDL SQL behaves differently in a test than in main

2021-01-20 Thread HunterXHunter
The same piece of code runs fine in main but exits immediately when run in a test:

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

bsTableEnv.executeSql(
        DDLSourceSQLManager.createStreamFromKafka("localhost:9092",
                "test", "test", "test", "json"));
bsTableEnv.executeSql(
        com.ddlsql.DDLSourceSQLManager.createDynamicPrintlnRetractSinkTbl("printlnRetractSink"));
bsTableEnv.executeSql("insert into printlnRetractSink select msg, count(*) as cnt from test group by msg");
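For what it's worth: executeSql submits the INSERT job asynchronously and returns immediately; a JUnit test tears the JVM down as soon as the method returns, while main may keep it alive. Blocking on the returned TableResult (available since Flink 1.12) is one way to keep the test running; a sketch:

import org.apache.flink.table.api.TableResult;

TableResult result = bsTableEnv.executeSql(
        "insert into printlnRetractSink select msg, count(*) as cnt from test group by msg");
// blocks until the job terminates; for an unbounded source this blocks forever,
// so a test may instead wait on result.getJobClient() for a bounded time
result.await();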





Flink CEP produces no output after keyBy

2021-01-13 Thread HunterXHunter
Pattern.<KafkaTopicOffsetTimeMsg>begin("start")
        .where(new SimpleCondition<KafkaTopicOffsetTimeMsg>() {
            @Override
            public boolean filter(KafkaTopicOffsetTimeMsg kafkaTopicOffsetTimeMsg) throws Exception {
                return kafkaTopicOffsetTimeMsg.msg().equals("start");
            }
        })
        .next("middle")
        .where(new SimpleCondition<KafkaTopicOffsetTimeMsg>() {
            @Override
            public boolean filter(KafkaTopicOffsetTimeMsg kafkaTopicOffsetTimeMsg) throws Exception {
                return kafkaTopicOffsetTimeMsg.msg().equals("middle");
            }
        });

With the next("middle") step present, the pattern produces no output on a KeyedStream, though it works on a plain DataStream.
With only the start step, the KeyedStream works as well.
My question: is this how CEP is designed (perhaps because after keyBy there is no guarantee that all events of one match chain land in the same task), or is it a problem in my code?








Re: Username/password authentication not supported when Flink SQL writes to ES

2020-12-16 Thread HunterXHunter
Yes, a certificate file has to be uploaded, and 1.12 doesn't seem to have a configuration option for uploading files.





Re: Username/password authentication not supported when Flink SQL writes to ES

2020-11-30 Thread HunterXHunter
Do you mean ES X-Pack authentication, where a certificate file has to be loaded?





Flink 1.11: program exits after table.executeInsert

2020-09-29 Thread HunterXHunter
When using the StreamTableEnvironment API:

Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092", "latest"), "topic,offset,msg");
tableEnv.createTemporaryView("test", a);
tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract"));
tableEnv.executeSql("insert into printlnSink_retract select topic, msg, count(*) as ll from test group by topic, msg");

the program ends and exits right away. If I append Thread.sleep(10000L) at the end, it consumes for 10 seconds; if I add tableEnv.execute("jobname") instead, it fails with:
No operators defined in streaming topology
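This matches executeSql being asynchronous: it submits the INSERT job and returns, so the driver exits. On Flink 1.11, which lacks TableResult.await() (added in 1.12), one way to block is waiting on the job client; a sketch:

import org.apache.flink.table.api.TableResult;

TableResult result = tableEnv.executeSql(
        "insert into printlnSink_retract select topic, msg, count(*) as ll from test group by topic, msg");
// block on the job's completion future; for an unbounded source this keeps
// the process alive instead of exiting immediately
result.getJobClient().get()
      .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
      .get();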


