官网关于Table和DataStream相互转化部分:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
①样例代码中,
// interpret the insert-only Table as a DataStream again
DataStream resultStream = tableEnv.toDataStream(resultTable);
请教下,flinkCEP只能用在eventTime 模式下吗,因为我发现写了个cep程序,申明采用processingTime,没有数据发出
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我有一个flinkCEP程序,采用eventTime,监控形如如下的数据
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:50 +0800]
程序中关键设置如下:
设置了水印延迟2s
跳过测略AfterMatchSkipStrategy.skipPastLastEvent()
.times(3)
.within(Time.seconds(3));
我有一个flinkCEP的程序,检测nginx日志,假如同一ip,60s内超过3次访问,则报警。
我访问了7次,代号为1~7
检测到了4组报警分别是
[/1, /2, /3]
[/2, /3, /4]
[/3, /4, /5]
[/4, /5, /6]
请问下,如果想之前已经参与过匹配的数据不再参与匹配,应该怎样做,比如其实我想得到两组报警:
[/1, /2, /3]
[/4, /5, /6]
如下是我检测的关键代码:
Pattern pattern =
Pattern.begin("start").times(3).within(Time.seconds(60));
--
我最近正在研究flink
Connector相关的内容,官网:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/;又了解到Flink
CDC相关的概念:https://github.com/ververica/flink-cdc-connectors;请教一下flink
Connector和Flink CDC二者之间是什么样的关系?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
val outputTagDate = OutputTag[String]("Date-side-output")
你的outputtag定义的泛型是string
ctx.output(outputTagDate,Date(first_retain,noob_endtime,noob_first_endtime))
这个Date是个什么函数,返回的是string类型么
--
Sent from: http://apache-flink.147419.n8.nabble.com/
请教一下,flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
和kafka自己的"enable.auto.commit"=true【默认就是true,
interval=5s】,在checkpoint的时候有啥区别,假如我已经enable了chk?
看注释flinkKafkaConsumer.setCommitOffsetsOnCheckpoints()方法的注释如下:
/**
* Specifies
flink1.12正常程序中,有如下告警:
19:38:37,557 WARN org.apache.kafka.clients.consumer.ConsumerConfig
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.
我有一行如下配置:
我使用flink1.12版本,采用flink-kafka-connector从kafka的topicA中读取数据,然后sink会topicB,在sink
to topicB的FlinkProducer设置如下时,程序会偶现报错,去掉后异常消失,请问是什么原因呢?
flinkKafkaProducer.setWriteTimestampToKafka(true);
--
Sent from: http://apache-flink.147419.n8.nabble.com/
查了些资料,好像说是因为FlinkKafkaProducer.setWriteTimestampToKafka(true);导致的,我使用的是flink1.12.1,
相关代码片段如下,请教是什么原因导致的呢?
//sink
Properties producerPro = new Properties();
producerPro.setProperty("bootstrap.servers",kafkaAddr);
程序一直正常运行,后来突然偶尔报错如下,显示flatMap的Collect时出错:
我的flatMap transform操作代码片段如下,收集的数据是来自kafka的topic
--
SingleOutputStreamOperator text2Bean =
consumerRecordDataStreamSource.flatMap(new FlatMapFunction() {
@Override
我写了一个flink kafka connector的作业,从kafka topicA消费数据,做处理后,又sink回 topicB,
程序正常running中,偶现如下报错:
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at
中间还有这样的错误:
20:14:48,707 WARN org.apache.kafka.common.utils.AppInfoParser
[] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.producer:type=app-info,id="producer-Source: Custom Source -> Flat Map
-> Map -> Sink:
我写了一个stream程序,从kafka
topicA中读取数据,进行简单预处理后,sink回kafka的topicB,程序本地正常running,但是中间报了一些错误如下:
-
20:11:47,078 INFO org.apache.kafka.clients.Metadata
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink:
好的,谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如题,除了通过这种全局配置文件中的方式修改,能在程序中通过代码的方式修改吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我写了一个带状态的function
采用了如下cp配置:
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
env.getCheckpointConfig().setCheckpointTimeout(6L);
有个疑问,
如下程序片段:
--
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",kafkaAddr);
properties.setProperty("group.id",kafkaOdsGroup);
properties.setProperty("auto.offset.reset",kafkaOdsAutoOffsetReset);
谢谢!
我摘录的是flink1.11.2版本文档最后那部分:Background / Internals,介绍flink 如何在yarn上运行的
的内容:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html
。感觉版本比较新了,应该没有问题吧,也是我们生产上在用的版本。1.12版本中没有找到相关内容。
仔细看了下文档,可能是我对flink on yarn的理解不太清楚,还是有几个问题请教下:
①flink on yarn模式下,jobmanager 和
或者说,我知道,对于MapReduce任务,ApplicationMaster的实现是MRAppMaster,那flink on yarn
,ApplicationMaster对应的实现是啥?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink-parcel是什么提交方式,能详细发下么。如果采用per-job mode 或者application mode ,各个job的flink
集群在yarn上是独立的,kill一个job并不会影响宁一个
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink on yarn中,yarn的applicationMaster和flink
JobManager的关系是啥,我对yarn不是很熟悉,之前的理解是
JobManager就是yarn中的applicationMaster的角色。但我在官网中看到如下摘录:...Once that has
finished, the ApplicationMaster (AM) is started.The JobManager and AM are
running in the same container. Once they successfully started, the AM knows
应该说是否:1.11和1.12这里这两种提交方式 是不是一样的,只不过命令有了变化?
官网中的摘录如下:
flink-1.11:
Run a single Flink job on YARN
Example:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
--
flink-1.12:
Per-Job Cluster Mode
Example:
./bin/flink run -t
如题,在 ./flink --help中看到提交job的命令有两个相似的,貌似都会将jobManager发送yarn
node上去之行,但不明白他们区别,官网也未找到他们的区别,请帮忙解释下他们之间的区别?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
谢答。查看我的pom.xml文件,和打包后的压缩包,确实包含kafka(org.apache.kafka.common)的相关依赖;所以我将相关的pom中的依赖都设置为provide,然后重新打包,并确认了我打好的jar包中不包含了任何kafka的依赖,发布运行,这次jobmanager直接报错:Caused
by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
我写了一个 process
function的demo,自定义source产生数据sink到kafka,然后发布到yarn集群运行,flink版本是1.11.2,采用application
Mode 部署,然后发现jobmanager-log报错: Failed to construct kafka producer;Caused by:
org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an instance
of
测试代码如下:
--
public class Sink_KafkaSink_1{
public static void main(String[] args) throws Exception {
final ParameterTool params =
ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
String host =
flink1.11.2
自定义source循环产生数据然后sink到kafka
采用application Mode部署作业到yarn,
jobmanager.log报错如下:(jobmanager和taskmanager的container都分配了,报错都是如下)
2021-01-21 10:52:17,742 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to
jobManager的完整报错日志如下:
2021-01-21 07:53:23,023 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
2021-01-21 07:53:23,027 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
flink
processFunction程序,main()中采用ParameterTool读取resources文件夹下的pro.properties配置文件(kafka地址等);IDEA本地执行完全OK,maven打成jar包后,采用yarn
application的方式部署作业, bin/flink run-application -t yarn-application
/opt/quickstart-0.1.jar ;作业失败,查看yarn的container日志发现如下错误:
Caused by:
好的,谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
operator操作:processWindowFunction的代码如下:
class MyProcessWindowFuncation extends
ProcessWindowFunction>, String, TimeWindow>{
private transient MapState>
eveShareNoMaxPrice;
private transient ValueState>> shareAndMaxPrice;
@Override
public void process(String s, Context
我有如下代码,从kafka消费数据,然后根据数据所在的秒(服务器时钟)进行keyby,获取数据所在的分钟的代码:
public static String timeStampToDate(Long timestamp){
ThreadLocal threadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("-MM-dd HH:mm:ss"));
String format = threadLocal.get().format(new Date(timestamp));
33 matches
Mail list logo