Re:回复:flink时间窗口

2020-07-08 Thread 爱吃鱼



Hi,






因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据
是从kafka实时流传输过来的,每一分钟滑动窗口计算一次


SingleOutputStreamOperator> operator1 = 
env.addSource(stringFlinkKafkaConsumerBase)
.filter((String s) -> (s.split(",", -1).length == 34))
.flatMap(new RichFlatMapFunction() {
.keyBy("src", "msg")
.timeWindow(Time.minutes(1))
.process(new ProcessWindowFunction, Tuple, TimeWindow>() 
.setParallelism(1);




每次执行这段流代码就只有第一次的一分钟时间窗口有数据传输到es,之后就没有数据了。





在 2020-07-09 13:09:32,"Yichao Yang" <1048262...@qq.com> 写道:
>Hi,
>
>
>根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。
>并且可以看下使用到的是处理时间还是事件时间?
>如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。
>
>
>Best,
>Yichao Yang
>
>
>
>
>--原始邮件--
>发件人:"爱吃鱼"发送时间:2020年7月9日(星期四) 中午11:37
>收件人:"user-zh"
>主题:flink时间窗口
>
>
>
>你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
>SingleOutputStreamOperatorflatMap.keyBy(0,1)
>
> .timeWindow(Time.minutes(1))
>
> .process(new ProcessWindowFunction)
>当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。


回复:flink 1.11 on kubernetes 构建失败

2020-07-08 Thread SmileSmile
hi

按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。


目前看差别在于1.11启动jm和tm是通过args: 
["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed 
本地挂载的flink-configuration-configmap.yaml导致失败。


1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。

command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
 while :;
 do
   if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
 then tail -f -n +1 log/*jobmanager*.log;
   fi;
 done"]


如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。  1.11 版本的部署方式如果有大佬可以走通的,求分享。



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
[3] 
https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月08日 16:38,SmileSmile 写道:
hi yun tang!

没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
best!




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月08日 16:29,Yun Tang 写道:
Hi

你是不是对 /opt/flink/conf 
目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml 
等文件,而这个挂载的目录其实是不可写的。
直接修改configmap里面的内容,这样挂载时候就会自动更新了。

祝好
唐云

From: SmileSmile 
Sent: Wednesday, July 8, 2020 13:03
To: Flink user-zh mailing list 
Subject: flink 1.11 on kubernetes 构建失败

hi

按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错


Starting Task Manager
sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only file 
system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only file 
system
/docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.


是否有遇到同样的问题,支个招



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: Use state problem

2020-07-08 Thread Yun Tang
Hi  Jiazhi

Keyed state is only supported in keyed stream as it needs key selector and key 
serializer to select specific key from the input element, this is correct.
If you dig into Flink code, the keyed state backend would only be created when 
the operator has its own serializer[1].

After 'keyBy' to transform data stream to keyed stream, you can use the keyed 
state in the following operator. And the following operator is not limited to 
only operator with "keyed" prefix.
For ProcessAllWindowFunction, we need to use it with windowAll transformation 
[2] and windowAll actually includes keyBy actions if takeing a look at the 
code[3].
For ProcessWindowFunction, we must use keyBy first and then use next window 
transformation.

Hope this explanation could help you.

[1] 
https://github.com/apache/flink/blob/c23787093a9da0e12561fbe22dd6da6164ffe951/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java#L281-L283
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#datastream-transformations
[3] 
https://github.com/apache/flink/blob/c23787093a9da0e12561fbe22dd6da6164ffe951/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L118

Best
Yun Tang


From: ゞ野蠻遊戲χ 
Sent: Thursday, July 9, 2020 9:58
To: user 
Subject: Use state problem

Deal all

Keyed state (ValueState, ReducingState, ListState, AggregatingState, MapState)
Supported in Keyed Stream, meaning only in KeyedProcessFunction? But in 
practice, I can also use these states in ProcessAllWindowFunction and 
ProcessWindowFunction. Why?

thank you
jiazhi


?????? ?????????? richfunction????????????????????????

2020-07-08 Thread Yichao Yang
Hi,


??


Best,
Yichao Yang




----
??:"hdxg1101300...@163.com"

??????flink????????

2020-07-08 Thread Yichao Yang
Hi,


keyby?? warningPojo + String 
??keybykey??

 timestamp assigner 



Best,
Yichao Yang




----
??:"??"

?????? ?????? DataStream????uv????

2020-07-08 Thread ?g???U?[????
??




----
??:"1193216154"<1193216...@qq.com;
:2020??7??9??(??) 10:25
??:"user-zh"

回复:kafka connector问题

2020-07-08 Thread lujg12


flink时间窗口

2020-07-08 Thread 爱吃鱼
你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
SingleOutputStreamOperator> operator = 
flatMap.keyBy(0,1)
.timeWindow(Time.minutes(1))
.process(new ProcessWindowFunction)
当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。

回复: 回复:关于 richfunction中初始化数据库连接的问题

2020-07-08 Thread hdxg1101300...@163.com

FlinkKafkaConsumer kafkaConsumer = new 
FlinkKafkaConsumer<>(TrafficConstants.BILLTOPIC,new SchargeConsumerSchema(), 
props);
kafkaConsumer.setStartFromLatest();
SingleOutputStreamOperator process = 
env.addSource(kafkaConsumer).setParallelism(4)
.filter(new HiveFilterFunction(TrafficConstants.HIVEURL, 
TrafficConstants.HIVEUSERNAME, TrafficConstants.HIVEPASSWORD)).name("流量费过滤")
.keyBy((KeySelector) value -> value.getUser_id() + 
value.getSerial_number() + value.getProvince_code())
.process(***);
SingleOutputStreamOperator map = process.map();
map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME, 
TrafficConstants.PASSWORD))
.setParallelism(1).name("sinkRds");

这是主要逻辑:Kafka取数-->自定义richfilter函数加载hive维表数据来过滤数据-->keyby-->process-->自定义sink函数

public class HiveFilterFunction extends RichFilterFunction {
Logger LOG = LoggerFactory.getLogger(HiveFilterFunction.class);
private final String jdbcUrl;
private final String username;
private final String password;
private transient volatile Statement sts;
private transient volatile Connection connection;
Map map = new ConcurrentHashMap();

public HiveFilterFunction(String jdbcUrl, String username, String password) 
{
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("org.apache.hive.jdbc.HiveDriver");
connection = DriverManager.getConnection(jdbcUrl, username, password);
LOG.info("hive connection --- " + connection);
sts = connection.createStatement();
query();
}

@Override
public boolean filter(Bill value) {
return map.containsKey(value.getIntegrate_item_code())
&& 
TrafficConstants.getProCode().contains(value.getProvince_code());
}

@Override
public void close() throws Exception {
super.close();
assert null != sts ;
assert null != connection ;
sts.close();
connection.close();
}

private void query() throws Exception {
ResultSet resultSet = null;
try {
sts.execute(TrafficConstants.SETSQL);
resultSet = sts.executeQuery(TrafficConstants.CODESQL);
while (resultSet.next()) {
map.put(resultSet.getString("charge_code_cbss"), "");
}
} catch (Exception e) {
LOG.error("hive error", e);
throw new Exception(e);
} finally {
assert resultSet != null;
resultSet.close();
}
LOG.info("hive 维表数据加载完成");
}
}

public class RdsFlowSink extends RichSinkFunction{
Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class);
private final String url;
private final String name;
private final String password;

private transient volatile PreparedStatement insertStatement;
private transient volatile Connection connection;
private transient volatile Counter counter = null;
 
public RdsFlowSink(String url, String name, String password) {
this.url = url;
this.name = name;
this.password = password;
}

@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection(url,name,password);
LOG.info("connection --- " + connection);
counter = getRuntimeContext().getMetricGroup().counter("counter");
insertStatement = connection.prepareStatement(TrafficConstants.FLOWSQL);
 
}

@Override
public void invoke(BillInfo value, Context context) throws Exception {
try {
insertStatement.setString(1,value.getSerial_number());
insertStatement.setString(2,value.getUser_id());
insertStatement.setString(3,value.getIntegrate_item_code());
insertStatement.setString(4,value.getFee());
insertStatement.setString(5,value.getCity_code());
counter.inc(1);
insertStatement.execute();
  
}catch (Exception e){  
LOG.info("invoke  --- " + connection);
LOG.error(e.getMessage());
throw new Exception(e);
}
}

@Override
public void close() throws Exception {
super.close();
assert insertStatement != null;
assert connection != null;
insertStatement.close();
connection.close();
}
}

执行的时候程序会卡在 Class.forName("org.apache.hive.jdbc.HiveDriver"); 或者 
Class.forName("com.mysql.jdbc.Driver"); 这里


hdxg1101300...@163.com
 
发件人: JasonLee
发送时间: 2020-07-08 18:46
收件人: user-zh
主题: 回复:关于 richfunction中初始化数据库连接的问题
hi
具体是卡在什么地方了呢?可以打印日志定位一下 理论上是不会有这样的问题 还有单个执行的话可以吗?
 
 
| |
JasonLee
|
|
邮箱:17610775...@163.com
|
 
Signature is customized by Netease Mail Master
 
在2020年07月08日 18:32,hdxg1101300...@163.com 写道:
您好:
   

kafka connector????

2020-07-08 Thread op
kafka tablescan.startup.modeCREATE TABLE kafkaTable ( 
user_id BIGINT,  item_id BIGINT,  category_id BIGINT,  behavior STRING,  ts 
TIMESTAMP(3) ) WITH (  'connector' = 'kafka',  'topic' = 'user_behavior',  
'properties.bootstrap.servers' = 'localhost:9092',  'properties.group.id' = 
'testGroup',  'format' = 'csv',  'scan.startup.mode' = 'earliest-offset' 
)??'earliest-offset','latest-offset','group-offsets','timestamp'and'specific-offsets'??group-offsets??offset??kafka
 broker??savepoint??offset

Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-08 Thread shadowell
Hi Fabian,




Thanks for your reply, it helps a lot.




Best Regards,

Jie



| |
shadowell
|
|
shadow...@126.com
|
签名由网易邮箱大师定制
On 7/8/2020 18:17,Fabian Hueske wrote:
Hi Jie,


The auto-ID generation is not done by the SQL translation component but on a 
lower level, i.e., it's independent of Flink's SQL translation.

The ID generation only depends on the topology / graph structure of the 
program's operators.

The ID of an operator depends on the IDs of its predecessors (and not on its 
own processing logic or operator name).


So, as long as the operator graph structure of a program remains the same, it 
will be compatible with an earlier savepoint.
However, preserving the operator graph structure is out of the user's control.

The operator graph is automatically generated by the SQL optimizer and slight 
changes of a query can result in a different graph while other changes do not 
affect the structure.


In your example, the graph structure should remain the same because there is 
already a Filter operator (due to "where id == '001'") in the first query and 
the second query just extends the filter predicate ("id == '001' and age >= 
'28'").
If there was no WHERE clause in the first query, the plan might have been 
changed.
In order to reason about which query changes are savepoint compatible, you need 
in-depth knowledge about the optimizer's translation process.


I would not rely on being able to start a query from a savepoint of a 
(slightly) modified query.

First because it is very fragile given the query translation process and second 
because it results in incorrect results.


Given your example query, I would start it from scratch and add a predicate to 
continue after the latest result of the previous query:


select id, name, sum(salary) from user_info where id == '001' and age >= '28' 
and rowtime >= 'xxx' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;



If the last result of the first query was for '2020-07-07' I would set xxx to 
'2020-07-08-00:00:00.000'.
Of course this only works for queries with hard temporary boundaries, but it 
gives correct results.


Best, Fabian


Am Mi., 8. Juli 2020 um 04:50 Uhr schrieb shadowell :



Hi Fabian,


Thanks for your information!
Actually, I am not clear about the mechanism of auto-generated IDs in Flink SQL 
and the mechanism of how does the operator state mapping back from savepoint.
I hope to get some detail information by giving an example bellow.


I have two sql as samples:
old sql : select id, name, sum(salary) from user_info where id == '001' group 
by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
new sql:   select id, name, sum(salary) from user_info where id == '001' and 
age >= '28' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name; 
I just add some age limitation in new SQL. Now, I want to switch the job from 
old one to the new one by trigger a savepoint. Flink will generate operator IDs 
for operators in new SQL.
In this case, just from a technical point of view,  the operator IDs in the 
savepoint of the old SQL job can match the operator IDs in the new SQL job?
My understanding is that Flink will reorder the operators and generate new IDs 
for operators. The new IDs may not match the old IDs. 
This will cause some states failed to be mapped back from the old job 
savepoint, which naturally leads to inaccurate calculation results.
I wonder if my understanding is correct.


Thanks~ 
Jie


| |
shadowell
|
|
shadow...@126.com
|
签名由网易邮箱大师定制
On 7/7/2020 17:23,Fabian Hueske wrote:
Hi Jie Feng,


As you said, Flink translates SQL queries into streaming programs with 
auto-generated operator IDs.
In order to start a SQL query from a savepoint, the operator IDs in the 
savepoint must match the IDs in the newly translated program.
Right now this can only be guaranteed if you translate the same query with the 
same Flink version (optimizer changes might change the structure of the 
resulting plan even if the query is the same).
This is of course a significant limitation, that the community is aware of and 
planning to improve in the future.


I'd also like to add that it can be very difficult to assess whether it is 
meaningful to start a query from a savepoint that was generated with a 
different query.
A savepoint holds intermediate data that is needed to compute the result of a 
query.
If you update a query it is very well possible that the result computed by 
Flink won't be equal to the actual result of the new query.



Best, Fabian



Am Mo., 6. Juli 2020 um 10:50 Uhr schrieb shadowell :



Hello, everyone,
I have some unclear points when using Flink SQL. I hope to get an 
answer or tell me where I can find the answer.
When using the DataStream API, in order to ensure that the job can 
recover the state from savepoint after adjustment, it is necessary to specify 
the uid for the operator. However, when using Flink SQL, the uid of the 
operator is automatically generated. If the SQL logic changes (operator 

Re: Re: Flink 多Sink 数据一致性保证

2020-07-08 Thread jindy_liu
请问下,你这个最后是怎么做到的,能share下源码吗?
是需要将两个sink合并到一个sink里,然后再实现下二阶段提交吗?
我也遇到个多sink的原子性场景。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: DataStream统计uv问题

2020-07-08 Thread Yun Gao
逻辑上这么说也没有问题;可以认为它是先把收到的记录(根据key和time)分配到特定的窗口中,然后不同的窗口的状态和计算都是独立的



 --Original Mail --
Sender:ゞ野蠻遊戲χ 
Send Date:Thu Jul 9 10:16:06 2020
Recipients:user-zh 
Subject:回复: DataStream统计uv问题
意思是当我使用滚动窗口之后,在第一个滚动窗口中的state自动会被清除,第二个滚动窗口进来之后,获取相同的Descriptor,里面的值是null?


--原始邮件--
发件人:"Yun Gao"

?????? DataStream????uv????

2020-07-08 Thread ?g???U?[????
stateDescriptornull??


----
??:"Yun Gao"

Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-08 Thread godfrey he
可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
JobClient 可以 cancel 作业,获取 job status。

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey

Evan  于2020年7月9日周四 上午9:40写道:

> 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> API有没有提供类似的接口,调用后就能停止这个Stream作业呢?


Re: DataStream统计uv问题

2020-07-08 Thread Yun Gao

您好:
   1. 这个应该是预期内的用法,如果执行起来没有遇到问题应该是Ok的。
   2. 
窗口的状态会在超过最大时间+最大允许的延迟时间之后被清理。--
Sender:ゞ野蠻遊戲χ
Date:2020/07/07 22:32:51
Recipient:user-zh
Theme:DataStream统计uv问题

大家好!


  想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
  
1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,
 
这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。
  2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。


DataStream

Use state problem

2020-07-08 Thread ?g???U?[????
Deal all


Keyed state (ValueState, ReducingState, ListState, AggregatingState, MapState)
Supported in Keyed Stream, meaning only in KeyedProcessFunction? But in 
practice, I can also use these states in ProcessAllWindowFunction and 
ProcessWindowFunction. Why?


thank you
jiazhi

?????? Flink SQL??????????????????????(??????)????????????

2020-07-08 Thread cs
join




----
??:"godfrey he"

代码中如何取消正在运行的Flink Streaming作业

2020-07-08 Thread Evan
这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming API有没有提供类似的接口,调用后就能停止这个Stream作业呢?

Re: Limiting metrics logs to custom metric

2020-07-08 Thread Chesnay Schepler
Not really; but essentially you have to override 
SLF4JReporter#notifyOfAddedMetric and filter the metrics you're 
interested in. Then build the flink-metrics-slf4j module, and replace 
the corresponding jar in your distribution.


On 08/07/2020 18:20, Manish G wrote:

Ok.Any resource on same?


On Wed, Jul 8, 2020, 9:38 PM Chesnay Schepler > wrote:


There's no built-in functionality for this. You could customize the
reporter though.

On 08/07/2020 17:19, Manish G wrote:
> Hi,
>
> I have added a Meter in my code and pushing it to app logs using
slf4j
> reporter.
>
> I observe that apart from my custometrics, lots of other metrics
like
> gauge, histogram etc is also published. It makes it difficult to
> filter out data for generating splunk graphs.
>
> Is there a way to limit published metrics to just the custom one?
>
> With regards






回复: 一个source多个sink的同步问题

2020-07-08 Thread Sun.Zhu


窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


在2020年07月7日 18:44,lgs<9925...@qq.com> 写道:
是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble
window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Limiting metrics logs to custom metric

2020-07-08 Thread Manish G
Ok.Any resource on same?


On Wed, Jul 8, 2020, 9:38 PM Chesnay Schepler  wrote:

> There's no built-in functionality for this. You could customize the
> reporter though.
>
> On 08/07/2020 17:19, Manish G wrote:
> > Hi,
> >
> > I have added a Meter in my code and pushing it to app logs using slf4j
> > reporter.
> >
> > I observe that apart from my custometrics, lots of other metrics like
> > gauge, histogram etc is also published. It makes it difficult to
> > filter out data for generating splunk graphs.
> >
> > Is there a way to limit published metrics to just the custom one?
> >
> > With regards
>
>
>


Re: Limiting metrics logs to custom metric

2020-07-08 Thread Chesnay Schepler
There's no built-in functionality for this. You could customize the 
reporter though.


On 08/07/2020 17:19, Manish G wrote:

Hi,

I have added a Meter in my code and pushing it to app logs using slf4j 
reporter.


I observe that apart from my custometrics, lots of other metrics like 
gauge, histogram etc is also published. It makes it difficult to 
filter out data for generating splunk graphs.


Is there a way to limit published metrics to just the custom one?

With regards





Limiting metrics logs to custom metric

2020-07-08 Thread Manish G
Hi,

I have added a Meter in my code and pushing it to app logs using slf4j
reporter.

I observe that apart from my custometrics, lots of other metrics like
gauge, histogram etc is also published. It makes it difficult to filter out
data for generating splunk graphs.

Is there a way to limit published metrics to just the custom one?

With regards


Re: Flink SQL如何将多个表的查询结果(列不同)聚合成一张表

2020-07-08 Thread godfrey he
select a.table_tmp1.r1 / a.table_tmp2.r2
这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。


zilong xiao  于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段
> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~
> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from
> (
> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS
> table_tmp2,
> )as a
>


Flink SQL如何将多个表的查询结果(列不同)聚合成一张表

2020-07-08 Thread zilong xiao
列如下面这样,需要查询table1 & table2,分别查询不同的字段
在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~
select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from
(
(SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS
table_tmp2,
)as a


Re: Flink SQL复杂JSON解析

2020-07-08 Thread Benchao Li
看代码应该是支持复合类型的,你可以试下。

hua mulan  于2020年7月8日周三 下午8:34写道:

> 我试了下 Array里是基本元素可以CROSS JOIN
> UNNEST直接解开。如果Array里是Row、POJO、Tuple这种复合类型我就只能UDTF了是吧。
>
> 来自 Outlook
>
> 
> 发件人: Benchao Li 
> 发送时间: 2020年7月6日 22:35
> 收件人: user-zh 
> 主题: Re: Flink SQL复杂JSON解析
>
> 我理解最佳实践是第一种,先读出来array,再用table function展开成多行。
> 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> 王 outlook  于2020年7月6日周一 下午9:29写道:
>
> > 像如下这种JSON输入,
> >
> > {
> >   "id": 1,
> >   "many_names": [
> > {"name": "foo"},
> > {"name": "bar"}
> >   ]
> > }
> >
> > 输出表两行  id 1, name foo  |  id 1, name bar
> >
> > 最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?
> > 还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。
> >
> >
> > 来自 Outlook
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


Re: Manual allocation of slot usage

2020-07-08 Thread Mu Kong
Hi Song, Guo,

Thanks for the information.
I will first upgrade our flink cluster to 1.10.0 and try again.
Currently, we are encountering some dependency conflict issue, possibly
with tranquility. But that is another issue.

For your information, (also as I described in the previous email)
*What Flink deployment are you using? (Standalone/K8s/Yarn/Mesos): *we are
running a standalone cluster with version 1.9.0.
*How many times have you tried with and without
`cluster.evenly-spread-out-slots`? *Almost all the time. This is the first
time we tried it. The behavior before we changed the config, is that a
great amount of subtasks of the source (11 subtasks) were allocated in one
task manager, and the rest of the subtasks for that source we were spread
unevenly to all rest task managers. After changing the configuration, the
subtasks of this source took all the slots on 4 of our task managers, which
was more "skewed" than before.
*How many TMs do you have? And how many slots does each TM has? *We have 15
task manager with 15 slots on each.

I will try to reproduce this tomorrow(JST) when I have time.

Best regards,
Mu

On Wed, Jul 8, 2020 at 11:01 AM Xintong Song  wrote:

> Hi Mu,
> Regarding your questions.
>
>- The feature `spread out tasks evenly across task managers` is
>introduced in Flink 1.10.0, and backported to Flink 1.9.2, per the JIRA
>ticket [1]. That means if you configure this option in Flink 1.9.0, it
>should not take any effect.
>- Please be aware that this feature ATM only works for standalone
>deployment (including standalone Kubernetes deployment). For the native
>Kubernetes, Yarn and Mesos deployment, it is a known issue that this
>feature does not work as expected.
>- Regarding the scheduling behavior changes, we would need more
>information to explain this. To provide the information needed, the easiest
>way is probably to provide the jobmanager log files, if you're okay with
>sharing them. If you cannot share the logs, then it would be better to
>answer the following questions
>   - What Flink deployment are you using? (Standalone/K8s/Yarn/Mesos)
>   - How many times have you tried with and without
>   `cluster.evenly-spread-out-slots`? In other words, the described 
> behaviors
>   before and after setting `cluster.evenly-spread-out-slots`, can they be
>   stably reproduced?
>   - How many TMs do you have? And how many slots does each TM has?
>
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-12122
>
> On Tue, Jul 7, 2020 at 8:33 PM Mu Kong  wrote:
>
>> Hi, Guo,
>>
>> Thanks for helping out.
>>
>> My application has a kafka source with 60 subtasks(parallelism), and we
>> have 15 task managers with 15 slots on each.
>>
>> *Before I applied the cluster.evenly-spread-out-slots,* meaning it is
>> set to default false, the operator 'kafka source" has 11 subtasks allocated
>> in one single task manager,
>> while the remaining 49 subtasks of "kafka source" distributed to the
>> remaining 14 task managers.
>>
>> *After I set cluster.evenly-spread-out-slots to true*, the 60 subtasks
>> of "kafka source" were allocated to only 4 task managers, and they took 15
>> slots on each of these 4 TMs.
>>
>> What I thought is that this config will make the subtasks of one operator
>> more evenly spread among the task managers, but it seems it made them
>> allocated in the same task manager as much as possible.
>>
>> The version I'm deploying is 1.9.0.
>>
>> Best regards,
>> Mu
>>
>> On Tue, Jul 7, 2020 at 7:10 PM Yangze Guo  wrote:
>>
>>> Hi, Mu,
>>>
>>> IIUC, cluster.evenly-spread-out-slots would fulfill your demand. Why
>>> do you think it does the opposite of what you want. Do you run your
>>> job in active mode? If so, cluster.evenly-spread-out-slots might not
>>> work very well because there could be insufficient task managers when
>>> request slot from ResourceManager. This has been discussed in
>>> https://issues.apache.org/jira/browse/FLINK-12122 .
>>>
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Jul 7, 2020 at 5:44 PM Mu Kong  wrote:
>>> >
>>> > Hi community,
>>> >
>>> > I'm running an application to consume data from kafka, and process it
>>> then put data to the druid.
>>> > I wonder if there is a way where I can allocate the data source
>>> consuming process evenly across the task manager to maximize the usage of
>>> the network of task managers.
>>> >
>>> > So, for example, I have 15 task managers and I set parallelism for the
>>> kafka source as 60, since I have 60 partitions in kafka topic.
>>> > What I want is flink cluster will put 4 kafka source subtasks on each
>>> task manager.
>>> >
>>> > Is that possible? I have gone through the document, the only thing we
>>> found is
>>> >
>>> > cluster.evenly-spread-out-slots
>>> >
>>> > which does exact the opposite of what I want. It will put the subtasks
>>> of the same operator onto one task manager as much as possible.
>>> >
>>> > 

回复: Flink SQL复杂JSON解析

2020-07-08 Thread hua mulan
我试了下 Array里是基本元素可以CROSS JOIN 
UNNEST直接解开。如果Array里是Row、POJO、Tuple这种复合类型我就只能UDTF了是吧。

来自 Outlook


发件人: Benchao Li 
发送时间: 2020年7月6日 22:35
收件人: user-zh 
主题: Re: Flink SQL复杂JSON解析

我理解最佳实践是第一种,先读出来array,再用table function展开成多行。
实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

王 outlook  于2020年7月6日周一 下午9:29写道:

> 像如下这种JSON输入,
>
> {
>   "id": 1,
>   "many_names": [
> {"name": "foo"},
> {"name": "bar"}
>   ]
> }
>
> 输出表两行  id 1, name foo  |  id 1, name bar
>
> 最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?
> 还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。
>
>
> 来自 Outlook
>


--

Best,
Benchao Li


Savepoint fails due to RocksDB 2GiB limit

2020-07-08 Thread Ori Popowski
I've asked this question in https://issues.apache.org/jira/browse/FLINK-9268
but it's been inactive for two years so I'm not sure it will be visible.

While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException. It's happening because some of my
windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.

How can I prevent this?

As I understand it, I need somehow to limit the accumulated size of the
window I'm using, which is EventTimeWindow. However, I have no way of doing
so, because the WindowOperator manages its state on its own.

Below is a full stack trace.

org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint
139 for operator Window(EventTimeSessionWindows(180), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
at org.rocksdb.RocksIterator.value0(Native Method)
at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
at
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
at
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
... 5 common frames omitted


Re: flink state使用

2020-07-08 Thread Congxian Qiu
State 可以简单理解为一个 HashMap,Key 是 curretnly key(也就是 keyby 的 key)),value 是 state
存的值(可以是 value,list,map 等)

所有 state 的读写都有一个 currently,只能读到 currently key 对应的值。

在同一个 operator 中,同一个 key 能访问到之前存储过的 state 值,但是不能读取到其他 key 对应的值

Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月8日周三 下午3:58写道:

>
> 那就是在说,在iterativestream中,这个state可以正确的在每一次的迭代operator中传递并读取,那么结束迭代之后,假设得到的结果stream再进行keyby操作,因为operator不是同一个,此时就无法读取到前述迭代过程中的state,即便是同一个key
> 这样的理解对吗?
>
>
> Best Regards
>
>
>
>
>
> --原始邮件--
> 发件人: "Congxian Qiu" 发送时间: 2020年7月8日(星期三) 下午3:48
> 收件人: "user-zh" 主题: Re: flink state使用
>
>
>
> Hi
>
> KeyedState 的操作实际会针对当前的 key,也就是 keyBy 之后得到的 key,但是这个 key 用户看不到。在一个 operator
> 中 state 的数目是你创建的数目,但是每个 state 可以有多个 KV 对,其中 K 是 keyby 的 key,V 可以是
> value,list,map 等。同一个 operator 上不同的 key 的 state 保证能够正确读写的。
>
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com 于2020年7月8日周三 下午3:41写道:
>
>  Hello,all
>  目前遇到一个关于state的使用问题
>  nbsp; nbsp; nbsp; DataStream Stringgt;gt; result =
>  iteration.closeWith(
>  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;
>  iteration.keyBy(0).process(new RichMapFunc());
> 
> 
> 
>  我在richmapfunc里使用了state,那么每一次迭代的时候,这个state还是同一个吗?
>  如果我对result这个结果stream继续使用了keyby(0)的话,state也是同一个么?
>  此外,如果我对一个stream进行keyby之后,key一直是不变的,
> 
> 但是map等operator之后返回的并非是keyedstream(operator不对key进行操作),是否只能继续keyby,还是有更好的方式


Re: State里面用guava Cache

2020-07-08 Thread Congxian Qiu
State 也可以理解为一个 cache,如果打算自己维护 cache(比如使用 guava cache)的话,那么就可以不使用
State,当然如果你还希望做状态容错的话,也可以使用类似 YiChao 的方法,积攒一批数据然后一起更新 State

Best,
Congxian


op <520075...@qq.com> 于2020年7月8日周三 下午6:50写道:

> 谢谢,
>  这是我在本地把并行度都设置成1测试的,先不管了。。。
>  这个roaringbitmap也是保存到state中吗?我后面试试
>  开窗口这个可以,十分感谢
>
>
>
>
> --原始邮件--
> 发件人:"Yichao Yang"<1048262...@qq.com;
> 发送时间:2020年7月8日(星期三) 晚上6:45
> 收件人:"user-zh"
> 主题:回复: State里面用guava Cache
>
>
>
> Hi,
> 每次cache的长度都是一有没有可能并发比较大,每一个1都是不同的算子输出的。
>
>
> 你的场景我们实践中的方法是按照用户id
> keyby之后再做localcache,并且如果用户id是long类型的话,localcache可以使用roaringbitmap,效率会比单纯的cache效率更好,占用内存更小。
>
>
> 并且频繁update state在资源有限的情况下是会有性能瓶颈的,这种场景下建议开窗口,窗口结束时update一次state即可。
>
>
> Best,
> Yichao Yang
>
>
> -- 原始邮件 --
> 发件人: user-zh-return-5056-1048262223=qq.com <520075...@qq.comgt;
> 发送时间: 2020年7月8日 18:09
> 收件人: user-zh  主题: 回复: State里面用guava Cache
>
>
>
> 比如数据里来了一个id我需要去判断这个id是新的还是已经存在的,由于历史数据量比较大,所以放全部state里面不太好。
> 把最近活跃的id放到ValueState[Cache]里面,可以在内存里关联到绝大部分的id,避免频繁访问外部存储。
> 如果不使用state保存的的话,重启作业后cache会重置,这段时间通过外部存储去关联id会很慢
> amp;nbsp;谢谢
>
>
> --amp;nbsp;原始邮件amp;nbsp;--
> 发件人:amp;nbsp;"Congxian Qiu" 发送时间:amp;nbsp;2020年7月8日(星期三) 下午5:52
> 收件人:amp;nbsp;"user-zh"
> 主题:amp;nbsp;Re: State里面用guava Cache
>
>
>
> 我尝试理解一下你的需求:
> 你希望从外部存储同步一些信息,由于访问外部存储效率不高,所以希望加一个 cache,然后 cache
> 中的数据希望在一定时间后过期,过期后重新去外部存储同步一次信息。
>
> 但是还有一些信息不太明白,那这里你打算在什么地方使用 state 呢?state 存放什么数据呢?或者说,你自己维护这个状态之后,为什么还有使用
> state 呢?
>
> 不管怎么说使用 Flink 之后,还是建议尽量使用 state,而不是使用外存,flink 提供的 state 方便做一些容错处理。
>
> Best,
> Congxian
>
>
> op <520075...@qq.comamp;gt; 于2020年7月8日周三 下午4:07写道:
>
> amp;gt;
> amp;gt;
> 您好,是这样的,我想再程序里面关联一些用户id,使用cache缓存一些热数据,设置每个id写入多久后自动清理掉,关联的时候首先访问缓存,访问不到再去访问外部存储;
> amp;gt; 业务中的key会一直出现,也就是说ttl可能不会生效,这样没办法使用state ttl对吧?
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt;
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
> amp;gt; 发件人:amp;amp;nbsp;"Congxian Qiu" amp;amp;gt;;
> amp;gt; 发送时间:amp;amp;nbsp;2020年7月8日(星期三) 下午3:56
> amp;gt; 收件人:amp;amp;nbsp;"user-zh" amp;amp;gt;;
> amp;gt;
> amp;gt; 主题:amp;amp;nbsp;Re: State里面用guava Cache
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt; TTL state[1] 满足你的需求吗? 如果不满足的话,能否描述下你的需求呢?
> amp;gt;
> amp;gt; [1]
> amp;gt;
> amp;gt;
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
> amp;gt
> ;
> Best,
> amp;gt; Congxian
> amp;gt;
> amp;gt;
> amp;gt; op <520075...@qq.comamp;amp;gt; 于2020年7月8日周三 下午3:53写道:
> amp;gt;
> amp;gt; amp;amp;gt; 您好,我主要是觉得Cache的自动过期比较好用
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt;
> --amp;amp;amp;nbsp;原始邮件amp;amp;amp;nbsp;--
> amp;gt; amp;amp;gt; 发件人:amp;amp;amp;nbsp;"Congxian Qiu"<
> qcx978132...@gmail.comamp;amp;amp;gt;;
> amp;gt; amp;amp;gt; 发送时间:amp;amp;amp;nbsp;2020年7月8日(星期三)
> 下午3:50
> amp;gt; amp;amp;gt; 收件人:amp;amp;amp;nbsp;"user-zh"<
> user-zh@flink.apache.orgamp;amp;amp;gt;;
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt; 主题:amp;amp;amp;nbsp;Re: State里面用guava
> Cache
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt; 你好,为什么需要在 State 里面再用 cache 呢?单纯的 State
> 不能满足需求吗?需求是什么呢?
> amp;gt; amp;amp;gt; 另外,除了 ValueState,其他的 ListState/MapState
> 能否满足你的需求呢?
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt; Best,
> amp;gt; amp;amp;gt; Congxian
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt; op <520075...@qq.comamp;amp;amp;gt;
> 于2020年7月8日周三 上午10:31写道:
> amp;gt; amp;amp;gt;
> amp;gt; amp;amp;gt; amp;amp;amp;gt; 大家好,我想使用一个
> ValueState[Cache]的状态,但是发现这个状态的value 没办法更新,
> amp;gt; amp;amp;gt; amp;amp;amp;gt;
> amp;gt; amp;amp;gt; amp;amp;amp;gt;
> amp;gt; amp;amp;gt; amp;amp;amp;gt;
> amp;gt;
> 比如我在map里面每次往cache里面put一个字符串,然后update这个state,输出cache的长度,为什么每次输出长度都是1


?????? State??????guava Cache

2020-07-08 Thread op
??
 1??
 roaringbitmap??state
 




----
??:"Yichao Yang"<1048262...@qq.com;
:2020??7??8??(??) 6:45
??:"user-zh"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
amp;gt; Best,
amp;gt; Congxian
amp;gt;
amp;gt;
amp;gt; op <520075...@qq.comamp;amp;gt; ??2020??7??8?? 
3:53??
amp;gt;
amp;gt; amp;amp;gt; ??Cache??
amp;gt; amp;amp;gt;
amp;gt; amp;amp;gt;
amp;gt; amp;amp;gt; 
--amp;amp;amp;nbsp;amp;amp;amp;nbsp;--
amp;gt; amp;amp;gt; ??:amp;amp;amp;nbsp;"Congxian 
Qiu"

回复:关于 richfunction中初始化数据库连接的问题

2020-07-08 Thread JasonLee
hi
具体是卡在什么地方了呢?可以打印日志定位一下 理论上是不会有这样的问题 还有单个执行的话可以吗?


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月08日 18:32,hdxg1101300...@163.com 写道:
您好:
   我使用flink1.10.1版本streamapi编写程序时,在不同的richfunction中 
分别使用Class.forName("*"); 来加载数据库驱动。是不同的两个数据库驱动;这样会导致程序卡住不往下执行;有人遇到吗?


hdxg1101300...@163.com


回复: State里面用guava Cache

2020-07-08 Thread Yichao Yang
Hi,
每次cache的长度都是一有没有可能并发比较大,每一个1都是不同的算子输出的。


你的场景我们实践中的方法是按照用户id 
keyby之后再做localcache,并且如果用户id是long类型的话,localcache可以使用roaringbitmap,效率会比单纯的cache效率更好,占用内存更小。


并且频繁update state在资源有限的情况下是会有性能瓶颈的,这种场景下建议开窗口,窗口结束时update一次state即可。


Best,
Yichao Yang


-- 原始邮件 --
发件人: user-zh-return-5056-1048262223=qq.com <520075...@qq.com
发送时间: 2020年7月8日 18:09
收件人: user-zh https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
gt; Best,
gt; Congxian
gt;
gt;
gt; op <520075...@qq.comamp;gt; 于2020年7月8日周三 下午3:53写道:
gt;
gt; amp;gt; 您好,我主要是觉得Cache的自动过期比较好用
gt; amp;gt;
gt; amp;gt;
gt; amp;gt; 
--amp;amp;nbsp;原始邮件amp;amp;nbsp;--
gt; amp;gt; 发件人:amp;amp;nbsp;"Congxian 
Qiu"

回复:关于 richfunction中初始化数据库连接的问题

2020-07-08 Thread Yichao Yang
Hi,


是执行到哪步出现了问题?可以提供下面一些内容来帮忙定位问题吗?
1.截图或者日志
2.不同的数据库都是哪些数据库,以及版本是哪些
3.单写一个测试用例加载两个数据库是否能够加载成功
4.代码伪编码


Best,
Yichao Yang


-- 原始邮件 --
发件人: hdxg1101300...@163.com 

关于 richfunction中初始化数据库连接的问题

2020-07-08 Thread hdxg1101300...@163.com
您好:
我使用flink1.10.1版本streamapi编写程序时,在不同的richfunction中 
分别使用Class.forName("*"); 来加载数据库驱动。是不同的两个数据库驱动;这样会导致程序卡住不往下执行;有人遇到吗?


hdxg1101300...@163.com


Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-08 Thread Fabian Hueske
Hi Jie,

The auto-ID generation is not done by the SQL translation component but on
a lower level, i.e., it's independent of Flink's SQL translation.
The ID generation only depends on the topology / graph structure of the
program's operators.
The ID of an operator depends on the IDs of its predecessors (and not on
its own processing logic or operator name).

So, as long as the operator graph structure of a program remains the same,
it will be compatible with an earlier savepoint.
However, preserving the operator graph structure is out of the user's
control.
The operator graph is automatically generated by the SQL optimizer and
slight changes of a query can result in a different graph while other
changes do not affect the structure.

In your example, the graph structure should remain the same because there
is already a Filter operator (due to "where id == '001'") in the first
query and the second query just extends the filter predicate ("id == '001'
and age >= '28'").
If there was no WHERE clause in the first query, the plan might have been
changed.
In order to reason about which query changes are savepoint compatible, you
need in-depth knowledge about the optimizer's translation process.

I would not rely on being able to start a query from a savepoint of a
(slightly) modified query.
First because it is very fragile given the query translation process and
second because it results in incorrect results.

Given your example query, I would start it from scratch and add a predicate
to continue after the latest result of the previous query:

select id, name, sum(salary) from user_info where id == '001' and age >=
'28' and rowtime >= 'xxx' group by TUMBLE(rowtime, INTERVAL '1' DAY), id,
name;

If the last result of the first query was for '2020-07-07' I would set xxx
to '2020-07-08-00:00:00.000'.
Of course this only works for queries with hard temporary boundaries, but
it gives correct results.

Best, Fabian

Am Mi., 8. Juli 2020 um 04:50 Uhr schrieb shadowell :

>
> Hi Fabian,
>
> Thanks for your information!
> Actually, I am not clear about the mechanism of auto-generated IDs in
> Flink SQL and the mechanism of how does the operator state mapping back
> from savepoint.
> I hope to get some detail information by giving an example bellow.
>
> I have two sql as samples:
> old sql : select id, name, sum(salary) from user_info where id == '001'
> group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
> new sql:   select id, name, sum(salary) from user_info where id == '001'
> and age >= '28' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
> I just add some age limitation in new SQL. Now, I want to switch the job
> from old one to the new one by trigger a savepoint. Flink will generate
> operator IDs for operators in new SQL.
> In this case, just from a technical point of view,  the operator IDs in
> the savepoint of the old SQL job can match the operator IDs in the new SQL
> job?
> My understanding is that Flink will reorder the operators and generate new
> IDs for operators. The new IDs may not match the old IDs.
> This will cause some states failed to be mapped back from the old job
> savepoint, which naturally leads to inaccurate calculation results.
> I wonder if my understanding is correct.
>
> Thanks~
> Jie
>
> shadowell
> shadow...@126.com
>
> 
> 签名由 网易邮箱大师  定制
> On 7/7/2020 17:23,Fabian Hueske 
> wrote:
>
> Hi Jie Feng,
>
> As you said, Flink translates SQL queries into streaming programs with
> auto-generated operator IDs.
> In order to start a SQL query from a savepoint, the operator IDs in the
> savepoint must match the IDs in the newly translated program.
> Right now this can only be guaranteed if you translate the same query with
> the same Flink version (optimizer changes might change the structure of the
> resulting plan even if the query is the same).
> This is of course a significant limitation, that the community is aware of
> and planning to improve in the future.
>
> I'd also like to add that it can be very difficult to assess whether it is
> meaningful to start a query from a savepoint that was generated with a
> different query.
> A savepoint holds intermediate data that is needed to compute the result
> of a query.
> If you update a query it is very well possible that the result computed by
> Flink won't be equal to the actual result of the new query.
>
> Best, Fabian
>
> Am Mo., 6. Juli 2020 um 10:50 Uhr schrieb shadowell :
>
>>
>> Hello, everyone,
>> I have some unclear points when using Flink SQL. I hope to get an
>> answer or tell me where I can find the answer.
>> When using the DataStream API, in order to ensure that the job
>> can recover the state from savepoint after adjustment, it is necessary to
>> specify 

?????? State??????guava Cache

2020-07-08 Thread op
??idid??state
idValueState[Cache]??id
??state??cache??id??



----
??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
 Best,
 Congxian


 op <520075...@qq.comgt; ??2020??7??8?? 3:53??

 gt; ??Cache??
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:amp;nbsp;"Congxian 
Qiu"

Re: State里面用guava Cache

2020-07-08 Thread Congxian Qiu
我尝试理解一下你的需求:
你希望从外部存储同步一些信息,由于访问外部存储效率不高,所以希望加一个 cache,然后 cache
中的数据希望在一定时间后过期,过期后重新去外部存储同步一次信息。

但是还有一些信息不太明白,那这里你打算在什么地方使用 state 呢?state 存放什么数据呢?或者说,你自己维护这个状态之后,为什么还有使用
state 呢?

不管怎么说使用 Flink 之后,还是建议尽量使用 state,而不是使用外存,flink 提供的 state 方便做一些容错处理。

Best,
Congxian


op <520075...@qq.com> 于2020年7月8日周三 下午4:07写道:

>
> 您好,是这样的,我想再程序里面关联一些用户id,使用cache缓存一些热数据,设置每个id写入多久后自动清理掉,关联的时候首先访问缓存,访问不到再去访问外部存储;
> 业务中的key会一直出现,也就是说ttl可能不会生效,这样没办法使用state ttl对吧?
>
>
>
>
> --原始邮件--
> 发件人:"Congxian Qiu" 发送时间:2020年7月8日(星期三) 下午3:56
> 收件人:"user-zh"
> 主题:Re: State里面用guava Cache
>
>
>
> TTL state[1] 满足你的需求吗? 如果不满足的话,能否描述下你的需求呢?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
> Best,
> Congxian
>
>
> op <520075...@qq.com 于2020年7月8日周三 下午3:53写道:
>
>  您好,我主要是觉得Cache的自动过期比较好用
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Congxian Qiu"  发送时间:nbsp;2020年7月8日(星期三) 下午3:50
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: State里面用guava Cache
> 
> 
> 
>  你好,为什么需要在 State 里面再用 cache 呢?单纯的 State 不能满足需求吗?需求是什么呢?
>  另外,除了 ValueState,其他的 ListState/MapState 能否满足你的需求呢?
> 
>  Best,
>  Congxian
> 
> 
>  op <520075...@qq.comgt; 于2020年7月8日周三 上午10:31写道:
> 
>  gt; 大家好,我想使用一个 ValueState[Cache]的状态,但是发现这个状态的value 没办法更新,
>  gt;
>  gt;
>  gt;
> 比如我在map里面每次往cache里面put一个字符串,然后update这个state,输出cache的长度,为什么每次输出长度都是1


回复:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread 夏帅
感谢


Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread godfrey he
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset

Best,
Godfrey

Zhou Zach  于2020年7月8日周三 下午4:19写道:

> 去掉就好了,感谢解答
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-08 16:07:17,"Jingsong Li"  写道:
> >Hi,
> >
> >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
> >
> >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
> >并没有真正的物理节点。你不用再调用了。
> >
> >Best,
> >Jingsong
> >
> >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:
> >
> >>
> >>
> >>
> >> 代码结构改成这样的了:
> >>
> >>
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> >> val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>
> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> >> blinkEnvSettings)
> >>
> >>
> >>
> >>
> >>
> >> streamExecutionEnv.execute("from kafka sink hbase")
> >>
> >>
> >>
> >>
> >> 还是报一样的错
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-08 15:40:41,"夏帅"  写道:
> >> >你好,
> >> >可以看看你的代码结构是不是以下这种
> >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >> >val bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
> >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> >> >  ..
> >> >tableEnv.execute("")
> >> >如果是的话,可以尝试使用bsEnv.execute("")
> >> >1.11对于两者的execute代码实现有改动
> >> >
> >> >
> >> >--
> >> >发件人:Zhou Zach 
> >> >发送时间:2020年7月8日(星期三) 15:30
> >> >收件人:Flink user-zh mailing list 
> >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming
> topology
> >> >
> >> >代码在flink
> >>
> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
> >> >Exception in thread "main" java.lang.IllegalStateException: No
> operators
> >> defined in streaming topology. Cannot generate StreamGraph.
> >> >at
> >>
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> >> >at
> >>
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> >> >at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
> >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
> >> >
> >> >
> >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
> >> >
> >> >
> >> >
> >> >
> >> >query:
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE `user` (
> >> >|uid BIGINT,
> >> >|sex VARCHAR,
> >> >|age INT,
> >> >|created_time TIMESTAMP(3),
> >> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >|) WITH (
> >> >|'connector.type' = 'kafka',
> >> >|'connector.version' = 'universal',
> >> >|-- 'connector.topic' = 'user',
> >> >|'connector.topic' = 'user_long',
> >> >|'connector.startup-mode' = 'latest-offset',
> >> >|'connector.properties.group.id' = 'user_flink',
> >> >|'format.type' = 'json',
> >> >|'format.derive-schema' = 'true'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE user_hbase3(
> >> >|rowkey BIGINT,
> >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
> >> >|) WITH (
> >> >|'connector.type' = 'hbase',
> >> >|'connector.version' = '2.1.0',
> >> >|'connector.table-name' = 'user_hbase2',
> >> >|'connector.zookeeper.znode.parent' = '/hbase',
> >> >|'connector.write.buffer-flush.max-size' = '10mb',
> >> >|'connector.write.buffer-flush.max-rows' = '1000',
> >> >|'connector.write.buffer-flush.interval' = '2s'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|insert into user_hbase3
> >> >|SELECT uid,
> >> >|
> >> >|  ROW(sex, age, created_time ) as cf
> >> >|  FROM  

Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread Danny Chan
Thanks Zhijiang and Piotr for the great work as release manager, and thanks
everyone who makes the release possible!

Best,
Danny Chan
在 2020年7月8日 +0800 PM4:59,Congxian Qiu ,写道:
>
> Thanks Zhijiang and Piotr for the great work as release manager, and thanks
> everyone who makes the release possible!


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread godfrey he
Congratulations!

Thanks Zhijiang and Piotr for the great work, and thanks everyone for their
contribution!

Best,
Godfrey

Benchao Li  于2020年7月8日周三 下午12:39写道:

> Congratulations!  Thanks Zhijiang & Piotr for the great work as release
> managers.
>
> Rui Li  于2020年7月8日周三 上午11:38写道:
>
>> Congratulations! Thanks Zhijiang & Piotr for the hard work.
>>
>> On Tue, Jul 7, 2020 at 10:06 PM Zhijiang 
>> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.11.0, which is the latest major release.
>>>
>>> Apache Flink® is an open-source stream processing framework for distributed,
>>> high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this new major release:
>>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Cheers,
>>> Piotr & Zhijiang
>>>
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>
>
> --
>
> Best,
> Benchao Li
>


回复:flink 1.11 on kubernetes 构建失败

2020-07-08 Thread SmileSmile
hi yun tang!

没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
best!




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月08日 16:29,Yun Tang 写道:
Hi

你是不是对 /opt/flink/conf 
目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml 
等文件,而这个挂载的目录其实是不可写的。
直接修改configmap里面的内容,这样挂载时候就会自动更新了。

祝好
唐云

From: SmileSmile 
Sent: Wednesday, July 8, 2020 13:03
To: Flink user-zh mailing list 
Subject: flink 1.11 on kubernetes 构建失败

hi

按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错


Starting Task Manager
sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only file 
system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only file 
system
/docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.


是否有遇到同样的问题,支个招



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: 如何在Flink SQL中使用周期性水印?

2020-07-08 Thread noake
非常感谢大家的回复。


我们的业务场景已经解决了。
目前的做法是在TableSource中配置水印策略, 在WatermarkGenerator中判断是否需要发射新的水印




原始邮件
发件人:Jark wuimj...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年7月8日(周三) 13:26
主题:Re: 如何在Flink SQL中使用周期性水印?


嗯, 可以在 JIRA 中开个 issue 描述下你的需求~ On Wed, 8 Jul 2020 at 12:01, 1193216154 
1193216...@qq.com wrote:  nbsp; nbsp;Jark,flink有没有必要去支持这个特性?我感觉还是有一些应用场景  
--nbsp;原始邮件nbsp;--  发件人:nbsp;"Jark 
Wu"imjark@gmail.comgt;;  发送时间:nbsp;2020年7月8日(星期三) 中午11:48  
收件人:nbsp;"user-zh"user-zh@flink.apache.orggt;;   主题:nbsp;Re: 如何在Flink 
SQL中使用周期性水印? 如果所有 partition 都没有数据,还希望 watermark 往前走,那 idle source 
确实解决不了这个问题。  目前确实没有太好的解决办法。   Best,  Jark   On Wed, 8 Jul 2020 at 11:08, 
1193216154 1193216154@qq.comgt; wrote:   gt; hi Jark Wu.  gt;  gt;  
我的理解是table.exec.source.idle-timeout只能解决watermark对齐的时候去忽略某个没有watermark的并行度。但是在每个并行度都没有watermark的时候,还是无法更新watermark。
  gt;  我觉得题主的意思应该是,在kafka的所有分区都没有数据的时候,最后一个窗口无法触发(因为没有watermark大于最后那个窗口结束时间了)。  
gt; 有没有可以设置在eventTime情况下,周期性生成当前时间的一个waterMark(和数据无关),因为可能没有新数据到来了。  gt;  gt;  
gt;  gt;  gt; --amp;nbsp;原始邮件amp;nbsp;--  gt; 
发件人:amp;nbsp;"Jark Wu"imjark@gmail.comamp;gt;;  gt; 
发送时间:amp;nbsp;2020年7月7日(星期二) 晚上6:09  gt; 
收件人:amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;gt;;  gt;  gt; 
主题:amp;nbsp;Re: 如何在Flink SQL中使用周期性水印?  gt;  gt;  gt;  gt; Hi,  gt;  gt; 
这个问题我理解其实和周期性水印没有关系,是属于 idle source  gt; 的问题,你可以尝试下加上配置 
table.exec.source.idle-timeout = 10s 能不能解决你的问题。[1]  gt;  gt; Best,  gt; Jark  
gt;  gt; [1]:  gt;  gt;  
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
  gt  
">https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeoutgt
  ;  gt; On Tue, 7 Jul 2020 at 17:35, noake noake@sina.cnamp;gt; wrote:  gt;  
gt; amp;gt; Dear All:  gt; amp;gt;  gt; amp;gt;  gt; amp;gt; 大佬们, 请教下如何在Flink 
SQL中使用周期性的水印。  gt; amp;gt; 我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。

Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Niels Basjes
Thanks guys,

It is clear this is a Java thing.

Niels

On Wed, Jul 8, 2020 at 9:56 AM Tzu-Li (Gordon) Tai 
wrote:

> Ah, didn't realize Chesnay has it answered already, sorry for the
> concurrent
> reply :)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: flink 1.11 on kubernetes 构建失败

2020-07-08 Thread Yun Tang
Hi

你是不是对 /opt/flink/conf 
目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml 
等文件,而这个挂载的目录其实是不可写的。
直接修改configmap里面的内容,这样挂载时候就会自动更新了。

祝好
唐云

From: SmileSmile 
Sent: Wednesday, July 8, 2020 13:03
To: Flink user-zh mailing list 
Subject: flink 1.11 on kubernetes 构建失败

hi

按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错


Starting Task Manager
sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only file 
system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only file 
system
/docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.


是否有遇到同样的问题,支个招



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: Re:Re: 如何在窗口关闭的时候清除状态

2020-07-08 Thread Yun Tang
Hi

TTL需要state descriptor明确声明enableTimeToLive[1],而一旦使用window,window内使用的timer和window 
state实际上不暴露给用户 
的,没法开启TTL,二者在使用方式上存在一定互斥。从语义上来说TTL可以清理过期数据,而默认的window实现都会清理已经trigger过的window内的state,所以二者在语义上其实也是有一定互斥的。

从性能角度考虑,一天的窗口显得有点大了,往往性能不好,如果能把类似逻辑迁移到TTL上实现会对性能更友好。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl

祝好
唐云


From: flink小猪 <18579099...@163.com>
Sent: Tuesday, July 7, 2020 10:44
To: user-zh@flink.apache.org 
Subject: Re:Re: 如何在窗口关闭的时候清除状态




[1].设置TTL应该也能达到相同的效果,我还是希望在窗口关闭的时候能够做一些自定义的操作(比如这里的清除状态,也许之后会有其他的操作TTL就不一样好用了)
[2].KeyedProcessFunction,应该自己注册定时器把,在我的代码里面是timeWIndow().trigger().process(), 
ProcessWindowFunction方法我只需要处理逻辑即可,不需要管定时的窗口。














在 2020-07-05 11:56:03,"Congxian Qiu"  写道:
>看上去这个需求是 一天的窗口,每个小时都 trigger 一次,希望 state 在 1 天之后进行清理。
>你可以尝试一下 TTL[1] State
>另外想问一下,你自己写 ProcessWindowFunction 的话,为什么不考虑 KeyedProcessFunction[2] 呢
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction
>Best,
>Congxian
>
>
>JasonLee <17610775...@163.com> 于2020年7月4日周六 下午8:29写道:
>
>> 设置一下状态过期的时间呢
>>
>>
>> | |
>> JasonLee
>> |
>> |
>> 邮箱:17610775...@163.com
>> |
>>
>> Signature is customized by Netease Mail Master
>>
>> 在2020年07月03日 14:02,18579099...@163.com 写道:
>>
>> 大家好,我有一个需求,我在ProcessWindowFunction算子中定义了一个valueState,我希望在窗口关闭的时候能够将状态清理。我应该在哪里清理呢?
>>
>> 1.刚开始我选择在ProcessWindowFunction算子的process方法中清理,但是这里会有一个问题,我事件时间窗口开1天,我写了一个trigger,每隔一个小时输出一次结果。
>>
>> 如果我在process方法中清理,每隔一个小时就会被清理,而valueState中存的是我的中间结果,应该在窗口关闭的时候被清理(即一天之后)。这应该怎么办呢?
>>
>>
>>
>> 18579099...@163.com
>>


Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread Zhou Zach
去掉就好了,感谢解答

















在 2020-07-08 16:07:17,"Jingsong Li"  写道:
>Hi,
>
>你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
>
>所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
>并没有真正的物理节点。你不用再调用了。
>
>Best,
>Jingsong
>
>On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:
>
>>
>>
>>
>> 代码结构改成这样的了:
>>
>>
>>
>>
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>
>> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
>> blinkEnvSettings)
>>
>>
>>
>>
>>
>> streamExecutionEnv.execute("from kafka sink hbase")
>>
>>
>>
>>
>> 还是报一样的错
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-08 15:40:41,"夏帅"  写道:
>> >你好,
>> >可以看看你的代码结构是不是以下这种
>> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> >val bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
>> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>> >  ..
>> >tableEnv.execute("")
>> >如果是的话,可以尝试使用bsEnv.execute("")
>> >1.11对于两者的execute代码实现有改动
>> >
>> >
>> >--
>> >发件人:Zhou Zach 
>> >发送时间:2020年7月8日(星期三) 15:30
>> >收件人:Flink user-zh mailing list 
>> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology
>> >
>> >代码在flink
>> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
>> >Exception in thread "main" java.lang.IllegalStateException: No operators
>> defined in streaming topology. Cannot generate StreamGraph.
>> >at
>> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
>> >at
>> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
>> >at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
>> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
>> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
>> >
>> >
>> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
>> >
>> >
>> >
>> >
>> >query:
>> >streamTableEnv.executeSql(
>> >  """
>> >|
>> >|CREATE TABLE `user` (
>> >|uid BIGINT,
>> >|sex VARCHAR,
>> >|age INT,
>> >|created_time TIMESTAMP(3),
>> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
>> SECOND
>> >|) WITH (
>> >|'connector.type' = 'kafka',
>> >|'connector.version' = 'universal',
>> >|-- 'connector.topic' = 'user',
>> >|'connector.topic' = 'user_long',
>> >|'connector.startup-mode' = 'latest-offset',
>> >|'connector.properties.group.id' = 'user_flink',
>> >|'format.type' = 'json',
>> >|'format.derive-schema' = 'true'
>> >|)
>> >|""".stripMargin)
>> >
>> >
>> >
>> >
>> >
>> >
>> >streamTableEnv.executeSql(
>> >  """
>> >|
>> >|CREATE TABLE user_hbase3(
>> >|rowkey BIGINT,
>> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>> >|) WITH (
>> >|'connector.type' = 'hbase',
>> >|'connector.version' = '2.1.0',
>> >|'connector.table-name' = 'user_hbase2',
>> >|'connector.zookeeper.znode.parent' = '/hbase',
>> >|'connector.write.buffer-flush.max-size' = '10mb',
>> >|'connector.write.buffer-flush.max-rows' = '1000',
>> >|'connector.write.buffer-flush.interval' = '2s'
>> >|)
>> >|""".stripMargin)
>> >
>> >
>> >streamTableEnv.executeSql(
>> >  """
>> >|
>> >|insert into user_hbase3
>> >|SELECT uid,
>> >|
>> >|  ROW(sex, age, created_time ) as cf
>> >|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as
>> created_time from `user`)
>> >|
>> >|""".stripMargin)
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>>
>
>
>-- 
>Best, Jingsong Lee


?????? ??????Flink SQL?????????????????

2020-07-08 Thread 1193216154
 https://issues.apache.org/jira/browse/FLINK-18523


----
??:"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
 gt
 


?????? State??????guava Cache

2020-07-08 Thread op
??id??cacheid
keyttlstate ttl??




----
??:"Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
Best,
Congxian


op <520075...@qq.com ??2020??7??8?? 3:53??

 ??Cache??


 --nbsp;nbsp;--
 ??:nbsp;"Congxian Qiu"

Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread Jingsong Li
Hi,

你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。

所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
并没有真正的物理节点。你不用再调用了。

Best,
Jingsong

On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:

>
>
>
> 代码结构改成这样的了:
>
>
>
>
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>
> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> blinkEnvSettings)
>
>
>
>
>
> streamExecutionEnv.execute("from kafka sink hbase")
>
>
>
>
> 还是报一样的错
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-08 15:40:41,"夏帅"  写道:
> >你好,
> >可以看看你的代码结构是不是以下这种
> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >val bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> >  ..
> >tableEnv.execute("")
> >如果是的话,可以尝试使用bsEnv.execute("")
> >1.11对于两者的execute代码实现有改动
> >
> >
> >--
> >发件人:Zhou Zach 
> >发送时间:2020年7月8日(星期三) 15:30
> >收件人:Flink user-zh mailing list 
> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology
> >
> >代码在flink
> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
> >Exception in thread "main" java.lang.IllegalStateException: No operators
> defined in streaming topology. Cannot generate StreamGraph.
> >at
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> >at
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> >at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
> >
> >
> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
> >
> >
> >
> >
> >query:
> >streamTableEnv.executeSql(
> >  """
> >|
> >|CREATE TABLE `user` (
> >|uid BIGINT,
> >|sex VARCHAR,
> >|age INT,
> >|created_time TIMESTAMP(3),
> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
> SECOND
> >|) WITH (
> >|'connector.type' = 'kafka',
> >|'connector.version' = 'universal',
> >|-- 'connector.topic' = 'user',
> >|'connector.topic' = 'user_long',
> >|'connector.startup-mode' = 'latest-offset',
> >|'connector.properties.group.id' = 'user_flink',
> >|'format.type' = 'json',
> >|'format.derive-schema' = 'true'
> >|)
> >|""".stripMargin)
> >
> >
> >
> >
> >
> >
> >streamTableEnv.executeSql(
> >  """
> >|
> >|CREATE TABLE user_hbase3(
> >|rowkey BIGINT,
> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
> >|) WITH (
> >|'connector.type' = 'hbase',
> >|'connector.version' = '2.1.0',
> >|'connector.table-name' = 'user_hbase2',
> >|'connector.zookeeper.znode.parent' = '/hbase',
> >|'connector.write.buffer-flush.max-size' = '10mb',
> >|'connector.write.buffer-flush.max-rows' = '1000',
> >|'connector.write.buffer-flush.interval' = '2s'
> >|)
> >|""".stripMargin)
> >
> >
> >streamTableEnv.executeSql(
> >  """
> >|
> >|insert into user_hbase3
> >|SELECT uid,
> >|
> >|  ROW(sex, age, created_time ) as cf
> >|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as
> created_time from `user`)
> >|
> >|""".stripMargin)
> >
> >
> >
> >
> >
> >
> >
> >
>


-- 
Best, Jingsong Lee


?????? flink state????

2020-07-08 Thread Robert.Zhang
??iterativestreamstateoperator??stream??keyby??operatorstate??key



Best Regards





----
??: "Congxian Qiu"

Re: State里面用guava Cache

2020-07-08 Thread Congxian Qiu
TTL state[1] 满足你的需求吗? 如果不满足的话,能否描述下你的需求呢?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
Best,
Congxian


op <520075...@qq.com> 于2020年7月8日周三 下午3:53写道:

> 您好,我主要是觉得Cache的自动过期比较好用
>
>
> --原始邮件--
> 发件人:"Congxian Qiu" 发送时间:2020年7月8日(星期三) 下午3:50
> 收件人:"user-zh"
> 主题:Re: State里面用guava Cache
>
>
>
> 你好,为什么需要在 State 里面再用 cache 呢?单纯的 State 不能满足需求吗?需求是什么呢?
> 另外,除了 ValueState,其他的 ListState/MapState 能否满足你的需求呢?
>
> Best,
> Congxian
>
>
> op <520075...@qq.com 于2020年7月8日周三 上午10:31写道:
>
>  大家好,我想使用一个 ValueState[Cache]的状态,但是发现这个状态的value 没办法更新,
> 
> 
>  比如我在map里面每次往cache里面put一个字符串,然后update这个state,输出cache的长度,为什么每次输出长度都是1


Re:回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread Zhou Zach



代码结构改成这样的了:




val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment

val blinkEnvSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
blinkEnvSettings)





streamExecutionEnv.execute("from kafka sink hbase")




还是报一样的错











在 2020-07-08 15:40:41,"夏帅"  写道:
>你好,
>可以看看你的代码结构是不是以下这种
>val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
>val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>  ..
>tableEnv.execute("")
>如果是的话,可以尝试使用bsEnv.execute("")
>1.11对于两者的execute代码实现有改动
>
>
>--
>发件人:Zhou Zach 
>发送时间:2020年7月8日(星期三) 15:30
>收件人:Flink user-zh mailing list 
>主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology
>
>代码在flink 
>1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
>Exception in thread "main" java.lang.IllegalStateException: No operators 
>defined in streaming topology. Cannot generate StreamGraph.
>at 
>org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
>at 
>org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
>at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
>at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
>
>
>但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
>
>
>
>
>query:
>streamTableEnv.executeSql(
>  """
>|
>|CREATE TABLE `user` (
>|uid BIGINT,
>|sex VARCHAR,
>|age INT,
>|created_time TIMESTAMP(3),
>|WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>|) WITH (
>|'connector.type' = 'kafka',
>|'connector.version' = 'universal',
>|-- 'connector.topic' = 'user',
>|'connector.topic' = 'user_long',
>|'connector.startup-mode' = 'latest-offset',
>|'connector.properties.group.id' = 'user_flink',
>|'format.type' = 'json',
>|'format.derive-schema' = 'true'
>|)
>|""".stripMargin)
>
>
>
>
>
>
>streamTableEnv.executeSql(
>  """
>|
>|CREATE TABLE user_hbase3(
>|rowkey BIGINT,
>|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>|) WITH (
>|'connector.type' = 'hbase',
>|'connector.version' = '2.1.0',
>|'connector.table-name' = 'user_hbase2',
>|'connector.zookeeper.znode.parent' = '/hbase',
>|'connector.write.buffer-flush.max-size' = '10mb',
>|'connector.write.buffer-flush.max-rows' = '1000',
>|'connector.write.buffer-flush.interval' = '2s'
>|)
>|""".stripMargin)
>
>
>streamTableEnv.executeSql(
>  """
>|
>|insert into user_hbase3
>|SELECT uid,
>|
>|  ROW(sex, age, created_time ) as cf
>|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
> created_time from `user`)
>|
>|""".stripMargin)
>
>
>
>
>
>
>
>


Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Ah, didn't realize Chesnay has it answered already, sorry for the concurrent
reply :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Hi,

This would be more of a Java question.
In short, type inference of generic types does not work for chained
invocations, and therefore type information has to be explicitly included.

If you'd like to chain the calls, this would work:

WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
.withTimestampAssigner((SerializableTimestampAssigner) (element,
recordTimestamp) -> 42L);

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


?????? State??????guava Cache

2020-07-08 Thread op
??Cache??


----
??:"Congxian Qiu"

Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Chesnay Schepler
WatermarkStrategy.forBoundedOutOfOrderness(Duration.of(1, 
ChronoUnit.MINUTES)) returns a WatermarkStrategy, but the exact type 
is entirely dependent on the variable declaration (i.e., it is not 
dependent on any argument).


So, when you assign the strategy to a variable then the compiler can 
infer the generic type. Without a variable it is treated as a 
WatermarkStrategy, because there is nothing to infer the type from.


On 08/07/2020 08:54, Niels Basjes wrote:

Hi,

I'm migrating some of my code to Flink 1.11 and I ran into something I 
find strange.


This works

WatermarkStrategy watermarkStrategy = WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));

watermarkStrategy  
.withTimestampAssigner((SerializableTimestampAssigner) (element, 
recordTimestamp) -> 42L);

However this does NOT work

WatermarkStrategy watermarkStrategy = WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))  
.withTimestampAssigner((SerializableTimestampAssigner) (element, 
recordTimestamp) -> 42L);


When I try to compile this last one I get

Error:(109, 13) java: no suitable method found for 
withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
    method 
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier) 
is not applicable
      (argument mismatch; 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner 
cannot be converted to 
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier)
    method 
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner) 
is not applicable
      (argument mismatch; 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner 
cannot be converted to 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)


Why is that?

--
Best regards / Met vriendelijke groeten,

Niels Basjes





Re: State里面用guava Cache

2020-07-08 Thread Congxian Qiu
你好,为什么需要在 State 里面再用 cache 呢?单纯的 State 不能满足需求吗?需求是什么呢?
另外,除了 ValueState,其他的 ListState/MapState 能否满足你的需求呢?

Best,
Congxian


op <520075...@qq.com> 于2020年7月8日周三 上午10:31写道:

> 大家好,我想使用一个 ValueState[Cache]的状态,但是发现这个状态的value 没办法更新,
>
>
> 比如我在map里面每次往cache里面put一个字符串,然后update这个state,输出cache的长度,为什么每次输出长度都是1


Re: flink state使用

2020-07-08 Thread Congxian Qiu
Hi

KeyedState 的操作实际会针对当前的 key,也就是 keyBy 之后得到的 key,但是这个 key 用户看不到。在一个 operator
中 state 的数目是你创建的数目,但是每个 state 可以有多个 KV 对,其中 K 是 keyby 的 key,V 可以是
value,list,map 等。同一个 operator 上不同的 key 的 state 保证能够正确读写的。

Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月8日周三 下午3:41写道:

> Hello,all
> 目前遇到一个关于state的使用问题
>DataStream iteration.closeWith(
>
> iteration.keyBy(0).process(new RichMapFunc());
>
>
>
> 我在richmapfunc里使用了state,那么每一次迭代的时候,这个state还是同一个吗?
> 如果我对result这个结果stream继续使用了keyby(0)的话,state也是同一个么?
> 此外,如果我对一个stream进行keyby之后,key一直是不变的,
> 但是map等operator之后返回的并非是keyedstream(operator不对key进行操作),是否只能继续keyby,还是有更好的方式


Re: [ANNOUNCE] Apache Flink 1.11.0 released

2020-07-08 Thread Congxian Qiu
Thanks Zhijiang and Piotr for the great work as release manager, and thanks
everyone who makes the release possible!

Best,
Congxian


Benchao Li  于2020年7月8日周三 下午12:39写道:

> Congratulations!  Thanks Zhijiang & Piotr for the great work as release
> managers.
>
> Rui Li  于2020年7月8日周三 上午11:38写道:
>
>> Congratulations! Thanks Zhijiang & Piotr for the hard work.
>>
>> On Tue, Jul 7, 2020 at 10:06 PM Zhijiang 
>> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.11.0, which is the latest major release.
>>>
>>> Apache Flink® is an open-source stream processing framework for distributed,
>>> high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this new major release:
>>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346364
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Cheers,
>>> Piotr & Zhijiang
>>>
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-08 Thread Leonard Xu

如果是同一个数据库(集群)的表,gtid应该是全局唯一且递增的,用gtid是更好的,异构的数据源就没有一个全局的id了,你可以试下. ^_^

祝好

> 在 2020年7月8日,15:32,jindy_liu <286729...@qq.com> 写道:
> 
> 如果用变更时间来有序,可能会有一个问题,是如果数据变更太快了,两条先后数据的时间可能是一样的?
> 
> 这个问题不知道是不是可以用bin-log的gtid来搞?至少是递增的?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



flink state????

2020-07-08 Thread Robert.Zhang
Hello,all
state??
   DataStream

回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread 夏帅
你好,
可以看看你的代码结构是不是以下这种
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
  ..
tableEnv.execute("")
如果是的话,可以尝试使用bsEnv.execute("")
1.11对于两者的execute代码实现有改动


--
发件人:Zhou Zach 
发送时间:2020年7月8日(星期三) 15:30
收件人:Flink user-zh mailing list 
主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology

代码在flink 
1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)


但是,数据是正常sink到了hbase,是不是executeSql误报了。。。




query:
streamTableEnv.executeSql(
  """
|
|CREATE TABLE `user` (
|uid BIGINT,
|sex VARCHAR,
|age INT,
|created_time TIMESTAMP(3),
|WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
|) WITH (
|'connector.type' = 'kafka',
|'connector.version' = 'universal',
|-- 'connector.topic' = 'user',
|'connector.topic' = 'user_long',
|'connector.startup-mode' = 'latest-offset',
|'connector.properties.group.id' = 'user_flink',
|'format.type' = 'json',
|'format.derive-schema' = 'true'
|)
|""".stripMargin)






streamTableEnv.executeSql(
  """
|
|CREATE TABLE user_hbase3(
|rowkey BIGINT,
|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
|) WITH (
|'connector.type' = 'hbase',
|'connector.version' = '2.1.0',
|'connector.table-name' = 'user_hbase2',
|'connector.zookeeper.znode.parent' = '/hbase',
|'connector.write.buffer-flush.max-size' = '10mb',
|'connector.write.buffer-flush.max-rows' = '1000',
|'connector.write.buffer-flush.interval' = '2s'
|)
|""".stripMargin)


streamTableEnv.executeSql(
  """
|
|insert into user_hbase3
|SELECT uid,
|
|  ROW(sex, age, created_time ) as cf
|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
created_time from `user`)
|
|""".stripMargin)










Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-08 Thread jindy_liu
如果用变更时间来有序,可能会有一个问题,是如果数据变更太快了,两条先后数据的时间可能是一样的?

这个问题不知道是不是可以用bin-log的gtid来搞?至少是递增的?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 Thread Zhou Zach
代码在flink 
1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)


但是,数据是正常sink到了hbase,是不是executeSql误报了。。。




query:
streamTableEnv.executeSql(
  """
|
|CREATE TABLE `user` (
|uid BIGINT,
|sex VARCHAR,
|age INT,
|created_time TIMESTAMP(3),
|WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
|) WITH (
|'connector.type' = 'kafka',
|'connector.version' = 'universal',
|-- 'connector.topic' = 'user',
|'connector.topic' = 'user_long',
|'connector.startup-mode' = 'latest-offset',
|'connector.properties.group.id' = 'user_flink',
|'format.type' = 'json',
|'format.derive-schema' = 'true'
|)
|""".stripMargin)






streamTableEnv.executeSql(
  """
|
|CREATE TABLE user_hbase3(
|rowkey BIGINT,
|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
|) WITH (
|'connector.type' = 'hbase',
|'connector.version' = '2.1.0',
|'connector.table-name' = 'user_hbase2',
|'connector.zookeeper.znode.parent' = '/hbase',
|'connector.write.buffer-flush.max-size' = '10mb',
|'connector.write.buffer-flush.max-rows' = '1000',
|'connector.write.buffer-flush.interval' = '2s'
|)
|""".stripMargin)


streamTableEnv.executeSql(
  """
|
|insert into user_hbase3
|SELECT uid,
|
|  ROW(sex, age, created_time ) as cf
|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
created_time from `user`)
|
|""".stripMargin)









Re: TaskManager docker image for Beam WordCount failing with ClassNotFound Exception

2020-07-08 Thread Tzu-Li (Gordon) Tai
Hi,

Assuming that the job jar bundles all the required dependencies (including
the Beam dependencies), making them available under `/opt/flink/usrlib/` in
the container either by mounting or directly adding the job artifacts should
work. AFAIK It is also the recommended way, as opposed to adding them under
`/lib`.

>From the exception, what seems to be missing is the Beam dependencies.
Just to get rid of the obvious first: are you sure that the job jar has
those bundled?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: FlinkKinesisProducer blocking ?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
It does however apply backpressure (therefore effectively blocking all
upstream operators) when the number of outstanding records accumulated
exceeds a set limit, configured using the FlinkKinesisProducer#setQueueLimit
method.

For starters, you can maybe check if that was set appropriately.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re: flink 1.11 connector jdbc 依赖解析失败

2020-07-08 Thread Zhou Zach
感谢提醒,




我是在https://mvnrepository.com/这个上面搜没搜到对应的包的,不过,module名改成flink-connector-jdbc,可以了,感谢提醒








在 2020-07-08 09:35:10,"Leonard Xu"  写道:
>Hello,
>
>我看下了maven仓库里有的[1], 官网文档里也有下载链接[2],是不是pom里的依赖没有写对?1.11 jdbc connector 的module名从 
>flink-jdbc 规范到了 flink-connector-jdbc。
>
>祝好,
>Leonard Xu
>
>[1] 
>https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.0/
> 
>
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
> 
>
>
>
>> 在 2020年7月8日,08:15,Zhou Zach  写道:
>> 
>> hi all,
>> flink升级到1.11,flink-connector-jdbc 
>> idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的
>> 
>


Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-08 Thread jindy_liu
恩,主要是看flink 的发布里说flink
支持cdc了,感觉这个功能好像是我要的,感觉好像我要做的事情能用flink都搞定。就不用多个开源平台切换与维护多个组件了。

我原本还想先基于flink sql 将数据存量数据先全量导一次异构存储(如hbase, pgsql等)(批量),然后再flink cdc
把mysql的bin-log变化数据搬运到异构存储(如hbase,
pgsql等)后(增量),同时再镜像一份cdc后的kafka里的json数据到下游(变化通知)。

那么下游再基于镜像的kafka里的数据(变化)+异构的镜像数据,再基于flink去做一些实时计算的场景需求(比如最近一个月内的前多少名的数据等),不用都挤在mysql的从库在做一些分析了,并且有些分析也不适合在mysql上搞,一些olap类的。

但实际demo了吧,光一个数据的实时搬运里,要解决的问题还挺多的,光flink好像不太行(可能是我不太熟悉,我接触flink时间较短)
问题:
1、存量+实时数据怎么结合起来,目前语义上只能做到“至少一次”,先存量搬运,再binlog实时迁移,但难以定位存量搬运完后对应的kafka的起始消费位置。(但业务场景如果只需要“至少一次”,还是可以用的,业务大部分是只需“至少一次”)

2、db里多表有序:这里有kafka性能问题和有序保证问题;目前业务场景db表变化不太快,一天1百w行数据的变更,可以搞定,同时也可以按需的N张表有序,不用整个db实例里的全部表。但这个有序感觉用flink
sql cdc还不太好搞多表。如果直接写程序去消费

3、多sink怎么保证数据一致性:具体来说,在增量同步的时候,flink需要先sink 异构存储(先),后要sink
kafka(后),怎么保证两个sink的先后次序与原子性?

现请问下,flink 的sink能定义先后吗?
如上面的,将kafka里的canal-json数据取出后,能先写pgsql成功,再把json数据原封不动写kafka吗?如果目前不支持,可否自己改造下支持?





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Hadoop依赖

2020-07-08 Thread Xintong Song
你说的 “jobmanager的lib文件夹” 是指哪里?Flink 的部署方式是怎样的?CLI 运行在哪里?

Thank you~

Xintong Song



On Wed, Jul 8, 2020 at 10:59 AM Z-Z  wrote:

> Hi, 各位大佬们,有个问题,Flink
> 1.10.0版本中,已经在jobmanager的lib文件夹添加了flink-shaded-hadoop-2-uber-2.7.5-10.0.jar文件,通过webui上传可以正常运行任务,但通过cli命令,提交任务后报Could
> not find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.有谁知道是怎么回事吗?


Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Niels Basjes
Hi,

I'm migrating some of my code to Flink 1.11 and I ran into something I find
strange.

This works

WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));

watermarkStrategy
.withTimestampAssigner((SerializableTimestampAssigner)
(element, recordTimestamp) -> 42L);

However this does NOT work

WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
.withTimestampAssigner((SerializableTimestampAssigner)
(element, recordTimestamp) -> 42L);


When I try to compile this last one I get

Error:(109, 13) java: no suitable method found for
withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier)
is not applicable
  (argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
cannot be converted to
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier)
method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
is not applicable
  (argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
cannot be converted to
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)

Why is that?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-08 Thread seeksst
你好,我刚刚切换到1.10.1版本,还是可以设置的,这个接口是在StreamTableEnvironment里面,使用的是flink-table-api-java-bridge,如果你使用的是scala版本,这个我不是很了解,理论应该差不多。
在1.10版本中,我一般是这么写的:
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
也不需要进行sqlUpdate传入,直接tableEnv.getConfig().setIdleStateRetentionTime() 设置就可以了。
另外,你指的状态越来越大需要描述的更详细一点,具体现象和状态后端的选择。之前也提到了,使用rocksdb做状态后端,需要开启ttl配置,
否则现象就是checkpoint的文件大小会不断增大。


原始邮件
发件人:x35907...@qq.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年7月8日(周三) 11:08
主题:回复: 求助:FLINKSQL1.10实时统计累计UV


您说的这种方式,V1.10.1 不支持吧,我看参数只有一个String类型的 void sqlUpdate(String stmt); 
--nbsp;原始邮件nbsp;-- 
发件人:nbsp;"seeksst"seeksst@163.comgt;; 发送时间:nbsp;2020年7月7日(星期二) 中午11:35 
收件人:nbsp;"user-zh"user-zh@flink.apache.orggt;; 主题:nbsp;回复: 
求助:FLINKSQL1.10实时统计累计UV 
我看你代码上是sqlUpdate,tableConfig是另外设置的,需要作为入参一同放入sqlUpdate中, 使用方法sqlUpdate(str, 
config) 另外如果你使用的是rocksdb,需要开启rocksdb的ttl 
state.backend.rocksdb.ttl.compaction.filter.enabled设置成true 低版本这个参数默认是false 原始邮件 
发件人:x35907...@qq.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年7月7日(周二) 
10:46 主题:回复: 求助:FLINKSQL1.10实时统计累计UV 是blinkval setttings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() 
--nbsp;原始邮件nbsp;-- 发件人:nbsp;"Benchao 
Li"libenchao@apache.orggt;; 发送时间:nbsp;2020年7月6日(星期一) 晚上11:11 
收件人:nbsp;"user-zh"user-zh@flink.apache.orggt;; 主题:nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV 感觉不太应该有这种情况,你用的是blink planner么? x 35907418@qq.comgt; 
于2020年7月6日周一 下午1:24写道: gt; sorry,我说错了,确实没有,都是group agg. gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
 gt; gt; gt; --amp;nbsp;原始邮件amp;nbsp;-- gt; 
发件人:amp;nbsp;"Benchao Li"libenchao@apache.orgamp;gt;; gt; 
发送时间:amp;nbsp;2020年7月6日(星期一) 中午12:52 gt; 
收件人:amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;gt;; gt; gt; 主题:amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; gt; gt; gt; 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。 gt; 
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。 gt; gt; [1] gt; gt; 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
 gt; gt; x 35907418@qq.comamp;gt; 于2020年7月6日周一 上午11:15写道: gt; gt; amp;gt; 
版本是1.10.1,最后sink的时候确实是一个window里面做count gt; amp;gt; 
distinct操作。请问是只要计算过程中含有一个window里面做count gt; amp;gt; gt; 
distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupamp;amp;nbsp;DATE_FORMAT(rowtm,
 gt; amp;gt; '-MM-dd') 这个sql对应的状态很大。代码如下: gt; amp;gt; val rt_totaluv_view : 
Table = tabEnv.sqlQuery( gt; amp;gt;amp;nbsp;amp;nbsp; """ gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT MAX(DATE_FORMAT(rowtm, 
'-MM-dd gt; HH:mm:00')) gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM source gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY DATE_FORMAT(rowtm, 
'-MM-dd') gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) gt; amp;gt; gt; 
amp;gt; val totaluvTmp = gt; 
tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) gt; 
amp;gt;amp;nbsp;amp;nbsp; .filter( line =amp;amp;gt; line._1 == true ).map( 
line gt; =amp;amp;gt; line._2 ) gt; amp;gt; gt; amp;gt; val totaluvTabTmp = 
tabEnv.fromDataStream( totaluvTmp ) gt; amp;gt; gt; amp;gt; tabEnv.sqlUpdate( 
gt; amp;gt;amp;nbsp;amp;nbsp; s""" gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT INTO mysql_totaluv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT _1,MAX(_2) gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM $totaluvTabTmp gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY _1 gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
--amp;amp;nbsp;原始邮件amp;amp;nbsp;-- gt; amp;gt; 
发件人:amp;amp;nbsp;"Benchao Li"libenchao@apache.orgamp;amp;gt;; gt; amp;gt; 
发送时间:amp;amp;nbsp;2020年7月3日(星期五) 晚上9:47 gt; amp;gt; 
收件人:amp;amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;amp;gt;; gt; amp;gt; gt; 
amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; gt; amp;gt; gt; 
amp;gt; gt; amp;gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 
gt; amp;gt; 这个已经在1.11中修复了。 gt; amp;gt; gt; amp;gt; [1] 
https://issues.apache.org/jira/browse/FLINK-17942 gt; amp;gt; gt; amp;gt; x 
35907418@qq.comamp;amp;gt; 于2020年7月3日周五 下午4:34写道: gt; amp;gt; gt; amp;gt; 
amp;amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, gt; amp;gt; amp;amp;gt; 
gt; amp;gt; amp;amp;gt; gt; amp;gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
 gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt;