Re: How to express device online/offline monitoring in SQL

2019-12-05 Post: Shuo Cheng
A UDAGG should solve your problem fairly cleanly ^.^

On 12/6/19, Djeng Lee  wrote:
> An EXISTS query
>
> On 2019/12/5 4:06 PM, “Yuan,Youjun” wrote:
>
> How can a window with count=0 be produced at all? With no data there is no output.
> However, with a ROWS OVER window, taking the sum over the two adjacent windows minus the current count
> indirectly tells whether the counts of the two windows are equal; the time difference between the two windows helps with the judgment as well.
>
> In the end, with the help of the custom functions last_value_str/first_value_str, it barely works (still imperfect: consecutive ONLINE outputs may appear).
> Below is my SQL, for reference only:
>
> INSERT INTO mysink
> SELECT userid, lastts, case when preCnt <= 0 OR tsdiff > 10 THEN 'ONLINE' ELSE 'offline' END AS status
> FROM (
>   SELECT curCnt, preCnt, lastts, firstts, userid,
>     yeardiff * 31536000 + monthdiff * 2678400 + daydiff * 86400 + hourdiff * 3600 + mindiff * 60 + seconddiff AS tsdiff
>   FROM (
>     SELECT curCnt, preCnt,
>       cast(substring(lastts, 1, 4) as bigint) - cast(substring(firstts, 1, 4) as bigint) as yeardiff,
>       cast(substring(lastts, 6, 2) as bigint) - cast(substring(firstts, 6, 2) as bigint) as monthdiff,
>       cast(substring(lastts, 9, 2) as bigint) - cast(substring(firstts, 9, 2) as bigint) as daydiff,
>       cast(substring(lastts, 12, 2) as bigint) - cast(substring(firstts, 12, 2) as bigint) as hourdiff,
>       cast(substring(lastts, 15, 2) as bigint) - cast(substring(firstts, 15, 2) as bigint) as mindiff,
>       cast(substring(lastts, 18, 2) as bigint) - cast(substring(firstts, 18, 2) as bigint) as seconddiff,
>       lastts, firstts, userid
>     FROM (
>       SELECT userid, cnt AS curCnt, sum(cnt) OVER w - cnt as preCnt,
>         last_value_str(ts0) OVER w as lastts, first_value_str(ts0) OVER w as firstts
>       FROM (
>         SELECT HOP_PROCTIME(rowtime, interval '5' second, interval '10' second) AS rowtime,
>           count(*) as cnt, userid, last_value_str(cast(rowtime AS varchar)) AS ts0
>         FROM mysrc
>         GROUP BY userid, hop(rowtime, interval '5' second, interval '10' second)
>       )
>       WINDOW w as (PARTITION BY userid ORDER BY rowtime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
>     )
>   )
>   WHERE (preCnt <= 0 OR yeardiff * 31536000 + monthdiff * 2678400 + daydiff * 86400 + hourdiff * 3600 + mindiff * 60 + seconddiff > 10)
>     OR (curCnt = preCnt AND lastts = lastts)
> )
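The substring arithmetic in the query reduces the two timestamp strings to an approximate difference in seconds. Here is a minimal plain-Python sketch of the same computation, for reference (the function names are illustrative, and like the SQL it treats every month as 31 days and every year as 365):

```python
# Approximate the SQL's tsdiff: slice fixed positions out of a
# 'YYYY-MM-DD hh:mm:ss' string and weight each field in seconds,
# mirroring substring(ts, 1, 4), substring(ts, 6, 2), ... in the query.
def field_diff(lastts, firstts, start, length):
    return int(lastts[start:start + length]) - int(firstts[start:start + length])

def tsdiff(lastts, firstts):
    # (python slice start, length, weight in seconds) per field;
    # SQL substring() is 1-based while python slices are 0-based
    fields = [(0, 4, 31536000),  # year  (365 days)
              (5, 2, 2678400),   # month (31 days)
              (8, 2, 86400),     # day
              (11, 2, 3600),     # hour
              (14, 2, 60),       # minute
              (17, 2, 1)]        # second
    return sum(w * field_diff(lastts, firstts, s, n) for s, n, w in fields)

print(tsdiff('2019-12-05 16:06:10', '2019-12-05 16:05:58'))  # prints 12
```

The approximation only drifts across month/year boundaries; for a 10-second threshold on adjacent windows it is close enough, which is presumably why the query tolerates it.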
>
> -----Original Message-----
> From: 1193216154 <1193216...@qq.com>
> Sent: Thursday, December 5, 2019 2:43 PM
> To: user-zh
> Subject: Re: How to express device online/offline monitoring in SQL
>
> You could consider Flink CEP; it should be able to solve your problem.
>
>
> ------Original Message------
> From: "Djeng Lee"
> Sent: Thursday, December 5, 2019 2:40 PM
> To: "user-zh@flink.apache.org"
> Subject: Re: How to express device online/offline monitoring in SQL
>
>
>
> Online time: the previous n windows have count == 0 and the following n windows have count >= 1; that signals coming online and yields the online time.
> Offline time: the previous n windows have count >= 1 and the following n windows have count == 0; that signals going offline and yields the offline time.
> When both the previous and following n windows have count >= 1, the heartbeat is simply being maintained.
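The rule above can be sketched in plain Python over a sequence of per-window heartbeat counts (a sketch only, with n = 1 for brevity; the function and its list input are illustrative and not part of any Flink API):

```python
# Classify transitions from a list of per-window heartbeat counts:
# count 0 followed by count >= 1 means "online",
# count >= 1 followed by count 0 means "offline".
def transitions(counts):
    events = []
    for prev, cur in zip(counts, counts[1:]):
        if prev == 0 and cur >= 1:
            events.append('online')
        elif prev >= 1 and cur == 0:
            events.append('offline')
        # both >= 1: heartbeat maintained, no event emitted
    return events

print(transitions([2, 1, 0, 0, 1, 3]))  # prints ['offline', 'online']
```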
>
>
>
> On 2019/12/5 2:06 PM, “Yuan,Youjun” wrote:
>
>  Thanks for your reply.
>
>  This approach is interesting; it just still cannot distinguish the count=1 message produced by a device's first heartbeat (coming online) from the count=1 message produced by its last heartbeat (going offline).
> 
>  -----Original Message-----
>  From: 1193216154 <1193216...@qq.com>
>  Sent: Wednesday, December 4, 2019 9:39 PM
>  To: user-zh
>  Subject: Re: How to express device online/offline monitoring in SQL
> 
>  Set up a sliding window with a window size of at least 2n and a slide of at least n; if a window firing has a count of at least 2, the device is online, otherwise it is offline.
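That suggestion can be sketched in plain Python (illustrative names; each list entry is the heartbeat count of one n-sized slice, so a window of size 2n covers two adjacent slices and slides by one slice):

```python
# Over a list of per-slice heartbeat counts, evaluate sliding windows
# of size 2n (two slices) sliding by n (one slice): a window firing
# with a total count >= 2 means online, otherwise offline.
def classify_windows(slice_counts):
    statuses = []
    for i in range(len(slice_counts) - 1):
        window_count = slice_counts[i] + slice_counts[i + 1]
        statuses.append('online' if window_count >= 2 else 'offline')
    return statuses

print(classify_windows([1, 1, 0, 0, 2]))
# prints ['online', 'offline', 'offline', 'online']
```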
> 
>  ------Original Message------
>  From: "Yuan,Youjun"
>  Sent: Wednesday, December 4, 2019 6:49 PM
>  To: "user-zh@flink.apache.org"
>  Subject: How to express device online/offline monitoring in SQL
> 
> 
>  Hi all,
>
>  Suppose we have many devices; while working normally, each device periodically sends a heartbeat to the server. If a device sends no heartbeat for more than N minutes, the server considers the device offline, until the next heartbeat, when it is judged online again.
>
>  Requirement: when a device is judged offline, emit a device-offline message; on the device's first heartbeat after an offline period, emit a device-online message.
>  Suppose each message the device reports contains the current time (ts) and the device id (deviceid):
>  1575456144,dev1
>  1575456146,dev2
>  1575456147,dev1
>  ….
>
>  The emitted offline/online messages have the following formats (the first column is the time the device went offline/online):
>  1575456158,dev1,offline
>  1575456169,dev2,online
>  Can this job be defined with a single SQL statement?
>
>  Thanks!
>  Yuan Youjun
> 
>
>


Re: How to express device online/offline monitoring in SQL

2019-12-05 Post: Djeng Lee
An EXISTS query


Error consuming Kafka data via the Python API in yarn-session mode

2019-12-05 Post: 改改

[root@hdp02 bin]# ./flink run -yid application_1575352295616_0014 -py 
/opt/tumble_window.py
2019-12-06 14:15:48,262 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found Yarn properties file under /tmp/.yarn-properties-root.
2019-12-06 14:15:48,262 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found Yarn properties file under /tmp/.yarn-properties-root.
2019-12-06 14:15:48,816 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
hdp02.wuagecluster/10.2.19.32:8050
2019-12-06 14:15:48,964 INFO  org.apache.hadoop.yarn.client.AHSProxy
- Connecting to Application History server at 
hdp03.wuagecluster/10.2.19.33:10200
2019-12-06 14:15:48,973 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-06 14:15:48,973 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-06 14:15:49,101 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Found 
application JobManager host name 'hdp07.wuagecluster' and port '46376' from 
supplied application id 'application_1575352295616_0014'
Starting execution of program
Traceback (most recent call last):
  File "/usr/lib64/python2.7/runpy.py", line 162, in _run_module_as_main
"__main__", fname, loader, pkg_name)
  File "/usr/lib64/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
  File "/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/tumble_window.py", 
line 62, in 
.register_table_source("source")
  File 
"/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/pyflink.zip/pyflink/table/descriptors.py",
 line 1293, in register_table_source
  File 
"/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
  File 
"/tmp/pyflink/b9a29ae4-89ac-4289-9111-5f77ad90d386/pyflink.zip/pyflink/util/exceptions.py",
 line 154, in deco
pyflink.util.exceptions.TableException: u'findAndCreateTableSource failed.'
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:83)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)

Re: Problem consuming Kafka with Flink

2019-12-05 Post: Charoes
Hi,
Per the reference docs, you can have the two Flink jobs consume different partitions.

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);


On Fri, Dec 6, 2019 at 1:56 PM cs <58683...@qq.com> wrote:

> Hi all, a question: I started two identical Flink programs consuming the same topic with the same group
> id. From the way they run, both programs receive the full Kafka data. What I want is for each Flink program to consume only half of the data, i.e. the same group
> id should imply competition within the consumer group. Is there something I need to configure?


Problem consuming Kafka with Flink

2019-12-05 Post: cs
Hi all, a question: I started two identical Flink programs consuming the same topic with the same group id. From the way they run, both programs receive the full Kafka data. What I want is for each Flink program to consume only half of the data, i.e. the same group id should imply competition within the consumer group. Is there something I need to configure?