回复:flink-metrics如何获取applicationid

2023-08-30 文章 allanqinjy
多谢了,明天改一下代码试试
 回复的原邮件 
| 发件人 | Feng Jin |
| 发送日期 | 2023年08月30日 19:42 |
| 收件人 | user-zh |
| 主题 | Re: flink-metrics如何获取applicationid |
hi,

可以尝试获取下 _APP_ID  这个 JVM 环境变量.
System.getenv(YarnConfigKeys.ENV_APP_ID);

https://github.com/apache/flink/blob/6c9bb3716a3a92f3b5326558c6238432c669556d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java#L28


Best,
Feng

On Wed, Aug 30, 2023 at 7:14 PM allanqinjy  wrote:

> hi,
>请教大家一个问题,就是在上报指标到prometheus时候,jobname会随机生成一个后缀,看源码也是new Abstract
> ID(),有方法在这里获取本次上报的作业applicationid吗?


回复:flink-metrics如何获取applicationid

2023-08-30 文章 allanqinjy
多谢,明天修改一下代码试试
 回复的原邮件 
| 发件人 | Feng Jin |
| 发送日期 | 2023年08月30日 19:42 |
| 收件人 | user-zh |
| 主题 | Re: flink-metrics如何获取applicationid |
hi,

可以尝试获取下 _APP_ID  这个 JVM 环境变量.
System.getenv(YarnConfigKeys.ENV_APP_ID);

https://github.com/apache/flink/blob/6c9bb3716a3a92f3b5326558c6238432c669556d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java#L28


Best,
Feng

On Wed, Aug 30, 2023 at 7:14 PM allanqinjy  wrote:

> hi,
>请教大家一个问题,就是在上报指标到prometheus时候,jobname会随机生成一个后缀,看源码也是new Abstract
> ID(),有方法在这里获取本次上报的作业applicationid吗?


flink-metrics如何获取applicationid

2023-08-30 文章 allanqinjy
hi,
   请教大家一个问题,就是在上报指标到prometheus时候,jobname会随机生成一个后缀,看源码也是new Abstract 
ID(),有方法在这里获取本次上报的作业applicationid吗?

ElasticsearchSink 设置es 主分片数

2022-11-29 文章 allanqinjy


hi,
flink streaming(版本1.12.5) 写es的时候ElasticsearchSink.Builder发现没有设置配置的地方,比如要想设置 
number_of_shards。哪位大佬知道,请教一下!


ElasticsearchSink.BuilderesSinkBuilder=newElasticsearchSink.Builder<>(httpHosts,newElasticsearchSinkFunction(){publicIndexRequestcreateIndexRequest(Stringelement){Mapjson=newHashMap<>();json.put("data",element);returnRequests.indexRequest().index("my-index").type("my-type").source(json);}@Overridepublicvoidprocess(Stringelement,RuntimeContextctx,RequestIndexerindexer){indexer.add(createIndexRequest(element));}})
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制



Re:OutOfMemoryError: Direct buffer memory

2022-10-08 文章 allanqinjy
看堆栈信息是内存不够,调大一些看看。我之前在读取hdfs上的一个获取地理位置的离线库,也是内存溢出,通过调整内存大小解决的。用的streamingapi开发的作业,1.12.5版本。


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


On 10/8/2022 21:00,RS wrote:
Hi,


版本:Flink-1.15.1


有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink 
SQL定义执行,source是connector=filesystem,format=raw,path=


执行任务的时候,有时候能成功,有时候失败了然后就一直失败,重启集群好像可以解决问题,这种情况如何是什么原因导致的?


集群的off-heap都是默认配置,
taskmanager.memory.task.off-heap.size=0
taskmanager.memory.framework.off-heap.size=128MB


报错堆栈:
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct 
out-of-memory error has occurred. This can mean two things: either job(s) 
require(s) a larger size of JVM direct memory or there is a direct memory leak. 
The direct memory can be allocated by user code or some of its dependencies. In 
this case 'taskmanager.memory.task.off-heap.size' configuration option should 
be increased. Flink framework and its dependencies also consume the direct 
memory, mostly for network communication. The most of network memory is managed 
by Flink and should not result in out-of-memory error. In certain special 
cases, in particular for jobs with high parallelism, the framework may require 
more direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at 
org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
at 
org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
at 
org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
at java.io.DataInputStream.read(DataInputStream.java:149)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)


Thanks

回复:flink使用hive作为维表,kafka作为数据源,join时候报错

2021-05-13 文章 allanqinjy
你看异常信息,提示时态表join的时候需要主键,但是你没有定义。而且你join的时候不需要on吗?


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年05月14日 09:32,hehuiyuan 写道:
select 
FROM  jdqTableSources AS a
JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b



Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Temporal Table Join requires primary key in
versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($0, $4),
__INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0),
__TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime])
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur,
cus])
FlinkLogicalSnapshot(period=[$cor0.proctime])
FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc])
FlinkLogicalTableSourceScan(table=[[myhive, dev,
dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name,
premium, cate_lev, type, borc, plan_code, subjection_b, product_name,
lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name,
item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink sql写hbase问题

2021-05-12 文章 allanqinjy
光看异常,应该是你插入了空值吧,你插入hbase的时候做个filter过滤吧,比如你的rowkey空了,你往hbase插入应该是不行的。你可以试试。


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年05月12日 19:23,酷酷的浑蛋 写道:
Mismatch of function's argument data type 'STRING NOT NULL' and actual argument 
type 'STRING'.sql有些长,大概就是在执行  insert hbase sql时 报了上面的错误,请问这种错误是什么原因?

回复:flink job task在taskmanager上分布不均衡

2021-05-07 文章 allanqinjy
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
flink的配置中是有flink taskmanager配置的,一个tm对应几个slots 
。taskmanager.numberOfTaskSlots默认是1.并行度是对应了slots数据,一般我们的slots与并行度最大的一样。你可以看一下这个参数设置。然后对照官网说明。


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年05月7日 16:42,wenyuan138 写道:
flink集群(flink 1.10.1),taskmanager有4个,每个有10个slot。 然后我有2个job,
每个并行度是4,预期是会分布到不同taskmanager的slot上(也就是4个taskmanager平均分配2个slot,
这样能更好的利用cpu资源). 结果发现这2个job的8个task全部分配到同一个taskmanager上了。 为什么? 有什么配置可以改变这种行为吗?
我们想要的是task能分到不同的taskmanager上。 谢谢!



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:Flink Window算子在WebUI中Bytes/Records Sent为0

2021-03-29 文章 allanqinjy
你的图看不到,我猜到的是你window之后是一个算子链,最后是sink了,所以bytes 
sent是0,sink后应该就不属于flink的管辖范围了,所以sent是0。你可以设置disableOperatorChaining()一下,然后你在window完后,再map算子一下,就能看到window后面有没有sent了。


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年03月30日 09:37,王 浩成 写道:

你好,我有下图这样的一个数据流图,以csv文件作为数据源生成数据流,数据流的时间跨度约15分钟,时间窗口设定为1分钟。

使用单节点提交Flink Job之后,在Web UI界面发现从Window算子开始,后面的Bytes 
Sent全部为0B了,可是实际上window算子及其后面的算子都进行了相应的操作,并且生成了对应的数据结果。

请问这里是为什么出现了这个问题,如何让它能像我想象的显示发送的数据量?谢谢!

回复: Flink savepoint迁移问题

2021-03-11 文章 allanqinjy
建云,
之前我也遇到了savepoint 起作业失败的问题,是我们升级pulsar客户端以后,从2.2升级到2.5.2,我-s 
启动作业的时候。因为作业也不是很重要,当时手头有其他任务,我就没有关注这个问题。你看看pulsar source那儿是不是做了什么。


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年03月11日 22:43,Kezhu Wang 写道:
有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。

确实是这样的,checkpoint 把 serializer 也 snapshot 了。

重新看了下 stack,应该是 deserialize `MessageId` 的时候出错的。你可以看下,pulsar
的版本是不是也有变动?有的话,这两个版本之间的某个 `MessageId` 实现是不是有字段变动?感觉你们应该用
`MessageId.toByteArray`。


On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:

你好,我参考StatefulSinkWriterOperator重写了迁移方法,进行测试,还是遇到了上面的错误。 这个错误似乎发生在source的
initializeState之前。有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。请问有什么办法能帮忙定位吗?

感谢~


2021年3月11日 上午11:36,Kezhu Wang  写道:

新的集群使用的是更新之后的 pulsar connector ?我看了下 pulsar-flink 的代码,这个更新对 state 是破坏性的。

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

解决方法 :?
1. 尝试通过 state-processor-api 重写下 state ?
2. 1.9 的 pulsar-flink connector 可以跑在 1.11 的 flink 上吗?

感觉后面还有不兼容的更新

new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
})));


不兼容的 state 更新,如果不好做 schema migration,可以试试 StatefulSinkWriterOperator
的方法:同时读取新旧 state + 只写新 state。

可以等 streamnative 的人确认下。

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)

... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)

at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more
Caused by: java.io
<https://apac01.safelinks.protection.outlook.com/?url=http%3A%2F%2Fjava.io%2F=04%7C01%7C%7C06cd7ca48a8f481f58d708d8e43ed127%7C84df9e7fe9f640afb435%7C1%7C0%7C637510305734207224%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000=MUHizKkvqUL0ebXqE5BXu2eewLmBRUuakj44pV0bZfg%3D=0>
<http://java.io
<https://apac01.safelinks.prot

回复:消息队列量级特别如何优化消费

2021-03-05 文章 allanqinjy
感谢各位的回答


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年03月5日 19:23,smq<374060...@qq.com> 写道:
被压严重一般是sink效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系。你可以在web界面查看哪个算子导致的,然后优化就可以了



发自我的iPhone


-- 原始邮件 --
发件人: 刘建刚 

回复:状态恢复参数顺序 -s

2021-03-04 文章 allanqinjy
你放在jar包后就当作jar的参数了 ,你可以试试这样在你的main中获取参数 s 就是你的path。


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年03月5日 14:57,dushang<1823103...@qq.com> 写道:
../bin/flink run -s path  -c class test.jar 这里面的-s 必须在最前面么,我换成  ../bin/flink
run   -c class test.jar -s path 不生效。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:消息积压如何优化

2021-03-04 文章 allanqinjy
你好,

消费速度跟不上写入速度,通过调试并发可以提高消费,但是不能一直这样靠着修改并发度来做优化。你说的看哪个算子问题,能具体点吗?怎么就算有问题了?一旦被压了,基本你看算子,基本的被压肯定都是正常的只有source那里才是high,被压会一级一级的反到source。你看算子的metrics也能只能看进入的量,出去的量,sink的出去的量是第三方flink
 ui上应该也看不到,这样如何排查是哪个算子的具体问题?你们是有什么好的方法吗?


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年03月5日 14:22,Michael Ran 写道:
看看哪个算子问题,增加那个算子并发。 或者优化那个算子执行。 先找找原因
在 2021-03-05 11:05:14,"allanqinjy"  写道:


hi,
由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制



消息积压如何优化

2021-03-04 文章 allanqinjy


hi,
由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制



消息队列量级特别如何优化消费

2021-03-04 文章 allanqinjy


hi,
  由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制



Re:时间窗口和reduce算子问题

2020-09-12 文章 allanqinjy
你好,
   
reduce在没有开窗口的时候,是一条一条来处理的。因为keyby以后是根据key分组以后的,不开窗口是无限流的形式走的。当开了window窗口以后,你可以理解为一个batch,然后对这一块数据进行了keyby后就会有一条数据了,如果你reduce里面再有个规则,比如按照time进行大小比较,只要最近的那一条重复的,那么最后就是那一条最新的数据了。这个自己也可以做个demo。如下是自己本地的测试,你也可以体验一下。希望可以帮助到你,能力有限,哪儿说的不对请见谅!!
public class ReduceTest {

private static final Logger logger = LoggerFactory.getLogger(ReduceTest.class);

public static void main(String[] args) {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream source = 
env.readTextFile("/Users/allanqin/myprojects/spend-report/demo/src/main/resources/reduce.txt");

source
.keyBy(new KeySelector() {
@Override
public String getKey(String s) throws Exception {
ObjectMapper mapper = new ObjectMapper();
ReduceEntity reduceEntity = mapper.readValue(s, ReduceEntity.class);
return reduceEntity.getName();
}
}, TypeInformation.of(new TypeHint() {
}))

.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction() {

ReduceEntity entity = new ReduceEntity();

@Override
public String reduce(String v1, String v2) throws Exception {

//ObjectMapper mapper = new ObjectMapper();
//ReduceEntity reduceEntity = mapper.readValue(v1, 
ReduceEntity.class);
//ReduceEntity reduceEntity2 = mapper.readValue(v2, 
ReduceEntity.class);
//if (reduceEntity.getAge() > reduceEntity2.getAge()) {
//return v1;
//}

return v2;

}
})

.print();

env.execute("test");

}
}
txt文件内容:
{ "name" : "allanqinjy", "age" : 4 }
{ "name" : "allanqinjy", "age" : 45 }
{ "name" : "allanqinjy", "age" : 6 }
{ "name" : "allanqinjy", "age" : 9 }










在 2020-09-12 17:57:01,"ゞ野蠻遊戲χ"  写道:

大家好



   
在window算子之后使用reduce算子,是否是把当前窗口的所有元素根据reduce算子计算完成之后,仅仅输出一条到下游,还是当前窗口前后2个元素每次进入reduce算子,计算完成之后就往下游输出一条?







Thanks
嘉治

Re:回复:流groupby

2020-06-08 文章 allanqinjy
hi,
   也就是指定 update-model retract就可以了是吧?好的多谢,我试试!

















在 2020-06-09 12:13:10,"1048262223" <1048262...@qq.com> 写道:
>Hi
>
>
>可以不开窗口只不过结果是retract流而不是append流
>
>
>Best,
>Yichao Yang
>
>
>
>
>
>发自我的iPhone
>
>
>-- 原始邮件 --
>发件人: allanqinjy 发送时间: 2020年6月9日 12:11
>收件人: user-zh 主题: 回复:流groupby


流groupby

2020-06-08 文章 allanqinjy
hi,
   请教个问题,流sql如果要group by 只能配合窗口吗?如果我不开窗口,根据一个字段比如日期group by 不可以吗?
 AppendStreamTableSink requires that Table has only insert changes.

Re:回复:数组越界

2020-05-18 文章 allanqinjy
我觉得要是从1开始,那么编译的时候就应该报异常了,而不是提交作业运行报。




Caused by: java.lang.ArrayIndexOutOfBoundsException: 22369621
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117)
18-05-2020 16:27:14 CST INFO -  at BatchCalc$822.processElement(Unknown Source)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:748)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:734)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
18-05-2020 16:27:14 CST INFO -  at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)








在 2020-05-18 16:38:16,"1048262223" <1048262...@qq.com> 写道:
>图看不到
>flink内置udf和hive udf不同,有些udf下标是从1开始的
>
>
>
>
>
>
>
>
>各位好,
>  
>flink1.10,在跑flink批量sql的适合语法通过没问题,在运行脚步的适合报错如下,hive脚步跑没有问题,不知道为什么flink 
>跑会报数组越界,这个是什么问题?
>  
>
>
>
>
>


数组越界

2020-05-18 文章 allanqinjy
各位好,
 flink1.10,在跑flink批量sql的适合语法通过没问题,在运行脚步的适合报错如下,hive脚步跑没有问题,不知道为什么flink 
跑会报数组越界,这个是什么问题?
 

dayofweek异常

2020-04-10 文章 allanqinjy
hi,
   在flink中使用hql函数的时候 dayofweek  报错,编译都没有通过。我的使用方式 
用-MM-dd也是一样的错误。哪位大神遇到过!
DAYOFWEEK(to_date(from_unixtime (unix_timestamp(cast(dt as 
string),'MMdd'),'-MM-dd HH:mm:ss')))


,Cannot apply 'EXTRACT' to arguments of type 'EXTRACT( FROM 
)'. Supported form(s): 'EXTRACT( FROM 
)'
10-04-2020 16:26:14 CST user_visit_terminal_prefer_7d INFO - 
'EXTRACT( FROM )'
10-04-2020 16:26:14 CST user_visit_terminal_prefer_7d INFO -at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
10-04-2020 16:26:14 CST user_visit_terminal_prefer_7d INFO -at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)

hdfs 坏文件导致hive无法读取

2020-02-29 文章 allanqinjy
各位好,
   请教个问题,就是在往hdfs写数据的时候,会经常遇到坏文件导致hive读取的时候报异常。写hdfs 代码如下,之后的是hive 
读取时候由于坏文件导致没法select 报的异常,把坏文件删了就可以了。请问如何解决避免生成坏文件,这种生成坏文件有没有哪位遇到过并且有效的解决了。


BucketingSink> HDFS_SINK = new BucketingSink<>(path);
HDFS_SINK.setBucketer(new DateTimeBucketer(format));
HDFS_SINK.setPendingPrefix("flink_");
HDFS_SINK.setInProgressPrefix("flink_");
HDFS_SINK.setPartPrefix("pulsar_part");
HDFS_SINK.setInactiveBucketThreshold(bucketThreshold);
HDFS_SINK.setWriter(new SequenceFileWriter("SnappyCodec", 
SequenceFile.CompressionType.BLOCK));




  2020-02-29 18:31:30,747 WARN [main] org.apache.hadoop.mapred.YarnChild: 
Exception running child : java.io.IOException: java.io.IOException: 
java.io.EOFException
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
at 
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:227)
at 
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.next(HadoopShimsSecure.java:137)
at 
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:199)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:185)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:459)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: java.io.EOFException
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:365)
at 
org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:116)
at 
org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:43)
at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:116)
at 
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:225)
... 11 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:70)
at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:120)
at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2158)
at 
org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2224)
at 
org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2299)
at 
org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:109)
at 
org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:84)
at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:360)
... 15 more