Re: flinksql ttl不生效
建议关闭state.backend.incremental ,因为我遇到过,开启增量ckp导致ckp一直增大,关掉就正常了 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 【问题分析】Fink任务无限反压
掐头去尾的提问,完全不知道是什么问题,没法回答你,最好是贴出代码,贴出图片等大家才能帮忙分析 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink checkpoint 速度很慢 问题排查
一般是 Job A出现背压了,checkpoint的时候是要等背压的数据都处理完了才会处理barrier。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
那会一直增大下去吗,我跑了4天,ckp一直变大,没有稳定的迹象。是不是我需要调整compaction的配置 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?
关闭 增量checkpoint -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 维度表 处理时间
使用cdc的方式读取 mysql的更新数据,使用connect流的方式连接两个流,这样可以避免掉watermark才能触发计算的问题。建议在事实表join不到数据的时候 进行缓存用 onTime进行触发。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 请教,有哪些连接器能让我获得一个非 update/delete,但又有主键的流呢?
json格式改debezium-json试试 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql源表定义字段列表问题
不需要提供全部字段 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:flink sql怎样将change log stream转换成append log stream?
tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info")); tableEnv .toRetractStream(tableEnv.from("order_info"), Row.class) .filter((FilterFunction>) booleanRowTuple2 -> booleanRowTuple2.f0) .map((MapFunction, Row>) booleanRowTuple2 -> booleanRowTuple2.f1) .assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner(((element, recordTimestamp) -> System.currentTimeMillis( .keyBy((KeySelector) row -> row.getField("consignee").toString()) .window(TumblingEventTimeWindows.of(Time.seconds(100))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5))) .process(new ProcessWindowFunction, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable elements, Collector> out) throws Exception { Long count = 0L; for (Row element : elements) { count += 1; } out.collect(new Tuple2(context.window(), count)); } }) .print(); ; streamEnv.execute(); -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Exception: Could not perform checkpoint
看源码是在 catch里面的(应该是在executeCheckpointing的时候报错了,但是catch里面还有一个nullpoint没catch导致程序退出): if (LOG.isDebugEnabled()) { LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " + "Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, checkpointMetrics.getSyncDurationMillis()); } 的 checkpointMetrics.getAlignmentDurationNanos() / 1_000_000, 部分报错的。建议关掉 debug日志看看。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于union之后的keyBy问题
这部分没人报bug,所以社区一般是不会去验证的。所以你要给出你的代码,同时对问题描述尽量清晰,否则很难有人去自己写代码验证问题。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Table-api sql 预检查
tableEnv.explainSql(""); -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 背压问题
中间有错误数据或者其他错误原因,背压不会导致数据丢失 -- Sent from: http://apache-flink.147419.n8.nabble.com/
问题:flink 1.13编译 flink-parquet报错 -类重复:org.apache.flink.formats.parquet.generated.SimpleRecord
查看发现 org.apache.avro avro-maven-plugin ${avro.version} generate-sources schema ${project.basedir}/src/test/resources/avro ${project.basedir}/target/generated-test-sources/ 这个编译插件的问题 同 flink 1.12 (编译没问题)比较发现: 区别: ${project.basedir}/src/test/java/ 当我把flink 1.13/ master 改成这个就可以编译通过了。 我想知道有人遇到跟我一样的问题吗。是我环境的问题吗?? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: DataStreamAPI 与flink sql疑问
你试过吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 疑问:当开启state.backend.incremental 后 Checkpointed Data Size 会不断变大
没解决,我只能把它关闭了 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 設置look up table source
理论上只要实现了LookupTableSource。你在 TableFunction 里面怎么重写 eval 都可以,不管你是要读取哪里的数据怎么读。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes
toAppendDataStream试试看 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于Flink SQL中Interval Join使用时watermark的疑惑
问题一: Interval Join doesn't support consuming update and delete changes是因为输入A或者B是一个更新流 问题二:interval join使用buffer来缓存A和B的数据,没有放在state里,只有在watermark超过下边界会触发onEventtime清理 buffer。 延迟问题:没有类似statettl的配置,interval join不受statettl控制清除状态 乱序问题:如果 B的数据时间小于 watermark则匹配不到,一切是跟watermmark相关 以上个人看源码理解的。希望有帮助 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink CEP事件查询,如何在flink运行中动态更新cep规则
可能需要修改源码: https://blog.csdn.net/young_0609/article/details/110407781 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
但是用process-time是有数据的,目前用partition-time一直没成功写出过数据 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink CEP 使用Keyby之后无法输出结果
这不是一个问题,是我理解的问题。感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于Flink SQL中Interval Join使用时watermark的疑惑
1: Interval Join doesn't support consuming update and delete changes 是因为A或B是一个update stream 2: Interval Join 的临时数据是放在buffer中,当wtm超过边界时间就会清理掉 buffer也就join不到了。所以 statettl无法控制A流的缓存数据。 延迟问题:所以如果wtm不更新,A流的数据不会被清理因为不受statettl控制 乱序问题:如果B流的旧时间小于 watermark就join不上 以上是个人理解、、 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: BUG :DataStream 转 Table 后无法 触发窗口计算
有人知道这个bug吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
1.12默认是 eventtime不需要设置 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 求问Hive DDL TBLPROPERTIES不生效
你好, 1:我设置的时候就是 使用的 partition-time 同时 设定checkpoint间隔为60s。但是我发现watermark一直没有生成或者更新,导致我的数据一直无法commit。想知道 为什么watermark无法生成。当时使用process-time是没问题的。 2:因为写hive的话会有小文件的问题。所以我使用file sink来设置合并文件和控制文件大小。但是写文件是无法写hive metastore。所以hive查不出数据。 想知道有什么方法解决hive小文件问题,难道只能T+1做小文件合并吗。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-sql 客户端采用execution.target: yarn-per-job 模式,如何指定提交的队列??
flink run -yD yarn.application.queue=x -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
重要: 在流模式下使用 FileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。 在官方文档找到了这个,必须要有checkpoint才行,但是我 手动savepoint之后,虽然有sucess文件,但是没有数据 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
查看hdfs文件: 分区一直是这样的一个文件,没有生成 _SUCCESS文件 .part-40a2c94d-0437-4666-8d43-31c908aaa02e-0-0.inprogress.73dcc10b-44f4-47e3-abac-0c14bd59f9c9 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.12.2 sql-cli 写入Hive报错 is_generic
你好,这个问题已经解决了。 我现在通过官方例子: SET table.sql-dialect=default; create table flink_kafka( sys_time bigint, rt AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, '-MM-dd HH:mm:ss')), WATERMARK FOR rt AS rt - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'test-sql', 'format' = 'json', 'json.ignore-parse-errors' = 'true' ); SET table.sql-dialect=hive; CREATE TABLE hive_table ( sys_time bigint ) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='process-time', 'sink.partition-commit.delay'='0s', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO hive_table SELECT sys_time, DATE_FORMAT(rt, '-MM-dd') as dt, DATE_FORMAT(rt, 'HH') as hr FROM flink_kafka; 发现数据一直无法写入hive。程序没有报错, select * from flink_kafka;是有数据的。 但是hive_table一直没有数据, 我发送各个时间段的数据,watermark应该也是超过了分区时间的,但是hive_table一直没有数据 -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink 1.12.2 sql-cli 写入Hive报错 is_generic
当配置好HiveCatalog后, SQL-Cli 也可以查到hive库表信息 创建kafka表: create table test.test_kafka( word VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'xx', 'properties.group.id' = 'test', 'format' = 'json', 'json.ignore-parse-errors' = 'true' ); 在 Hive里面可以查到改表 hive > DESCRIBE FORMATTED test_kafka ... is_generic true . 但是我在 Flink SQL > select * from test.test_kafka; 报错: org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'kafka'. Unsupported options: is_generic Supported options: connector format json.fail-on-missing-field json.ignore-parse-errors -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink1.11如何实现Tumble Window后基于event time倒序取第一条作统计
GroupWindowAggregate不支持update或者delete的datasource。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: interval join 如何用 process time
你不定义watermark好像用的就是pro time -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 提交两个SQL任务,其中一个不生效。
StatementSet inserts = tableEnv.createStatementSet(); inserts.addInsertSql("insert into xxx select * from xxx") // topic1 -》topic2任务 inserts.addInsertSql("insert into xxx select * from xxx") // topic2 -》Postgre 任务 inserts.execute(); -- Sent from: http://apache-flink.147419.n8.nabble.com/
疑问:当开启state.backend.incremental 后 Checkpointed Data Size 会不断变大
1:当开启state.backend.incremental 后 Checkpointed Data Size 会不断变大 我10分钟一次checkpoint,每次都增大2M,两天增大到400M,但其实我的实际应该只有20M(只做一个窗口计算)(我做savepoint之后也才20M)。 已设置了 ttl。 2:当我关闭state.backend.incremental 后 。每次checkpoint也就20M左右,不会变大了。 按我的理解:state.backend.incremental 开启后,Checkpointed Data Size大小应该是增量的大小,可能也就几k左右。为什么会一直增大。 想问下这是为什么??是bug?? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql如何从远程加载jar包中的udf
通过 createTemporarySystemFunction 试试看呢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 如何定义时态表
把格式调整下,很乱看不明白 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: BUG :DataStream 转 Table 后无法 触发窗口计算
试了 1.12.2,还是一样问题。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: BUG :DataStream 转 Table 后无法 触发窗口计算
但是看情况好像是只有在:DataStream发生Keyby或者 setParallelism的时候才会发生 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: BUG :DataStream 转 Table 后无法 触发窗口计算
1.12.1 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: BUG :DataStream 转 Table 后无法 触发窗口计算
再试了一下: 修改并行度也不行 .setParallelism(9) -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: BUG :DataStream 转 Table 后无法 触发窗口计算
经过再一次验证: 即使我做group by rowtime的操作, 我对datastream做keyby(rowtime) 也有这个问题 例如: tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test ")); SingleOutputStreamOperator r = tableEnv.toRetractStream(tableEnv.from("test3"), Row.class) .filter(x -> x.f0) .keyby(_.f1) .returns(Types.TUPLE(Types.STRING, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1)) .withTimestampAssigner(((element, recordTimestamp) -> element.f1)) ); 结果也是无法触发窗口 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Job 如何集成到自己的系统,方便管理
https://github.com/zhp8341/flink-streaming-platform-web 这个你可以参考下 -- Sent from: http://apache-flink.147419.n8.nabble.com/
BUG :DataStream 转 Table 后无法 触发窗口计算
1:当DataStream是由 一个table 经过 group by rowtime 转换过来的就无法触发窗口 例如: tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime")); // 获得 DataStream,并定义wtm生成 SingleOutputStreamOperator r = tableEnv.toRetractStream(tableEnv.from("test3"), Row.class) .filter(x -> x.f0) // map .returns(Types.TUPLE(Types.STRING, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1)) .withTimestampAssigner(((element, recordTimestamp) -> element.f1)) ); 参考 官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html // stream - 转 Table,指定Rowtime tableEnv.createTemporaryView("test5", r, $("msg"), $("rowtime").rowtime()); String sql5 = "select " + "msg," + "count(1) cnt" + " from test5 " + " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " + ""; tableEnv.executeSql("insert into printlnRetractSink " + sql5); 结果: 无法触发窗口操作。 查调试源码: org.apache.flink.table.runtime.operators.window.WindowOperator // 返回的wtm永远都是 -9223372036854775808 public long getCurrentWatermark() { return internalTimerService.currentWatermark(); } // 查看任务,watermark是正常在生成的。InternalTimerServiceImpl.advanceWatermark是正常为currentWatermark赋值。但是 internalTimerService.currentWatermark() 却拿的是-9223372036854775808 // 当 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime")); 语句改为 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test")); 结果就是正确的。 所以这是一个bug吗?? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql中如何使用异步io关联维表?
定义一个 sourcetable -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: UDF 重复调用的问题、
为什么4次是没问题的,感觉只执行一次才是最优的 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于flink窗口state
你这代码贴的乱七八糟。。。 你需要再richjoinfunction里面设置valuestate的生命周期,他不随着窗口而销毁,窗口只会销毁自己设定的state,有空你可以看看window的源码,里面有清理state的逻辑 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: getCurrentWatermark
currentMaxTimestamp 只是当前数据流里面最大,但不一定是全部的最大。 当数据出现延迟,或者多流的情况下,lastEmittedWatermark 不一定会比 currentMaxTimestamp 小 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql
我做了。。 添加了一个sql语法类似 "select " + "msg," + "count(1) cnt" + " from test" + " where msg = 'hello' " + " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " + " EMIT \n" + " WITH DELAY '10' SECOND BEFORE WATERMARK,\n" + " WITHOUT DELAY AFTER WATERMARK"; 每10s触发一次窗口计算。 参考阿里云的Emit。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: StreamAPi Window 在使用 .evictor(TimeEvictor.of(Time.seconds(0))).sum().print 报NullPoint
我认为这可能是一个bug (当然也可能是故意这样设计的): 在 EvictingWindowOperator.emitWindowContents()位置: userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector); 当timestampedCollector的size = 0时; 执行到 ReduceApplyWindowFunction部分: public void apply(K k, W window, Iterable input, Collector out) throws Exception { T curr = null; for (T val: input) { if (curr == null) { curr = val; } else { curr = reduceFunction.reduce(curr, val); } } wrappedFunction.apply(k, window, Collections.singletonList(curr), out); } wrappedFunction.apply(k, window, Collections.singletonList(curr), out);将会产生一个Collections.singletonList(null)结果。 我认为这里应该需要判断一下, 既然input进来是空的,就不应该输出一个null结果 -- Sent from: http://apache-flink.147419.n8.nabble.com/
StreamAPi Window 在使用 .evictor(TimeEvictor.of(Time.seconds(0))).sum().print 报NullPoint
代码如下: evictor设置的在窗口触发前清理所有数据,按理进入sum是没有数据,但是调试的时候发现,sum经过计算会输出 null 进入 print,导致报 Nullpoint。不知道是bug还是我的问题; class A { String word; Long time; public A(String word, Long time) { this.word = word; this.time = time; } }; streamEnv.fromElements(new A("a", 1L)) .assignTimestampsAndWatermarks( WatermarkStrategy. forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner(((element, recordTimestamp) -> element.time)) ) .keyBy(x -> x.time) .map(x -> new Tuple2<>(x.word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy((KeySelector, String>) o -> o.f0) .window(TumblingEventTimeWindows.of(Time.seconds(20))) .evictor(TimeEvictor.of(Time.seconds(0))) .sum(1) .print(); streamEnv.execute(); -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink window evictor(TimeEvictor.of(Time.seconds(0))) 会出现 NullPoint问题
代码如下: stream .keyBy((KeySelector, String>) o -> o.f0) .window(TumblingEventTimeWindows.of(Time.seconds(100))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(2))) .evictor(TimeEvictor.of(Time.seconds(0))) .sum(1) .print(); 当 数据在窗口计算前被全部清除时,sum结果会是一个null,会传入print,导致 nullpoint -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink window evictor(TimeEvictor.of(Time.seconds(0))) 会出现 NullPoint问题
当程序使用 evictor(TimeEvictor.of(Time.seconds(0))) 来清除 窗口触发前数据时,当数据全部被清除了,在print时会报Null point Caused by: java.lang.NullPointerException at org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73) at org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81) 为什么没数据也会传一个 null 到 Sink? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL kafka connector有办法获取到partition、offset信息嘛?
CREATE TABLE KafkaTable ( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink ddl sql 在 Test和在Main里面执行结果不同
同一段代码,在main里面可以正常正常,在Test里面却直接结束 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsTableEnv.executeSql( DDLSourceSQLManager.createStreamFromKafka("localhost:9092", "test", "test", "test", "json")); bsTableEnv.executeSql(com.ddlsql.DDLSourceSQLManager.createDynamicPrintlnRetractSinkTbl("printlnRetractSink")); bsTableEnv.executeSql("insert into printlnRetractSink select msg,count(*) as cnt from test group by msg"); -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink CEP 使用Keyby之后无法输出结果
Pattern.begin("start") .where(new SimpleCondition() { @Override public boolean filter(KafkaTopicOffsetTimeMsg kafkaTopicOffsetTimeMsg) throws Exception { return kafkaTopicOffsetTimeMsg.msg().equals("start"); } }) .next("middle") .where(new SimpleCondition() { @Override public boolean filter(KafkaTopicOffsetTimeMsg kafkaTopicOffsetTimeMsg) throws Exception { return kafkaTopicOffsetTimeMsg.msg().equals("middle"); } }) 当我有next的时候 使用 KeyedStream 无法生效,使用DataStream 可以。 但是如果只有一个start的话,KeyedStream就可以生效了。 请教下: 这是 CEP本身设计的(可能是keyby之后无法保证有一个链路的数据会在一个task),还是我代码的问题? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql es写入时,用户名密码认证不支持
是的,需要上传certificate文件,1.12好像没有上传文件的配置 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql es写入时,用户名密码认证不支持
你说的是es的 xpack 认证吗,需要你载入certificate文件是吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink 1.11 table.executeInsert 程序退出
当我在使用 StreamTableEnvironment Api的时候; Table a = getStreamTable(getKafkaDataStream("test", "localhost:9092", "latest"),"topic,offset,msg"); tableEnv.createTemporaryView("test", a); tableEnv.executeSql(DDLSourceSQLManager.createCustomPrintlnRetractSinkTbl("printlnSink_retract")); tableEnv.executeSql("insert into printlnSink_retract select topic,msg,count(*) as ll from test group by topic,msg"); 程序直接结束退出,但如果最后加Thread.sleep(1L) 就可以消费10s钟,如果加 tableEnv.execute("jobname"); 报错:No operators defined in streaming topology -- Sent from: http://apache-flink.147419.n8.nabble.com/