Re: FLINK Invocation error

2021-04-24 Thread Yang Wang
I think the root cause might be a firewall between your local machine and the
YARN cluster.
It seems that your local machine is only allowed to access some specific
ports on the YARN cluster (e.g. the YARN ResourceManager port).

You could verify this by telnetting the running JobManager address
(10.0.11.57:46637) from your local machine; if a firewall is the cause, the connection will be refused.

Best,
Yang
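
(In case telnet is not available, a minimal Java sketch of the same check; the address and port are simply the ones quoted in this thread, and the timeout is arbitrary:)

```java
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    public static void main(String[] args) {
        String host = "10.0.11.57"; // JobManager address from this thread
        int port = 46637;
        try (Socket socket = new Socket()) {
            // A blocking firewall typically shows up here as "connection refused" or a timeout.
            socket.connect(new InetSocketAddress(host, port), 3000);
            System.out.println(host + ":" + port + " is reachable");
        } catch (Exception e) {
            System.out.println("could not reach " + host + ":" + port + " -> " + e);
        }
    }
}
```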

Vijayendra Yadav wrote on Sat, Apr 24, 2021 at 8:05 AM:

> Hi Team,
>
> While restarting the Flink application from a CHECKPOINT, we face the following
> error (intermittently). It does not prevent the job from being submitted and does
> not affect functionality, but I am still wondering what the reason and the solution could be.
>
> *RUN Command:*
>
> /usr/lib/flink/bin/flink run \
>   -s s3://bucket-app/flink/checkpoint/app/0c9be9b65962e068b6b138ed81f7ae14/chk-13229/ \
>   -c com.comp.App \
>   -m yarn-cluster \
>   -yjm 4096m \
>   -ytm 6144m \
>   -ynm flink-app \
>   -yt ${app_install_path}/conf \
>   ${app_install_path}/*.jar \
>   --conffile ${app_install_path}/application.properties \
>   --app App
>
>
> *ERROR Messages:*
>
> *Job has been submitted with JobID e510e34928101ed53cb08df6d3d29f69*
> 13:00:35.488 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error
> while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_265]
> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_265]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
> [hadoop-common-2.10.0-amzn-0.jar:?]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> ~[?:1.8.0_265]
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> ~[?:1.8.0_265]
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> com.directv.dis.DisFlinkService.execute(DisFlinkService.java:73) ~[?:?]
> at
> com.directv.dis.DisFlinkEmrApplication.main(DisFlinkEmrApplication.java:38)
> ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_265]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_265]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_265]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> ... 11 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> at
> 


Flink????????????????

2021-04-24 Thread Natasha
hi, all
[question text lost to encoding damage in the archive]
tableEnv.createTemporaryView("sensor", sensorTable);
val resultSqlTable = tableEnv.sqlQuery("select country, count(order_id) as cnt from sensor group by country");
tableEnv.toRetractStream[WaterSensorCnt](resultSqlTable).print("result");


Input read from the socket:
001 usa
002 usa
003 china
002 china
004 usa


First output:
usa, 2
china, 2


Then:
usa, 3
china, 2


[remainder of the question lost to encoding damage]


thanks
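
(The question text above is largely lost to encoding damage, but the example shows an unbounded GROUP BY whose counts change as rows arrive. As a hedged Java sketch of how such updates surface through toRetractStream — the class name and source below are illustrative, not the original poster's code:)

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Illustrative input, mirroring the (order_id, country) records above.
        DataStream<Tuple2<String, String>> orders = env.fromElements(
                Tuple2.of("001", "usa"), Tuple2.of("002", "usa"), Tuple2.of("003", "china"),
                Tuple2.of("002", "china"), Tuple2.of("004", "usa"));

        tableEnv.createTemporaryView("sensor", orders, $("order_id"), $("country"));

        Table result = tableEnv.sqlQuery(
                "SELECT country, COUNT(order_id) AS cnt FROM sensor GROUP BY country");

        // A non-windowed GROUP BY produces an updating result: the first row per country is an
        // insert (flag=true); each later row retracts the old count (false) and adds the new one (true).
        tableEnv.toRetractStream(result, Row.class).print("result");

        env.execute("retract stream sketch");
    }
}
```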


when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-24 Thread Tony Wei
Hi Experts,

I recently tried to run yarn-application mode on my yarn cluster, and I had
a problem related to configuring `execution.target`.
After reading the source code and doing some experiments, I found that
there should be some room of improvement for `FlinkYarnSessionCli` or
`AbstractYarnCli`.

My experiments were:

   1. setting `execution.target: yarn-application` in flink-conf.yaml and
      running `flink run-application -t yarn-application`: the job ran successfully.
      1. `FlinkYarnSessionCli` is not active
      2. `GenericCLI` is active
   2. setting `execution.target: yarn-per-job` in flink-conf.yaml and
      running `flink run-application -t yarn-application`: the job failed.
      1. failed due to `ClusterDeploymentException` [1]
      2. `FlinkYarnSessionCli` is active
   3. setting `execution.target: yarn-application` in flink-conf.yaml and
      running `flink run -t yarn-per-job`: the job ran successfully.
      1. `FlinkYarnSessionCli` is not active
      2. `GenericCLI` is active
   4. setting `execution.target: yarn-per-job` in flink-conf.yaml and
      running `flink run -t yarn-per-job`: the job ran successfully.
      1. `FlinkYarnSessionCli` is active

From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive` [3],
`FlinkYarnSessionCli` will be active when `execution.target` is specified
as `yarn-per-job` or `yarn-session`.

According to the official Flink documentation [4], I thought the 2nd experiment
should also work, but it didn't.

> The --target will overwrite the execution.target specified
> in the config/flink-config.yaml.


The root cause is that `FlinkYarnSessionCli` only overwrites `execution.target`
with `yarn-session` or `yarn-per-job` [5], but not with `yarn-application`.
So, my questions are:

   1. should we use `FlinkYarnSessionCli` in case 2?
   2. if we should, how can we improve `FlinkYarnSessionCli` so that
      `execution.target` can be overwritten via `--target`?

And one more improvement: the config description for `execution.target` [6]
should include `yarn-application` as well.

[1]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
[2]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
[3]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
[4]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
[5]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L397-L413
[6]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46

best regards,


Re: Incompleteness and imprecision in the official documentation and examples

2021-04-24 Thread Shengkai Fang
Hi, xuefli.

Thank you very much for pointing out the problems in the documentation!

Since reading code in an email is rather hard (no syntax highlighting, plus formatting issues), I only skimmed the code.

When the input source is "reading data once from an in-memory List", onTimer is not triggered. In the actual example I see that processing time is used, with the timer firing after a 3 s delay. I suspect the data volume is so small that the program finishes before the timer gets a chance to fire; I suggest switching to event time and trying that case.

Best,
Shengkai
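
(As a hedged sketch of this suggestion, assuming the Flink 1.12 WatermarkStrategy API: with event time, a bounded input emits a final watermark when it finishes, so pending event-time timers still fire. All names below are illustrative:)

```java
import java.time.Duration;
import java.util.Arrays;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EventTimeTimerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> source = env
                .fromCollection(Arrays.asList(Tuple2.of("k1", 1), Tuple2.of("k2", 2), Tuple2.of("k1", 10)))
                // Assign event-time timestamps; here the wall clock stands in for a real timestamp field.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((element, recordTs) -> System.currentTimeMillis()));

        source
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                })
                .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) {
                        // Register an event-time timer 3 s after the element's timestamp.
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
                    }
                })
                .print();

        env.execute("event-time timer sketch");
    }
}
```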

xue...@outlook.com wrote on Sun, Apr 25, 2021 at 9:42 AM:

> A Flink 1.10 cluster, using HDFS as the state backend.
>
> From the earliest Flink versions up to Flink 1.12, some documentation and examples have been incomplete; put differently, the same code produces different results depending on the input source.
>
> For example, the sample at the following link:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html
>
> If the input source is, respectively:
>
> 1. reading data once from an in-memory List
>
> 2. reading data once from a file directory
>
> 3. continuously reading data from a file directory
>
> 4. continuously reading from a socket stream
>
>
> Of the four above, only 3 and 4 cause onTimer to be invoked when ValueState is used in a KeyedStream's process(...); with 1 and 2 it is never triggered.
>
> I believe other operators have similar problems.
>
> The concrete code is as follows:
> ```java
>
> package com.xxx.data.stream;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.util.Collector;
>
> import javax.annotation.Nullable;
> import java.text.SimpleDateFormat;
> import java.time.LocalDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
>
> public class KeyedStreamJob {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> env.setParallelism(4);
>
> //1.从内存获取数据
> Tuple2 item = null;
> List> items = new ArrayList<>();
> item = new Tuple2<>("k1", 1);
> items.add(item);
> item = new Tuple2<>("k3", 3);
> items.add(item);
> item = new Tuple2<>("k1", 10);
> items.add(item);
> item = new Tuple2<>("k2", 2);
> items.add(item);
> item = new Tuple2<>("k1", 100);
> items.add(item);
> item = new Tuple2<>("k2", 20);
> items.add(item);
> DataStreamSource> streamSource =
> env.fromCollection(items);
> SingleOutputStreamOperator> listStream =
> streamSource.assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks>() {
> @Nullable
> @Override
> public Watermark getCurrentWatermark() {
> return null;
> }
>
> @Override
> public long extractTimestamp(Tuple2 element,
> long previousElementTimestamp) {
> System.out.println("---");
> return System.currentTimeMillis();
> }
> });
>
> //2.从文件夹一次性获取数据
> SingleOutputStreamOperator> fileStream =
> env.readTextFile("D:\\data", "UTF-8").map(new MapFunction Tuple2>() {
> @Override
> public Tuple2 map(String value) throws
> Exception {
> return new Tuple2<>(value, 1);
> }
> })
> .assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks>() {
> @Nullable
> @Override
> public Watermark getCurrentWatermark() {
> return null;
> }
>
> @Override
> public long extractTimestamp(Tuple2
> element, long previousElementTimestamp) {
> return System.currentTimeMillis();
> }
> });
>
> //3.从文件夹持续获取数据
> 

Re: A question about the upsert-kafka connector

2021-04-24 Thread Shengkai Fang
Here is a complete analysis and walkthrough of upsert-kafka: "深度解析 Flink upsert-kafka" (an in-depth analysis of Flink upsert-kafka) [1]. If you still have questions, feel free to keep asking.

[1]https://flink-learning.org.cn/developers/flink-training-course3/
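
(A hedged sketch of an upsert-kafka table as discussed in this thread, using the Flink 1.12 DDL; the topic, bootstrap servers, and formats below are placeholders:)

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The PRIMARY KEY becomes the Kafka message key, so (with the default partitioner)
        // all updates for the same key land in the same partition and are read back in
        // per-partition order; the latest record per key wins in the materialized view.
        tableEnv.executeSql(
                "CREATE TABLE user_country_cnt (" +
                "  country STRING," +
                "  cnt BIGINT," +
                "  PRIMARY KEY (country) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'country-cnt'," +                        // placeholder topic
                "  'properties.bootstrap.servers' = 'kafka:9092'," +  // placeholder address
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");
    }
}
```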

Shengkai Fang wrote on Sun, Apr 25, 2021 at 10:16 AM:

> Essentially, upsert-kafka is a wrapper around Kafka: internally it is still a message queue; we just build a view over it at consumption time.
>
> Messages flowing from Flink into Kafka are guaranteed at-least-once by the Kafka protocol.
>
> Best,
> Shengkai
>
> op <520075...@qq.com> wrote on Fri, Apr 23, 2021 at 2:18 PM:
>
>>
>> Thanks. As a sink, upsert-kafka can guarantee that records with the same key land in the same partition. Suppose that for updates to the same key, the later value A reaches Kafka before the earlier value B because of network delays or the like;
>> when upsert-kafka is then used to consume the data and update this key, after A has been received and applied, will B overwrite A when it arrives?
>>
>>
>>
>>
>> ------ Original message ------
>> From: "user-zh" <fskm...@gmail.com>
>> Sent: Friday, Apr 23, 2021, 12:20 PM
>> To: "user-zh"
>> Subject: Re: A question about the upsert-kafka connector
>>
>>
>>
>> If the data is already stored in order in upsert-kafka (records with the same key are in the same partition), then Flink can preserve that order when consuming.
>>
>> Best,
>> Shengkai
>
>


Re: A question about the upsert-kafka connector

2021-04-24 Thread Shengkai Fang
Essentially, upsert-kafka is a wrapper around Kafka: internally it is still a message queue; we just build a view over it at consumption time.

Messages flowing from Flink into Kafka are guaranteed at-least-once by the Kafka protocol.

Best,
Shengkai

op <520075...@qq.com> wrote on Fri, Apr 23, 2021 at 2:18 PM:

>
> Thanks. As a sink, upsert-kafka can guarantee that records with the same key land in the same partition. Suppose that for updates to the same key, the later value A reaches Kafka before the earlier value B because of network delays or the like;
> when upsert-kafka is then used to consume the data and update this key, after A has been received and applied, will B overwrite A when it arrives?
>
>
>
>
> ------ Original message ------
> From: "user-zh" <fskm...@gmail.com>
> Sent: Friday, Apr 23, 2021, 12:20 PM
> To: "user-zh"
> Subject: Re: A question about the upsert-kafka connector
>
>
>
> If the data is already stored in order in upsert-kafka (records with the same key are in the same partition), then Flink can preserve that order when consuming.
>
> Best,
> Shengkai


Re: Writing to Avro from pyflink

2021-04-24 Thread Dian Fu
I guess you only need file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar. Could
you remove flink-avro-1.12.2.jar and avro-1.10.2.jar and try again?

Regards,
Dian

> On Apr 24, 2021, at 8:29 AM, Edward Yang wrote:
> 
> I've been trying to write to the Avro format with PyFlink 1.12.2 on Ubuntu. I've
> tested my code with an iterator writing to CSV and everything works as expected.
> Reading through the Flink documentation, I see that I should add jar
> dependencies to work with Avro. I downloaded three jar files that I believe
> are required for Avro, like so:
> 
> table_env\
> .get_config()\
> .get_configuration()\
> .set_string(
> "pipeline.jars", 
> 
> rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
> )
> 
> I suspect I'm not loading the jar files correctly, but it's unclear what I'm
> supposed to do, as I'm not familiar with Java. When I switch the sink
> format to Avro I get some unexpected errors:
> Py4JJavaError: An error occurred while calling o746.executeInsert.
> : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
>   at 
> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
>   at 
> org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
>   at 
> org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
>   at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>   at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at 
> 

Incompleteness and imprecision in the official documentation and examples

2021-04-24 Thread xue...@outlook.com
A Flink 1.10 cluster, using HDFS as the state backend.

From the earliest Flink versions up to Flink 1.12, some documentation and examples have been incomplete; put differently, the same code produces different results depending on the input source.

For example, the sample at the following link:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html

If the input source is, respectively:

1. reading data once from an in-memory List

2. reading data once from a file directory

3. continuously reading data from a file directory

4. continuously reading from a socket stream

Of the four above, only 3 and 4 cause onTimer to be invoked when ValueState is used in a KeyedStream's process(...); with 1 and 2 it is never triggered.

I believe other operators have similar problems.

The concrete code is as follows:
```java

package com.xxx.data.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class KeyedStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(4);

        // 1. Read data from memory
        Tuple2<String, Integer> item = null;
        List<Tuple2<String, Integer>> items = new ArrayList<>();
        item = new Tuple2<>("k1", 1);
        items.add(item);
        item = new Tuple2<>("k3", 3);
        items.add(item);
        item = new Tuple2<>("k1", 10);
        items.add(item);
        item = new Tuple2<>("k2", 2);
        items.add(item);
        item = new Tuple2<>("k1", 100);
        items.add(item);
        item = new Tuple2<>("k2", 20);
        items.add(item);
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items);
        SingleOutputStreamOperator<Tuple2<String, Integer>> listStream =
                streamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        System.out.println("---");
                        return System.currentTimeMillis();
                    }
                });

        // 2. Read data from a directory, once
        SingleOutputStreamOperator<Tuple2<String, Integer>> fileStream =
                env.readTextFile("D:\\data", "UTF-8").map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return null;
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        return System.currentTimeMillis();
                    }
                });

        // 3. Continuously read data from a directory
        TypeInformation<String> typeInformation = BasicTypeInfo.STRING_TYPE_INFO;
        TextInputFormat format = new TextInputFormat(new Path("D:\\data"));
        format.setCharsetName("UTF-8");
        // whether to enumerate nested files recursively
        format.setNestedFileEnumeration(true);
        SingleOutputStreamOperator<Tuple2<String, Integer>> continuefileStream =
                env.readFile(format, "D:\\data", FileProcessingMode.PROCESS_CONTINUOUSLY, 6000L, typeInformation)
                        .map(new MapFunction<String, Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> map(String value) throws Exception {

flink cep and out of order events

2021-04-24 Thread Jin Yi
Does the default `within` behavior of Flink CEP handle out-of-order events
(relative to event time)? Obviously, it would be best if the event time were
guaranteed correct, but sometimes that's too difficult to do.

Do people end up writing different patterns, with some event orderings
reversed, to capture these out-of-order events, and push
unmatched/partial/timed-out events off the main pattern matching through
side channels?

Or is it better to invest in some pre-processing to try to guarantee ordering?
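
(There is no reply recorded here, but as a hedged sketch of the usual approach, assuming Flink 1.12 APIs and made-up event/field names: run CEP on event time with a bounded-out-of-orderness watermark. In event-time mode CEP buffers and sorts events up to the watermark before matching, so out-of-order events within the bound are handled, while events arriving later than the watermark are dropped by default:)

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CepOutOfOrderSketch {

    // Illustrative event shape.
    public static class Event {
        public String userId;
        public String type;
        public long timestampMillis;
    }

    public static DataStream<Event> withWatermarks(DataStream<Event> events) {
        // Tolerate events arriving up to 30 s out of order relative to event time.
        return events.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                        .withTimestampAssigner((event, recordTs) -> event.timestampMillis));
    }

    public static PatternStream<Event> pattern(DataStream<Event> events) {
        Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return "login".equals(e.type);
                    }
                })
                .next("second")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return "purchase".equals(e.type);
                    }
                })
                .within(Time.minutes(10));

        return CEP.pattern(
                events.keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event e) {
                        return e.userId;
                    }
                }),
                pattern);
    }
}
```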


Re: NoSuchMethorError when using chill-protobuf

2021-04-24 Thread Prashant Deva
Ah, it seems that one needs to use chill 0.7.6 in order to get it to work
with Flink.

On Sat, Apr 24, 2021 at 1:18 PM Prashant Deva  wrote:

> I am trying to use chill-protobuf to serialize protobuf messages.
> However, I am getting this exception:
>
>
> Caused by: java.lang.NoSuchMethodError: 'boolean
> com.twitter.chill.java.Java8ClosureRegistrar.areOnJava8()'
> at com.twitter.chill.KryoBase.isJavaLambda(KryoBase.scala:47)
> at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:50)
> at com.esotericsoftware.kryo.Kryo.getSerializer(Kryo.java:476)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:260)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
>
> Here are my dependencies. They seem fine according to the docs.
> Am I doing something wrong? How do I fix this?
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_2.12</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_2.12</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka_2.12</artifactId>
>     <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>com.google.protobuf</groupId>
>     <artifactId>protobuf-java</artifactId>
>     <version>3.15.8</version>
> </dependency>
>
> <dependency>
>     <groupId>com.twitter</groupId>
>     <artifactId>chill-protobuf</artifactId>
>     <version>0.9.5</version>
>     <exclusions>
>         <exclusion>
>             <groupId>com.esotericsoftware.kryo</groupId>
>             <artifactId>kryo</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
>
> Sent via Superhuman 
>
>


NoSuchMethorError when using chill-protobuf

2021-04-24 Thread Prashant Deva
I am trying to use chill-protobuf to serialize protobuf messages.

However, I am getting this exception:

Caused by: java.lang.NoSuchMethodError: 'boolean com.twitter.chill.java.Java8ClosureRegistrar.areOnJava8()'

at com.twitter.chill.KryoBase.isJavaLambda(KryoBase.scala:47)

at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:50)

at com.esotericsoftware.kryo.Kryo.getSerializer(Kryo.java:476)

at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)

at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:260)

at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)

at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)

at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)

at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)

at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)

at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)

Here are my dependencies.

Am I doing something wrong? How do I fix this?



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.15.8</version>
</dependency>

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-protobuf</artifactId>
    <version>0.9.5</version>
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Sent via Superhuman ( https://sprh.mn/?vip=prash...@astradot.com )

Re: pojo warning when using auto generated protobuf class

2021-04-24 Thread Prashant Deva
So I did register the type with Kryo and the ProtobufSerializer. However, I am
still seeing the warnings. Is this a bug in Flink?

env.config.registerTypeWithKryoSerializer(Trace.APITrace::class.java, ProtobufSerializer::class.java)

val stream: DataStreamSource<Trace.APITrace> =
    env.addSource(FlinkKafkaConsumer(Config.TOPIC_SPANS, ApiTraceSchema(), props))

Sent via Superhuman ( https://sprh.mn/?vip=prash...@astradot.com )

On Sat, Apr 24, 2021 at 8:48 AM, Yun Gao <yungao...@aliyun.com> wrote:

>
> Hi Prashant,
>
> I think the warning is given when calling
>
> return TypeInformation.of(Trace.APITrace::class.java)
>
> Currently Flink does not have native support
> for protobuf types yet [1], so it uses a
> generic serializer created by Kryo.
>
> This should not affect the correctness of the program,
> only its performance. One possible solution is to
> register a custom serializer for protobuf classes into the
> Kryo serializer framework, like the example in [2].
>
> Best,
> Yun
>
> [1] https://issues.apache.org/jira/browse/FLINK-11333
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>
>
> 
>> ------ Original Mail ------
>> *Sender:* Prashant Deva <prash...@astradot.com>
>> *Send Date:* Sat Apr 24 11:00:17 2021
>> *Recipients:* User <user@flink.apache.org>
>> *Subject:* pojo warning when using auto generated protobuf class
>>
>>> I am seeing this warning message when trying to use a custom protobuf
>>> de/serializer with a Kafka source and an auto-generated Java protobuf class:
>>>
>>> 18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class
>>> com.xx.APITrace cannot be used as a POJO type because not all fields are
>>> valid POJO fields, and must be processed as GenericType. Please read the
>>> Flink documentation on "Data Types & Serialization" for details of the
>>> effect on performance.
>>>
>>> Here is my serializer. What am I doing wrong?
>>>
>>> class ApiTraceSchema : DeserializationSchema<Trace.APITrace>, SerializationSchema<Trace.APITrace> {
>>>
>>>     override fun getProducedType(): TypeInformation<Trace.APITrace> {
>>>         return TypeInformation.of(Trace.APITrace::class.java)
>>>     }
>>>
>>>     override fun deserialize(message: ByteArray): Trace.APITrace {
>>>         return Trace.APITrace.parseFrom(message)
>>>     }
>>>
>>>     override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
>>>         return false
>>>     }
>>>
>>>     override fun serialize(element: Trace.APITrace): ByteArray {
>>>         return element.toByteArray()
>>>     }
>>> }
>>>
>>
>>
>
>

Re: Approaches for external state for Flink

2021-04-24 Thread Swagat Mishra
Why not use upserts? Wouldn't that solve the duplicates issue, and then there
would be no need to query the database either?

On Sat, Apr 24, 2021, 8:12 PM David Anderson  wrote:

> What are the other techniques for bootstrapping rocksdb state?
>
>
> Bootstrapping state involves somehow creating a snapshot (typically a
> savepoint, but a retained checkpoint can be a better choice in some cases)
> containing the necessary state -- meaning that the state has the same
> operator uid and state descriptor used by the real streaming job.
>
> You can do this by either: (1) running a variant of the live streaming job
> against the data used for bootstrapping and taking a snapshot when the data
> has been fully ingested, or (2) by using the State Processor API [1].
> You'll find a trivial example of the second approach in [2]. Once you have
> a suitable snapshot, you can run your real job against it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
> [2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>
> Regards,
> David
>
> On Sat, Apr 24, 2021 at 3:01 PM Omngr 
> wrote:
>
>> Hi David, thank you for your response first!
>>
>> The state size is about 1 TB for now, but it will increase fastly, and
>> also I can not use the TLL for states. It will grow indefinitely.
>> What are the other techniques for bootstrapping rocksdb state?
>>
>> David Anderson , 24 Nis 2021 Cmt, 15:43 tarihinde
>> şunu yazdı:
>>
>>> Oguzhan,
>>>
>>> Note, the state size is very large and I have to feed the state from
 batch flow firstly. Thus I can not use the internal state like rocksdb.
>>>
>>>
>>> How large is "very large"? Using RocksDB, several users have reported
>>> working with jobs using many TBs of state.
>>>
>>> And there are techniques for bootstrapping the state. That doesn't have
>>> to be a showstopper.
>>>
>>> May be any bottleneck in that flow? I think to use asyncMap functions
 for state read/write operations.
>>>
>>>
>>> That's a good reason to reconsider using Flink state.
>>>
>>> Regards,
>>> David
>>>
>>>
>>>
>>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>>> sosyalmedya.oguz...@gmail.com> wrote:
>>>
 I'm trying to design a stream flow that checks *de-duplicate* events
 and sends them to the Kafka topic.

 Basically, flow looks like that;

 kafka (multiple topics) =>  flink (checking de-duplication and event
 enrichment) => kafka (single topic)

 For de-duplication, I'm thinking of using Cassandra as an external
 state store. The details of my job;

 I have an event payload with *uuid* Field. If the event that has the
 same uuid will come, this event should be discarded. In my case, two kafka
 topics are reading. The first topic has a lot of fields, but other topics
 just have a *uuid* field, thus I have to enrich data using the same
 uuid for the events coming from the second topic.

 Stream1: Messages reading from the first topic. Read state from
 Cassandra using the *uuid*. If a state exists, ignore this event and *do
 not* emit to the Kafka. If state does not exist, save  this event to
 the Cassandra, then emit this event to the Kafka.

 Stream2: Messages reading from the second topic. Read state from
 Cassandra using the *uuid*. If state exists, check a column that
 represents this event came from topic2. If the value of this column is
 false, enrich the event using state and update the Cassandra column as
 true. If true, ignore this event because this event is a duplicate.

 def checkDeDuplication(event): Option[Event] = {
   val state = readFromCassandra(state)
   if (state exist) None //ignore this event
   else {
 saveEventToCassandra(event)
 Some(event)
   }
 }

 def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
   val state = readFromCassandra(state)
   if (state does not exist) None //ignore this event
   else {
 if (state.flag == true) None // ignore this event
 else {
updateFlagAsTrueInCassandra(event)
Some(event)
 }
   }
 }


 val stream1 = 
 readKafkaTopic1().flatMap(checkDeDuplicationAndSaveToSatate())
 val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
 stream1.union(stream2).addSink(kafkaSink)

 1- Is that a good approach?

 2- Is Cassandra the right choice here? Note, the state size is very
 large and I have to feed the state from batch flow firstly. Thus I can not
 use the internal state like rocksdb.

 3- Can i improve this logic?

 4- May be any bottleneck in that flow? I think to use asyncMap
 functions for state read/write operations.

>>>


Re: pojo warning when using auto generated protobuf class

2021-04-24 Thread Yun Gao
Hi Prashant,

I think the warning is given when calling

return TypeInformation.of(Trace.APITrace::class.java)

Currently Flink does not have native support
for protobuf types yet [1], so it uses a
generic serializer created by Kryo.

This should not affect the correctness of the program,
only its performance. One possible solution is to
register a custom serializer for protobuf classes into the
Kryo serializer framework, like the example in [2].

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-11333
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
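
(A hedged sketch of [2] applied to the class discussed in this thread; Trace.APITrace is the poster's generated protobuf class, so this only compiles with that generated code and the chill-protobuf dependency on the classpath:)

```java
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegisterProtobufSerializer {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register chill-protobuf's ProtobufSerializer for the generated class, so Kryo
        // serializes it via protobuf instead of its slower generic fallback. Note that this
        // does not silence the TypeExtractor INFO line, which only reports that the class
        // is not a POJO.
        env.getConfig().registerTypeWithKryoSerializer(Trace.APITrace.class, ProtobufSerializer.class);
    }
}
```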


------ Original Mail ------
Sender: Prashant Deva
Send Date: Sat Apr 24 11:00:17 2021
Recipients: User
Subject: pojo warning when using auto generated protobuf class

I am seeing this warning message when trying to use a custom protobuf de/serializer
with a Kafka source and an auto-generated Java protobuf class:

18:41:31.164 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor -
Class class com.xx.APITrace cannot be used as a POJO type because not all
fields are valid POJO fields, and must be processed as GenericType. Please read
the Flink documentation on "Data Types & Serialization" for details of the
effect on performance.

Here is my serializer. What am I doing wrong?

class ApiTraceSchema : DeserializationSchema<Trace.APITrace>, SerializationSchema<Trace.APITrace> {

    override fun getProducedType(): TypeInformation<Trace.APITrace> {
        return TypeInformation.of(Trace.APITrace::class.java)
    }

    override fun deserialize(message: ByteArray): Trace.APITrace {
        return Trace.APITrace.parseFrom(message)
    }

    override fun isEndOfStream(nextElement: Trace.APITrace): Boolean {
        return false
    }

    override fun serialize(element: Trace.APITrace): ByteArray {
        return element.toByteArray()
    }
}


Re: Approaches for external state for Flink

2021-04-24 Thread David Anderson
>
> What are the other techniques for bootstrapping rocksdb state?


Bootstrapping state involves somehow creating a snapshot (typically a
savepoint, but a retained checkpoint can be a better choice in some cases)
containing the necessary state -- meaning that the state has the same
> operator uid and state descriptor used by the real streaming job.

You can do this by either: (1) running a variant of the live streaming job
against the data used for bootstrapping and taking a snapshot when the data
has been fully ingested, or (2) by using the State Processor API [1].
You'll find a trivial example of the second approach in [2]. Once you have
a suitable snapshot, you can run your real job against it.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
[2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf

Regards,
David
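
(A hedged sketch of approach (2), loosely following the State Processor API example in [2]; the Event shape, state name, operator uid, and paths below are illustrative and must match whatever the real streaming job uses:)

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapDedupState {

    // Illustrative event shape: only the uuid matters for the de-duplication state.
    public static class Event {
        public String uuid;
    }

    // Writes a "seen" flag per uuid. The state descriptor (and later the operator uid)
    // must match what the real streaming job uses, or the state will not be found.
    public static class SeenBootstrapper extends KeyedStateBootstrapFunction<String, Event> {
        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
        }

        @Override
        public void processElement(Event value, Context ctx) throws Exception {
            seen.update(true);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Illustrative batch input holding the historical events to pre-load.
        Event historical = new Event();
        historical.uuid = "0001";
        DataSet<Event> history = env.fromElements(historical);

        BootstrapTransformation<Event> transformation = OperatorTransformation
                .bootstrapWith(history)
                .keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event e) {
                        return e.uuid;
                    }
                })
                .transform(new SeenBootstrapper());

        Savepoint
                .create(new MemoryStateBackend(), 128)       // max parallelism of the savepoint
                .withOperator("dedup-uid", transformation)   // must equal the streaming operator's uid
                .write("file:///tmp/bootstrap-savepoint");   // illustrative path

        env.execute("bootstrap de-duplication state");
    }
}
```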

On Sat, Apr 24, 2021 at 3:01 PM Omngr  wrote:

> Hi David, thank you for your response first!
>
> The state size is about 1 TB for now, but it will increase fastly, and
> also I can not use the TLL for states. It will grow indefinitely.
> What are the other techniques for bootstrapping rocksdb state?
>
> David Anderson , 24 Nis 2021 Cmt, 15:43 tarihinde
> şunu yazdı:
>
>> Oguzhan,
>>
>> Note, the state size is very large and I have to feed the state from
>>> batch flow firstly. Thus I can not use the internal state like rocksdb.
>>
>>
>> How large is "very large"? Using RocksDB, several users have reported
>> working with jobs using many TBs of state.
>>
>> And there are techniques for bootstrapping the state. That doesn't have
>> to be a showstopper.
>>
>> May be any bottleneck in that flow? I think to use asyncMap functions for
>>> state read/write operations.
>>
>>
>> That's a good reason to reconsider using Flink state.
>>
>> Regards,
>> David
>>
>>
>>
>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>> sosyalmedya.oguz...@gmail.com> wrote:
>>
>>> I'm trying to design a stream flow that checks *de-duplicate* events
>>> and sends them to the Kafka topic.
>>>
>>> Basically, flow looks like that;
>>>
>>> kafka (multiple topics) =>  flink (checking de-duplication and event
>>> enrichment) => kafka (single topic)
>>>
>>> For de-duplication, I'm thinking of using Cassandra as an external state
>>> store. The details of my job;
>>>
>>> I have an event payload with *uuid* Field. If the event that has the
>>> same uuid will come, this event should be discarded. In my case, two kafka
>>> topics are reading. The first topic has a lot of fields, but other topics
>>> just have a *uuid* field, thus I have to enrich data using the same
>>> uuid for the events coming from the second topic.
>>>
>>> Stream1: Messages reading from the first topic. Read state from
>>> Cassandra using the *uuid*. If a state exists, ignore this event and *do
>>> not* emit to the Kafka. If state does not exist, save  this event to
>>> the Cassandra, then emit this event to the Kafka.
>>>
>>> Stream2: Messages reading from the second topic. Read state from
>>> Cassandra using the *uuid*. If state exists, check a column that
>>> represents this event came from topic2. If the value of this column is
>>> false, enrich the event using state and update the Cassandra column as
>>> true. If true, ignore this event because this event is a duplicate.
>>>
>>> def checkDeDuplication(event): Option[Event] = {
>>>   val state = readFromCassandra(state)
>>>   if (state exist) None //ignore this event
>>>   else {
>>> saveEventToCassandra(event)
>>> Some(event)
>>>   }
>>> }
>>>
>>> def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
>>>   val state = readFromCassandra(state)
>>>   if (state does not exist) None //ignore this event
>>>   else {
>>> if (state.flag == true) None // ignore this event
>>> else {
>>>updateFlagAsTrueInCassandra(event)
>>>Some(event)
>>> }
>>>   }
>>> }
>>>
>>>
>>> val stream1 = readKafkaTopic1().flatMap(checkDeDuplicationAndSaveToSatate())
>>> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
>>> stream1.union(stream2).addSink(kafkaSink)
>>>
>>> 1- Is that a good approach?
>>>
>>> 2- Is Cassandra the right choice here? Note, the state size is very
>>> large and I have to feed the state from batch flow firstly. Thus I can not
>>> use the internal state like rocksdb.
>>>
>>> 3- Can i improve this logic?
>>>
>>> 4- May be any bottleneck in that flow? I think to use asyncMap functions
>>> for state read/write operations.
>>>
>>


Re: Approaches for external state for Flink

2021-04-24 Thread David Anderson
Oguzhan,

Note, the state size is very large and I have to feed the state from batch
> flow firstly. Thus I can not use the internal state like rocksdb.


How large is "very large"? Using RocksDB, several users have reported
working with jobs using many TBs of state.

And there are techniques for bootstrapping the state. That doesn't have to
be a showstopper.

May be any bottleneck in that flow? I think to use asyncMap functions for
> state read/write operations.


That's a good reason to reconsider using Flink state.

Regards,
David
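
(A hedged sketch of what keeping the de-duplication flag in Flink keyed state could look like, instead of querying Cassandra per event; the Event shape and names are illustrative:)

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DedupWithFlinkState {

    // Illustrative event shape; the real payload has many more fields.
    public static class Event {
        public String uuid;
        public String payload;
    }

    // Emits the first event seen for each uuid and drops later duplicates.
    // The thread notes TTL cannot be used here, so this state grows with the
    // number of distinct uuids (RocksDB keeps it on disk rather than on the heap).
    public static class Deduplicate extends KeyedProcessFunction<String, Event, Event> {
        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
            if (seen.value() == null) {
                seen.update(true);
                out.collect(event);
            }
        }
    }

    // Usage sketch:
    //   DataStream<Event> deduped = events
    //       .keyBy(e -> e.uuid)
    //       .process(new Deduplicate())
    //       .uid("dedup-uid");
}
```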



On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> I'm trying to design a stream flow that checks *de-duplicate* events and
> sends them to the Kafka topic.
>
> Basically, flow looks like that;
>
> kafka (multiple topics) =>  flink (checking de-duplication and event
> enrichment) => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external state
> store. The details of my job;
>
> I have an event payload with *uuid* Field. If the event that has the same
> uuid will come, this event should be discarded. In my case, two kafka
> topics are reading. The first topic has a lot of fields, but other topics
> just have a *uuid* field, thus I have to enrich data using the same uuid
> for the events coming from the second topic.
>
> Stream1: Messages reading from the first topic. Read state from Cassandra
> using the *uuid*. If a state exists, ignore this event and *do not* emit
> to the Kafka. If state does not exist, save  this event to the Cassandra,
> then emit this event to the Kafka.
>
> Stream2: Messages reading from the second topic. Read state from Cassandra
> using the *uuid*. If state exists, check a column that represents this
> event came from topic2. If the value of this column is false, enrich the
> event using state and update the Cassandra column as true. If true, ignore
> this event because this event is a duplicate.
>
> def checkDeDuplication(event): Option[Event] = {
>   val state = readFromCassandra(state)
>   if (state exist) None //ignore this event
>   else {
> saveEventToCassandra(event)
> Some(event)
>   }
> }
>
> def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
>   val state = readFromCassandra(state)
>   if (state does not exist) None //ignore this event
>   else {
> if (state.flag == true) None // ignore this event
> else {
>updateFlagAsTrueInCassandra(event)
>Some(event)
> }
>   }
> }
>
>
> val stream1 = readKafkaTopic1().flatMap(checkDeDuplicationAndSaveToSatate())
> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
> stream1.union(stream2).addSink(kafkaSink)
>
> 1- Is that a good approach?
>
> 2- Is Cassandra the right choice here? Note, the state size is very large
> and I have to feed the state from batch flow firstly. Thus I can not use
> the internal state like rocksdb.
>
> 3- Can i improve this logic?
>
> 4- May be any bottleneck in that flow? I think to use asyncMap functions
> for state read/write operations.
>