求和统计后的数据格式-科学计数法

2020-12-28 文章 superainbower
大家好:
有一张Mysql表,flink 
cdc接了表的binlog,对表中按id分组后对某个字段(DECIMAL(38,2))求和,之后输出到Kafka(id STRING, sum  
DECIMAL(38,2));


现在的问题是,Kafka中收到的sum值会以不同形式展示,如下2条数据中的charge_number字段
{"stats_key”:"123","charge_number":18353.83,"charge_duration":38825,"charge_income":5013.8,"charge_count":432}
{"stats_key”:”456","charge_number":6E+2,"charge_duration":675,"charge_income":45,"charge_count":3}


存在了这样的科学计数法 6E+2,这个该怎么转换成具体数值呢?不然后续不好处理


| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制



Re: Some questions about limit push down

2020-12-28 文章 Shengkai Fang
hi, Jun Zhang.

Currently, the rule `PushLimitIntoTableSourceScanRule` captures the
structure that the `FlinkLogicalSort` node is the parent of the
`FlinkLogicalTableSourceScan`.

In your case, we have a Calc node between the Sort node and the Scan node,
which makes the rule fails to apply.

Maybe we can add a rule to transpose between the Sort and Calc.

Best,
Shengkai

Jun Zhang  于2020年12月28日周一 下午5:31写道:

> when I query hive table by sql, like this `select * from hivetable where id
> = 1 limit 1`,   I found that the limit push down is invalid, is it a bug or
> was it designed like this?
>
> if the sql is  'select * from hivetable  limit 1'  ,it is ok
>
> thanks
>


Re: Flink 1.11.2客户端 select * from table

2020-12-28 文章 Shengkai Fang
要看一下日志里面是否有报错信息,才能进一步分析。



Jacob <17691150...@163.com> 于2020年12月29日周二 上午9:11写道:

> Dear All,
>
>
> 
>
>
> 在Flink SQL客户端中使用select * from table 语句查询表数据,结果只显示表头,没有数据,是什么原因。
>
> 
>
> 在sql-client-defaults.yaml中已经指定catalog
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink1.10.2 读取mysql并打印数据,本地可以执行,yarn上报错,什么原因呢

2020-12-28 文章 xufengfeng
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been exhausted.
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:323)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
拒绝连接: emr-worker-4.cluster/:40751
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 19 more
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
拒绝连接: emr-worker-4.cluster-1/xxx:40751
Caused by: java.net.ConnectException: 拒绝连接
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 张锴
感谢你,稍后我会按这种思路试试

赵一旦  于2020年12月28日周一 下午7:12写道:

> 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
>
> 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
>
> session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
>
>
> 张锴  于2020年12月28日周一 下午5:35写道:
>
> > 能描述一下用session window的考虑吗
> >
> > Akisaya  于2020年12月28日周一 下午5:00写道:
> >
> > > 这个可以用 session window 吧
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> > >
> > > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> > >
> > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > > >
> > > >
> > > >
> > > > news_...@163.com
> > > >
> > > > 发件人: 张锴
> > > > 发送时间: 2020-12-28 13:35
> > > > 收件人: user-zh
> > > > 主题: 根据业务需求选择合适的flink state
> > > > 各位大佬帮我分析下如下需求应该怎么写
> > > >
> > > > 需求说明:
> > > >
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > > >
> > > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > > >
> > > > 我的想法:
> > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > > >
> > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > > >
> > > > flink 版本1.10.1
> > > >
> > >
> >
>


kafka producer ?????????????????????????????

2020-12-28 文章 ??????????
kafka source flink.partition-discovery.interval-millis 
??kafka sink 
partitionmap


int[] partitions = (int[])this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
this.topicPartitionsMap.put(targetTopic, partitions);
}

Re: flink 1.12.0 kubernetes-session部署问题

2020-12-28 文章 Yang Wang
ConfigMap不需要提前创建,那个Warning信息可以忽略,是正常的,主要原因是先创建的deployment,再创建的ConfigMap
你可以参考社区的文档[1]把Jm的log打到console看一下

我怀疑是你没有创建service account导致的[2]

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#rbac

Best,
Yang

陈帅  于2020年12月28日周一 下午5:54写道:

> 今天改用官方最新发布的flink镜像版本1.11.3也启不起来
> 这是我的命令
> ./bin/kubernetes-session.sh \
>   -Dkubernetes.cluster-id=rtdp \
>   -Dtaskmanager.memory.process.size=4096m \
>   -Dkubernetes.taskmanager.cpu=2 \
>   -Dtaskmanager.numberOfTaskSlots=4 \
>   -Dresourcemanager.taskmanager-timeout=360 \
>   -Dkubernetes.container.image=flink:1.11.3-scala_2.12-java8 \
>   -Dkubernetes.namespace=rtdp
>
>
>
> Events:
>
>   Type Reason  AgeFrom   Message
>
>    --        ---
>
>   Normal   Scheduled   88sdefault-scheduler
> Successfully assigned rtdp/rtdp-6d7794d65d-g6mb5 to
> cn-shanghai.192.168.16.130
>
>   Warning  FailedMount 88skubelet
> MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> "flink-config-rtdp" not found
>
>   Warning  FailedMount 88skubelet
> MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap
> "hadoop-config-rtdp" not found
>
>   Normal   AllocIPSucceed  87sterway-daemon  Alloc IP
> 192.168.32.25/22 for Pod
>
>   Normal   Pulling 87skubeletPulling
> image "flink:1.11.3-scala_2.12-java8"
>
>   Normal   Pulled  31skubelet
> Successfully pulled image "flink:1.11.3-scala_2.12-java8"
>
>   Normal   Created 18s (x2 over 26s)  kubeletCreated
> container flink-job-manager
>
>   Normal   Started 18s (x2 over 26s)  kubeletStarted
> container flink-job-manager
>
>   Normal   Pulled  18skubeletContainer
> image "flink:1.11.3-scala_2.12-java8" already present on machine
>
>   Warning  BackOff 10skubeletBack-off
> restarting failed container
>
>
>
>
>
>
>
> 这里面有两个ConfigMap没有找到,是需要提前创建吗?官方文档没有说明?还是我看漏了?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-session
>
>
>
>
>
>
>
>
>
> 在 2020-12-27 22:50:32,"陈帅"  写道:
>
> >本人第一次尝试在k8s上部署flink,版本用的是1.12.0,jdk是1.8.0_275,scala是2.12.12,在我的mac机器上安装有minikube单机环境,以下是实验步骤:
> >
> >
> >git clone
> https://github.com/apache/flink-dockercdflink-docker/1.12/scala_2.12-java8-debian
> >docker build --tag flink:1.12.0-scala_2.12-java8 .
> >
> >
> >cd flink-1.12.0
> >./bin/kubernetes-session.sh \
> -Dkubernetes.container.image=flink:1.12.0-scala_2.12-java8 \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dtaskmanager.numberOfTaskSlots=2 \
> -Dkubernetes.cluster-id=flink-session-cluster
> >
> >
> >显示JM启起来了,但无法通过web访问
> >
> >2020-12-27 22:08:12,387 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster session001 successfully, JobManager Web Interface:
> http://192.168.99.100:8081
> >
> >
> >
> >
> >通过 `kubectl get pods` 命令查看到pod一直处理ContainerCreating状态
> >
> >NAME   READY   STATUS
>   RESTARTS   AGE
> >
> >flink-session-cluster-858bd55dff-bzjk2 0/1
>  ContainerCreating   0  5m59s
> >
> >kubernetes-dashboard-1608509744-6bc8455756-mp47w   1/1 Running
>  0  6d14h
> >
> >
> >
> >
> >于是通过 `kubectl describe pod
> flink-session-cluster-858bd55dff-bzjk2`命令查看详细,结果如下:
> >
> >
> >
> >
> >Name: flink-session-cluster-858bd55dff-bzjk2
> >
> >Namespace:default
> >
> >Priority: 0
> >
> >Node: minikube/192.168.99.100
> >
> >Start Time:   Sun, 27 Dec 2020 22:21:56 +0800
> >
> >Labels:   app=flink-session-cluster
> >
> >  component=jobmanager
> >
> >  pod-template-hash=858bd55dff
> >
> >  type=flink-native-kubernetes
> >
> >Annotations:  
> >
> >Status:   Pending
> >
> >IP:   172.17.0.4
> >
> >IPs:
> >
> >  IP:   172.17.0.4
> >
> >Controlled By:  ReplicaSet/flink-session-cluster-858bd55dff
> >
> >Containers:
> >
> >  flink-job-manager:
> >
> >Container ID:
> >
> >Image: flink:1.12.0-scala_2.12-java8
> >
> >Image ID:
> >
> >Ports: 8081/TCP, 6123/TCP, 6124/TCP
> >
> >Host Ports:0/TCP, 0/TCP, 0/TCP
> >
> >Command:
> >
> >  /docker-entrypoint.sh
> >
> >Args:
> >
> >  native-k8s
> >
> >  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824
> -Xms1073741824 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/opt/flink/log/jobmanager.log
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> 

Re: flink1.12????OSError: Expected IPC message of type schema but got record batch

2020-12-28 文章 ??????
??udfMySQL
import pymysql
import pandas as pd
def pole_record_perid1(poleId: int, current_time: int, days: int):
  days_delta_ts = current_time - days * 24 * 60 * 60 * 1000


  days_mh_delta = '''select rt,ts from test where poleId={} 
and
   
 ts between {} and {};'''.format( poleId, days_delta_ts,
   
   
   
  current_time)
# mysql 
 
  mysql = pymysql.connect('192.1.1.1', 'test_database', port=3306, 
charset='utf8')
  delta = mysql.excute(days_mh_delta)
# mysql

  info_df = pd.DataFrame(delta, columns=['rt', 'ts'])
  if info_df.empty:
return ["0.0", timeconvert(current_time), "-1", 
"-1"]
  else:
max_rt_info = info_df.query('rt==rt.max()')
min_rt_info = info_df.query('rt==rt.min()')
max_rt_info = 
max_rt_info['ts'].apply(timeconvert).tolist()
min_rt_info = 
min_rt_info['ts'].apply(timeconvert).tolist()
   
max_rt_ts = ",".join(max_rt_info)
min_rt_ts = ",".join(min_rt_info)
return [str(info_df['rt'].max() - 
info_df['rt'].min()), timeconvert(current_time), \
max_rt_ts, min_rt_ts]


@udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def pole_record_perid(poleId, current_time, days):
  df = pd.DataFrame({'poleId': poleId, 'current_time': 
current_time, 'days': days})
  df['res'] = df.apply(lambda x: pole_record_perid1(x.poleId, 
x.current_time, x.days), axis=1)
  return df['res']

Flink 1.11.2客户端 select * from table

2020-12-28 文章 Jacob
Dear All,


 


在Flink SQL客户端中使用select * from table 语句查询表数据,结果只显示表头,没有数据,是什么原因。

 

在sql-client-defaults.yaml中已经指定catalog



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.12 Cancel Job??????????(??)

2020-12-28 文章 ??????
flink 
sqlCancelCancelIDEA??CheckpointsCanceljobTaskManager??

Re: flink1.12错误OSError: Expected IPC message of type schema but got record batch

2020-12-28 文章 Xingbo Huang
Hi,

我试了一下这个版本的pyarrow,跑了一下现有的test并没有能复现这个问题。你方便提供一下你的pandas udf的内容吗?

Best,
Xingbo

小学生 <201782...@qq.com> 于2020年12月28日周一 下午3:07写道:

> 您好,我使用的pyarrow是0.17.1。


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 赵一旦
按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴  于2020年12月28日周一 下午5:35写道:

> 能描述一下用session window的考虑吗
>
> Akisaya  于2020年12月28日周一 下午5:00写道:
>
> > 这个可以用 session window 吧
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com  于2020年12月28日周一 下午2:15写道:
> >
> > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> > >
> > >
> > >
> > > news_...@163.com
> > >
> > > 发件人: 张锴
> > > 发送时间: 2020-12-28 13:35
> > > 收件人: user-zh
> > > 主题: 根据业务需求选择合适的flink state
> > > 各位大佬帮我分析下如下需求应该怎么写
> > >
> > > 需求说明:
> > >
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> > >
> > >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> > >
> > > 我的想法:
> > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> > >
> > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> > >
> > > flink 版本1.10.1
> > >
> >
>


Re: Flink RocksDBStateBackend 可以设置阿里云 OSS 存储吗?

2020-12-28 文章 Yun Tang
Hi 王磊,

当然是可以的,state backend的checkpoint地址其实依赖于Flink的file 
system实现,只要参照文档[1]的描述,对oss进行相关配置即可。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/oss.html#shaded-hadoop-oss-file-system


祝好
唐云

From: Lei Wang 
Sent: Monday, December 28, 2020 16:21
To: user-zh@flink.apache.org 
Subject: Flink RocksDBStateBackend 可以设置阿里云 OSS 存储吗?

如题, 可以直接这样写吗?

env.setStateBackend(new RocksDBStateBackend(“oss://”, true));

谢谢,
王磊


Re:flink 1.12.0 kubernetes-session部署问题

2020-12-28 文章 陈帅
今天改用官方最新发布的flink镜像版本1.11.3也启不起来
这是我的命令
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=rtdp \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=360 \
  -Dkubernetes.container.image=flink:1.11.3-scala_2.12-java8 \
  -Dkubernetes.namespace=rtdp



Events:

  Type Reason  AgeFrom   Message

   --        ---

  Normal   Scheduled   88sdefault-scheduler  Successfully 
assigned rtdp/rtdp-6d7794d65d-g6mb5 to cn-shanghai.192.168.16.130

  Warning  FailedMount 88skubelet
MountVolume.SetUp failed for volume "flink-config-volume" : configmap 
"flink-config-rtdp" not found

  Warning  FailedMount 88skubelet
MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap 
"hadoop-config-rtdp" not found

  Normal   AllocIPSucceed  87sterway-daemon  Alloc IP 
192.168.32.25/22 for Pod

  Normal   Pulling 87skubeletPulling image 
"flink:1.11.3-scala_2.12-java8"

  Normal   Pulled  31skubeletSuccessfully 
pulled image "flink:1.11.3-scala_2.12-java8"

  Normal   Created 18s (x2 over 26s)  kubeletCreated 
container flink-job-manager

  Normal   Started 18s (x2 over 26s)  kubeletStarted 
container flink-job-manager

  Normal   Pulled  18skubeletContainer 
image "flink:1.11.3-scala_2.12-java8" already present on machine

  Warning  BackOff 10skubeletBack-off 
restarting failed container







这里面有两个ConfigMap没有找到,是需要提前创建吗?官方文档没有说明?还是我看漏了?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-session









在 2020-12-27 22:50:32,"陈帅"  写道:
>本人第一次尝试在k8s上部署flink,版本用的是1.12.0,jdk是1.8.0_275,scala是2.12.12,在我的mac机器上安装有minikube单机环境,以下是实验步骤:
>
>
>git clone 
>https://github.com/apache/flink-dockercdflink-docker/1.12/scala_2.12-java8-debian
>docker build --tag flink:1.12.0-scala_2.12-java8 .
>
>
>cd flink-1.12.0
>./bin/kubernetes-session.sh \ 
>-Dkubernetes.container.image=flink:1.12.0-scala_2.12-java8 \ 
>-Dkubernetes.rest-service.exposed.type=NodePort \ 
>-Dtaskmanager.numberOfTaskSlots=2 \ 
>-Dkubernetes.cluster-id=flink-session-cluster
>
>
>显示JM启起来了,但无法通过web访问
>
>2020-12-27 22:08:12,387 INFO  
>org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create flink 
>session cluster session001 successfully, JobManager Web Interface: 
>http://192.168.99.100:8081
>
>
>
>
>通过 `kubectl get pods` 命令查看到pod一直处理ContainerCreating状态
>
>NAME   READY   STATUS  
>RESTARTS   AGE
>
>flink-session-cluster-858bd55dff-bzjk2 0/1 ContainerCreating   
>0  5m59s
>
>kubernetes-dashboard-1608509744-6bc8455756-mp47w   1/1 Running 
>0  6d14h
>
>
>
>
>于是通过 `kubectl describe pod flink-session-cluster-858bd55dff-bzjk2`命令查看详细,结果如下:
>
>
>
>
>Name: flink-session-cluster-858bd55dff-bzjk2
>
>Namespace:default
>
>Priority: 0
>
>Node: minikube/192.168.99.100
>
>Start Time:   Sun, 27 Dec 2020 22:21:56 +0800
>
>Labels:   app=flink-session-cluster
>
>  component=jobmanager
>
>  pod-template-hash=858bd55dff
>
>  type=flink-native-kubernetes
>
>Annotations:  
>
>Status:   Pending
>
>IP:   172.17.0.4
>
>IPs:
>
>  IP:   172.17.0.4
>
>Controlled By:  ReplicaSet/flink-session-cluster-858bd55dff
>
>Containers:
>
>  flink-job-manager:
>
>Container ID:  
>
>Image: flink:1.12.0-scala_2.12-java8
>
>Image ID:  
>
>Ports: 8081/TCP, 6123/TCP, 6124/TCP
>
>Host Ports:0/TCP, 0/TCP, 0/TCP
>
>Command:
>
>  /docker-entrypoint.sh
>
>Args:
>
>  native-k8s
>
>  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 
> -Xms1073741824 -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/opt/flink/log/jobmanager.log 
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties 
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint -D 
> jobmanager.memory.off-heap.size=134217728b -D 
> jobmanager.memory.jvm-overhead.min=201326592b -D 
> jobmanager.memory.jvm-metaspace.size=268435456b -D 
> jobmanager.memory.heap.size=1073741824b -D 
> jobmanager.memory.jvm-overhead.max=201326592b
>
>State:  Waiting
>
>  Reason:   ImagePullBackOff
>
>Ready:  False
>
>Restart Count:  0
>
>Limits:
>
>  cpu: 1
>
>  memory:  1600Mi
>
>Requests:
>
>  cpu: 1
>
>  

Re: 根据业务需求选择合适的flink state

2020-12-28 文章 张锴
能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

> 这个可以用 session window 吧
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
>


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 张锴
能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

> 这个可以用 session window 吧
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
> news_...@163.com  于2020年12月28日周一 下午2:15写道:
>
> > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
> >
> >
> >
> > news_...@163.com
> >
> > 发件人: 张锴
> > 发送时间: 2020-12-28 13:35
> > 收件人: user-zh
> > 主题: 根据业务需求选择合适的flink state
> > 各位大佬帮我分析下如下需求应该怎么写
> >
> > 需求说明:
> > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
> >
> >
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
> >
> > 我的想法:
> > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
> >
> > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
> >
> > flink 版本1.10.1
> >
>


Some questions about limit push down

2020-12-28 文章 Jun Zhang
when I query hive table by sql, like this `select * from hivetable where id
= 1 limit 1`,   I found that the limit push down is invalid, is it a bug or
was it designed like this?

if the sql is  'select * from hivetable  limit 1'  ,it is ok

thanks


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 Akisaya
这个可以用 session window 吧
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_...@163.com  于2020年12月28日周一 下午2:15写道:

> 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>
>
>
> news_...@163.com
>
> 发件人: 张锴
> 发送时间: 2020-12-28 13:35
> 收件人: user-zh
> 主题: 根据业务需求选择合适的flink state
> 各位大佬帮我分析下如下需求应该怎么写
>
> 需求说明:
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>
> 我的想法:
> 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>
> 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>
> flink 版本1.10.1
>


Flink RocksDBStateBackend 可以设置阿里云 OSS 存储吗?

2020-12-28 文章 Lei Wang
如题, 可以直接这样写吗?

env.setStateBackend(new RocksDBStateBackend(“oss://”, true));

谢谢,
王磊