Reply: About exactly-once with Async I/O

2019-09-03, by star
So async I/O is not just at least once? The documentation says: "The
asynchronous I/O operator offers full exactly-once fault tolerance guarantees.
It stores the records for in-flight asynchronous requests in checkpoints and
restores/re-triggers the requests when recovering from a failure."

Sent from my iPhone


------ Original ------
From: Dino Zhang

Re: Questions about Streaming File Sink

2019-09-03, by 周美娜
My approach is to reconfigure the HADOOP_CONF_DIR environment variable: configure core-site.xml and
hdfs-site.xml on the Flink cluster, and point the HADOOP_CONF_DIR environment variable at that directory.
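A minimal sketch of that setup, assuming the XML files are cluster B's HDFS client configs and the directory path below is hypothetical:

```
# hypothetical directory holding cluster B's core-site.xml and hdfs-site.xml
export HADOOP_CONF_DIR=/opt/flink/hadoop-conf-cluster-b
# restart the Flink processes afterwards so they pick up the new configuration
```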
> On Sep 4, 2019, at 11:16, 戴嘉诚 wrote:
> 
> Hi all:
>   I saw that StreamingFileSink can write records as files to Flink's FileSystem. The problem is
> that the HDFS I write to is not on the Flink cluster (cluster A) but on another HDFS cluster (cluster B), and cluster B has NameNode HA configured, so pointing directly at a single NameNode does not feel reliable. Flink's default FileSystem, however, points at Flink's own HDFS cluster (cluster A). Can this connector take a separate fs configuration? Because checkpointing is enabled and the default checkpoint path is on Flink's own cluster (cluster A), I cannot simply repoint the default FileSystem to cluster B. Thanks
> 
> 



Questions about Streaming File Sink

2019-09-03, by 戴嘉诚
Hi all:
I saw that StreamingFileSink can write records as files to Flink's FileSystem. The problem is
that the HDFS I write to is not on the Flink cluster (cluster A) but on another HDFS cluster (cluster B), and cluster B has NameNode HA configured, so pointing directly at a single NameNode does not feel reliable. Flink's default FileSystem, however, points at Flink's own HDFS cluster (cluster A). Can this connector take a separate fs configuration? Because checkpointing is enabled and the default checkpoint path is on Flink's own cluster (cluster A), I cannot simply repoint the default FileSystem to cluster B. Thanks



Reply: How to dynamically add a taskmanager in Flink cluster mode

2019-09-03, by pengcheng...@bonc.com.cn
In Flink cluster mode, just run bin/taskmanager.sh start on slave3.
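A sketch of that, assuming slave3 has the same Flink distribution and conf as the existing nodes (the install path is hypothetical):

```
cd /opt/flink            # hypothetical install path on slave3
bin/taskmanager.sh start # starts a taskmanager process that joins the running cluster
```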



pengcheng...@bonc.com.cn
 
 
Sent: 2019-09-04 10:27
To: user-zh
Subject: How to dynamically add a taskmanager in Flink cluster mode
Hi all,


When deploying the Flink cluster there is 1 jobmanager and 2 taskmanagers (slave1, slave2).
Now I want to add slave3 as a taskmanager. Without stopping the cluster, can it be
added dynamically the way Spark allows?

 
 
--



Re: How to dynamically add a taskmanager in Flink cluster mode

2019-09-03, by Wesley Peng

Hi

on 2019/9/4 10:27, 如影随形 wrote:

  When deploying the Flink cluster there is 1 jobmanager and 2 taskmanagers (slave1, slave2).
Now I want to add slave3 as a taskmanager. Without stopping the cluster, can it be added dynamically the way Spark allows?


AFAIK the answer is NO for now. However, the community says this has been 
under consideration since FLIP-6, the Flink Deployment and Process Model 
effort.


https://docs.google.com/document/d/1zwBP3LKnJ0LI1R4-9hLXBVjOXAkQS5jKrGYYoPz-xRk/edit#


How to dynamically add a taskmanager in Flink cluster mode

2019-09-03, by 如影随形
Hi all,


When deploying the Flink cluster there is 1 jobmanager and 2 taskmanagers (slave1, slave2).
Now I want to add slave3 as a taskmanager. Without stopping the cluster, can it be
added dynamically the way Spark allows?



--


Reply: Flink periodic watermark generation: how is the 200 ms period controlled?

2019-09-03, by Yuan,Youjun
For the source, see PeriodicWatermarkEmitter.


-----Original Message-----
From: Dino Zhang
Sent: Tuesday, September 3, 2019 3:14 PM
To: user-zh@flink.apache.org
Subject: Re: Flink periodic watermark generation: how is the 200 ms period controlled?

hi venn,


The watermark interval under EventTime defaults to 200 ms and can be set via the
setAutoWatermarkInterval method of ExecutionConfig; see StreamExecutionEnvironment:

  /**
   * Sets the time characteristic for all streams created from this environment,
   * e.g., processing time, event time, or ingestion time.
   *
   * If you set the characteristic to IngestionTime or EventTime this will
   * set a default watermark update interval of 200 ms. If this is not
   * applicable for your application you should change it using
   * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
   *
   * @param characteristic The time characteristic.
   */
  @PublicEvolving
  public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
      getConfig().setAutoWatermarkInterval(0);
    } else {
      getConfig().setAutoWatermarkInterval(200);
    }
  }



On Tue, Sep 3, 2019 at 2:39 PM venn  wrote:

> Hi all, while reading the Flink source for assigning Timestamps and watermarks
> today, I found that periodic watermark creation really is periodic: the console
> output shows it runs roughly every 200 ms. Where is that 200 ms controlled?
> I cannot find it in the debug call stack (source location).
>
>
>
> The periodic watermark assignment is set up as follows:
>
> .assignAscendingTimestamps(element =>
> sdf.parse(element.createTime).getTime)
>
> .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))
>
>
>
>
>
> The timestamp-assignment method, in TimestampsAndPeriodicWatermarksOperator:
>
>
>
>
> @Override
> public void processElement(StreamRecord<T> element) throws Exception {
>    final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
>          element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
>
>    output.collect(element.replace(element.getValue(), newTimestamp));
> }
>
>
>
>
>
> The watermark-emitting method, in TimestampsAndPeriodicWatermarksOperator:
>
>
> @Override
> public void onProcessingTime(long timestamp) throws Exception {
>    // as can be seen here, this prints once every 200 ms
>    System.out.println("timestamp : " + timestamp + ", system.current : "
>          + System.currentTimeMillis());
>    // register next timer
>    Watermark newWatermark = userFunction.getCurrentWatermark();
>    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
>       currentWatermark = newWatermark.getTimestamp();
>       // emit watermark
>       output.emitWatermark(newWatermark);
>    }
>
>    long now = getProcessingTimeService().getCurrentProcessingTime();
>    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
> }
>
>
>
>
>
>
>
> Thanks, everyone.
>
>


Re: About exactly-once with Async I/O

2019-09-03, by Dino Zhang
hi star,

exactly-once here means Flink-internal exactly-once. To guarantee end-to-end
exactly-once you can use a two-phase commit, which requires implementing
TwoPhaseCommitSinkFunction, or make the writes idempotent.
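For illustration, a minimal sketch of the TwoPhaseCommitSinkFunction route; the Txn type and the sink internals are placeholders, not a real client API, and the serializer choice is just one option:

```
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Sketch of a 2PC sink. Txn is a placeholder handle; a real sink would
// wrap e.g. a database transaction here.
public class TxnSink extends TwoPhaseCommitSinkFunction<String, TxnSink.Txn, Void> {

    public static class Txn {
        public String id; // placeholder transaction id
    }

    public TxnSink() {
        // serialize in-flight transactions into checkpoints
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn(); // open a new transaction on the external system
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        // write the record inside the open transaction, never directly
    }

    @Override
    protected void preCommit(Txn txn) {
        // flush on checkpoint; after this the transaction must be committable
    }

    @Override
    protected void commit(Txn txn) {
        // runs once the checkpoint completes; MUST be idempotent, because
        // Flink may call it again for the same transaction after recovery
    }

    @Override
    protected void abort(Txn txn) {
        // roll back on failure
    }
}
```

The key point for star's question is that commit must be idempotent: Flink may re-invoke it for the same pre-committed transaction during recovery.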

On Wed, Sep 4, 2019 at 8:20 AM star <3149768...@qq.com> wrote:

> My reading of the docs is that in-flight async requests are saved in the checkpoint and
> re-triggered on failover. My question: since the requests are re-triggered rather than rolled
> back, the earlier requests have already affected the external system, so isn't that at least once?
> Say ck1 sends three requests a, b, c that update an external database, and ck2 sends d, e, f.
> Suppose ck1 completes its checkpoint, requests a and b succeed, and c does not.
>
>
> The job is cancelled while ck2 is processing e, but c and d have already succeeded. When I
> restart from the last completed checkpoint, ck1, won't c and d be requested again?
>
>
> Thanks
>
> Sent from my iPhone



-- 
Regards,
DinoZhang


About exactly-once with Async I/O

2019-09-03, by star
My reading of the docs is that in-flight async requests are saved in the checkpoint and
re-triggered on failover. My question: since the requests are re-triggered rather than rolled
back, the earlier requests have already affected the external system, so isn't that at least once?
Say ck1 sends three requests a, b, c that update an external database, and ck2 sends d, e, f.
Suppose ck1 completes its checkpoint, requests a and b succeed, and c does not.


The job is cancelled while ck2 is processing e, but c and d have already succeeded. When I
restart from the last completed checkpoint, ck1, won't c and d be requested again?


Thanks

Sent from my iPhone

Reply: Flink SQL time questions

2019-09-03, by Jimmy Wong
Hi:
You can convert the time to a Long. As for UTC: since you are producing a Table, if you are using SQL you can do the conversion with a UDF.
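A minimal sketch of such a UDF; the class name and the fixed +8 offset are illustrative assumptions, not a Flink-provided function:

```
import java.sql.Timestamp;
import org.apache.flink.table.functions.ScalarFunction;

// Shifts a UTC timestamp by +8 hours for display.
public class ToUtc8 extends ScalarFunction {
    private static final long EIGHT_HOURS_MS = 8 * 60 * 60 * 1000L;

    public Timestamp eval(Timestamp ts) {
        return ts == null ? null : new Timestamp(ts.getTime() + EIGHT_HOURS_MS);
    }
}
```

Registered with tableEnv.registerFunction("to_utc8", new ToUtc8()), it can be called as to_utc8(_proctime) in the query; note this only shifts the displayed value, the computation itself stays in UTC.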


Jimmy
wangzmk...@163.com


On Sep 3, 2019, 21:25, JingsongLee wrote:
Hi:
1. Yes, currently it can only be UTC. If this affects your computation, consider shifting the business window times accordingly.
2. Long is supported. Is your input actually an int, which would cause the error? What is the exact error message?

Best,
Jingsong Lee


--
From:hb <343122...@163.com>
Send Time: Tuesday, Sep 3, 2019, 10:44
To:user-zh 
Subject: Flink SQL time questions

Using a Kafka ConnectorDescriptor to read JSON-format data from Kafka and create a Table


```
...


schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)
```


Question 1.  The generated _proctime processing-time field is displayed in the UTC timezone; how can I shift it to UTC+8?
Question 2.  How can the eventTime event-time field support the Long type?


My input record to Kafka is {"eventTime": 10, "id":1,"name":"hb"} and I get an error about the eventTime field type.

Re: Flink SQL time questions

2019-09-03, by JingsongLee
Hi:
1. Yes, currently it can only be UTC. If this affects your computation, consider shifting the business window times accordingly.
2. Long is supported. Is your input actually an int, which would cause the error? What is the exact error message?

Best,
Jingsong Lee


--
From:hb <343122...@163.com>
Send Time: Tuesday, Sep 3, 2019, 10:44
To:user-zh 
Subject: Flink SQL time questions

Using a Kafka ConnectorDescriptor to read JSON-format data from Kafka and create a Table


```
  ...


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


  schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
  new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)
```


Question 1.  The generated _proctime processing-time field is displayed in the UTC timezone; how can I shift it to UTC+8?
Question 2.  How can the eventTime event-time field support the Long type?


My input record to Kafka is {"eventTime": 10, "id":1,"name":"hb"} and I get an error about the eventTime field type.

Re: Reply: Issue using StateBackend in Flink

2019-09-03, by Yun Tang
Hi


  1.  What is the checkpoint timeout (the default 10 min?). If the timeout is too short, checkpoint failures come easily (see the sketch after this list).
  2.  Are all subtasks n/a? A source task's checkpoint does not involve RocksDB and is effectively the same as with the default MemoryStateBackend, so the source task should not fail to complete its checkpoint (unless it can never acquire the lock in StreamTask because it is always processing elements).
  3.  How is the back pressure? Is there severe back pressure when using RocksDB? Under back pressure, barriers cannot flow downstream, which easily makes checkpoints time out.

I suggest sharing the checkpoint information from the job's web UI.
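As a minimal sketch of the checkpoint knobs behind points 1 and 3 (DataStream API; the values are illustrative, not recommendations):

```
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 60 s with exactly-once semantics
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // raise the timeout (default 10 min) if checkpoints expire before completing
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);
        // leave processing room between checkpoints, which helps under back pressure
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
    }
}
```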

Best,
Yun Tang

From: Wesley Peng 
Sent: Tuesday, September 3, 2019 15:44
To: user-zh@flink.apache.org 
Subject: Re: Reply: Issue using StateBackend in Flink



on 2019/9/3 15:38, 守护 wrote:
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 3 from 
> 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
> What is causing this?

You can use the ids of the failed tasks to find which taskmanager they landed on. After checking, they were all on the same machine, and the UI showed that machine's incoming traffic was clearly larger than the others'.
So the problem was caused by data skew; at root, downstream consumption capacity was insufficient.

also reference:
https://juejin.im/post/5c374fe3e51d451bd1663756


Re: Reply: Issue using StateBackend in Flink

2019-09-03, by Wesley Peng




on 2019/9/3 15:38, 守护 wrote:

org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 3 from 
24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
What is causing this?


You can use the ids of the failed tasks to find which taskmanager they landed on. After checking, they were all on the same machine, and the UI showed that machine's incoming traffic was clearly larger than the others'.
So the problem was caused by data skew; at root, downstream consumption capacity was insufficient.


also reference:
https://juejin.im/post/5c374fe3e51d451bd1663756


Reply: Issue using StateBackend in Flink

2019-09-03, by 守护
1. With windows in the Flink DataStream API, env.setStateBackend(new 
RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)) works and the RocksDB 
checkpoints complete normally: Completed checkpoint 142 for job 25a50baff7d16ee22aecb7b1 (806418 bytes in 426 
ms).
2. With SQL windows and RocksDB, the window checkpoint shows n/a and the log prints:
2019-09-03 15:17:52,142 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 1 from 
915988718742f11135a6e96ba42c4e35 of job fd5010cbf20501339f1136600f0709c3. 
2019-09-03 15:17:52,143 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 1 from 
191ca20964de323300551d9a144c272c of job fd5010cbf20501339f1136600f0709c3. 
2019-09-03 15:17:52,144 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 1 from 
2a24625ac03ea6ac4b8632719768c502 of job fd5010cbf20501339f1136600f0709c3. 
2019-09-03 15:17:52,144 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 1 from 
51b376365d044c672677c38ca1caee78 of job fd5010cbf20501339f1136600f0709c3. 
2019-09-03 15:17:52,227 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 1 from 
9266a7307add46cb26f5a9352c82ceb0 of job fd5010cbf20501339f1136600f0709c3. 
2019-09-03 15:18:21,881 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 4 from 
24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3. 
2019-09-03 15:18:21,882 WARN  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
message for now expired checkpoint attempt 3 from 
24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
What is causing this?




------ Original ------
From: "Wesley Peng";
Sent: Tuesday, September 3, 2019, 3:24 PM
To: "user-zh";

Subject: Re: Issue using StateBackend in Flink



Hi

on 2019/9/3 12:14, 々守护々 wrote:
> Symptom: I configure the StateBackend in code, like env.setStateBackend(new
> RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)), but the SQL window
> below always gets stuck at checkpointing and keeps reporting n/a.
>
> If I comment that line out and instead set state.checkpoints.dir:
> hdfs://host51:9000/flink/flink-checkpoints in flink-conf.yaml, it runs normally.
> Is there some conflict between SQL windows and RocksDBStateBackend? What causes this?


It seems like a filesystem issue, though I have no experience with it either.

regards.

Re: Issue using StateBackend in Flink

2019-09-03, by Wesley Peng

Hi

on 2019/9/3 12:14, 々守护々 wrote:

Symptom: I configure the StateBackend in code, like env.setStateBackend(new 
RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints",true)), but the SQL window below 
always gets stuck at checkpointing and keeps reporting n/a.

If I comment that line out and instead set
state.checkpoints.dir: hdfs://host51:9000/flink/flink-checkpoints in flink-conf.yaml, it runs normally.
Is there some conflict between SQL windows and RocksDBStateBackend? What causes this?



It seems like a filesystem issue, though I have no experience with it either.

regards.
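For reference, a minimal sketch of the config-file route from the question (these are standard Flink options in flink-conf.yaml; the HDFS path is the poster's own):

```
state.backend: rocksdb
state.checkpoints.dir: hdfs://host51:9000/flink/flink-checkpoints
# mirrors the 'true' incremental flag of the code-level RocksDBStateBackend
state.backend.incremental: true
```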


Re: Flink periodic watermark generation: how is the 200 ms period controlled?

2019-09-03, by Dino Zhang
hi venn,


The watermark interval under EventTime defaults to 200 ms and can be set via the
setAutoWatermarkInterval method of ExecutionConfig; see StreamExecutionEnvironment:

  /**
   * Sets the time characteristic for all streams created from this environment,
   * e.g., processing time, event time, or ingestion time.
   *
   * If you set the characteristic to IngestionTime or EventTime this will
   * set a default watermark update interval of 200 ms. If this is not
   * applicable for your application you should change it using
   * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
   *
   * @param characteristic The time characteristic.
   */
  @PublicEvolving
  public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
      getConfig().setAutoWatermarkInterval(0);
    } else {
      getConfig().setAutoWatermarkInterval(200);
    }
  }
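As a minimal usage sketch of overriding that default (the 50 ms value is only an example):

```
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // EventTime sets the 200 ms default shown above
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // override it, e.g. emit watermarks every 50 ms
        env.getConfig().setAutoWatermarkInterval(50L);
    }
}
```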



On Tue, Sep 3, 2019 at 2:39 PM venn  wrote:

> Hi all, while reading the Flink source for assigning Timestamps and watermarks
> today, I found that periodic watermark creation really is periodic: the console
> output shows it runs roughly every 200 ms. Where is that 200 ms controlled?
> I cannot find it in the debug call stack (source location).
>
>
>
> The periodic watermark assignment is set up as follows:
>
> .assignAscendingTimestamps(element =>
> sdf.parse(element.createTime).getTime)
>
> .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))
>
>
>
>
>
> The timestamp-assignment method, in TimestampsAndPeriodicWatermarksOperator:
>
>
>
>
> @Override
> public void processElement(StreamRecord<T> element) throws Exception {
>    final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
>          element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
>
>    output.collect(element.replace(element.getValue(), newTimestamp));
> }
>
>
>
>
>
> The watermark-emitting method, in TimestampsAndPeriodicWatermarksOperator:
>
>
> @Override
> public void onProcessingTime(long timestamp) throws Exception {
>    // as can be seen here, this prints once every 200 ms
>    System.out.println("timestamp : " + timestamp + ", system.current : "
>          + System.currentTimeMillis());
>    // register next timer
>    Watermark newWatermark = userFunction.getCurrentWatermark();
>    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
>       currentWatermark = newWatermark.getTimestamp();
>       // emit watermark
>       output.emitWatermark(newWatermark);
>    }
>
>    long now = getProcessingTimeService().getCurrentProcessingTime();
>    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
> }
>
>
>
>
>
>
>
> Thanks, everyone.
>
>


Flink periodic watermark generation: how is the 200 ms period controlled?

2019-09-03, by venn
Hi all, while reading the Flink source for assigning Timestamps and watermarks today,
I found that periodic watermark creation really is periodic: the console output shows
it runs roughly every 200 ms. Where is that 200 ms controlled? I cannot find it in the
debug call stack (source location).

 

The periodic watermark assignment is set up as follows:

.assignAscendingTimestamps(element =>
sdf.parse(element.createTime).getTime)

.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))

 

 

The timestamp-assignment method, in TimestampsAndPeriodicWatermarksOperator:

 


@Override
public void processElement(StreamRecord<T> element) throws Exception {
   final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
         element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));
}

 

 

The watermark-emitting method, in TimestampsAndPeriodicWatermarksOperator:


@Override
public void onProcessingTime(long timestamp) throws Exception {
   // as can be seen here, this prints once every 200 ms
   System.out.println("timestamp : " + timestamp + ", system.current : "
         + System.currentTimeMillis());
   // register next timer
   Watermark newWatermark = userFunction.getCurrentWatermark();
   if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
      currentWatermark = newWatermark.getTimestamp();
      // emit watermark
      output.emitWatermark(newWatermark);
   }

   long now = getProcessingTimeService().getCurrentProcessingTime();
   getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

 

 

 

Thanks, everyone.