submit new job is not working

2021-08-19 Thread Dhiru
Hello all,

I was able to run the sample example and upload a jar through the UI on a
cluster that I have deployed on k8s.
Today I had to reboot the jobmanager, and since then I have not been able to
upload any jar to my cluster. I also do not see any logs that would help me
debug this.
Any help would be appreciated.


--kumar 

Re: Error while deserializing the element

2021-08-19 Thread JING ZHANG
Hi Vijay, Yun,
I've created a JIRA https://issues.apache.org/jira/browse/FLINK-23886 to
track this.

Best,
JING ZHANG



Re: Error while deserializing the element

2021-08-19 Thread JING ZHANG
Hi Vijay,
I have encountered the same problem several times in production Flink jobs,
but I have not found the root cause of the exception yet.
We have worked around the exception by adding the following parameter [1]; I
hope it helps you as well.
state.backend.rocksdb.timer-service.factory: HEAP

I would like to invite Yun Tang, who is an expert on this topic, to look into
the problem. We could also create a JIRA to track the issue.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#timers-heap-vs-rocksdb

Best,
JING ZHANG

vijayakumar palaniappan wrote on Thu, Aug 19, 2021 at 8:02 AM:

> Setup Specifics:
> Version: 1.6.2
> RocksDB Map State
> Timers stored in rocksdb
>
> When we have this job running for long periods of time like > 30 days, if
> for some reason the job restarts, we encounter "Error while deserializing
> the element". Is this a known issue fixed in later versions? I see some
> changes to code for FLINK-10175, but we don't use any queryable state
>
> Below is the stack trace
>
> org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
>     at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
>     at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
>     at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
>     at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>     at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>     at org.apache.flink.types.StringValue.readString(StringValue.java:769)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
>     at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
>     at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
>     at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
>     ... 20 more
>
> --
> Thanks,
> -Vijay
>


Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi!

I've created a JIRA ticket[1] for this issue. Please check it out and track
the progress there.

[1] https://issues.apache.org/jira/browse/FLINK-23885



Re: Periodic output at end of stream

2021-08-19 Thread JING ZHANG
Hi Matthias,
Thanks for providing the example. I will reply soon, after I have done some
debugging.

Best,
JING ZHANG

Matthias Broecheler wrote on Thu, Aug 19, 2021 at 1:53 AM:

> Hey JING,
>
> thanks for getting back to me. I tried to produce the smallest,
> self-contained example that produces the phenomenon:
> https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f
>
> If you run MainRepl you should see an infinite loop of re-processing the 5
> integers. The offending process is BufferedLatestSelector - specifically
> the event timer that is registered in it. Without the timer the process
> will not emit an output.
>
> The timer is set whenever the state is null. Is there a problem with how I
> implemented that buffering process?
> Thank you,
> Matthias
>
> On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG  wrote:
>
>> Hi Matthias,
>> How often do you register the event-time timer?
>> Is it registered per input record, or is a new timer registered after an
>> event-time timer is triggered?
>> Could you please provide your test case code? It would be very helpful
>> for troubleshooting.
>>
>> Best wishes,
>> JING ZHANG
>>
>> Matthias Broecheler wrote on Sat, Aug 14, 2021 at 3:44 AM:
>>
>>> Hey guys,
>>>
>>> I have a KeyedProcessFunction that gathers statistics on the events that
>>> flow through and emits it periodically (every few seconds) to a SideOutput.
>>> However, at the end of stream the last set of statistics don't get
>>> emitted. I read on the mailing list that processing time timers that are
>>> pending don't get triggered when Flink cleans up a stream, but that event
>>> timers do get triggered because a watermark with Long.MAX_VALUE is sent
>>> through the stream.
>>> Hence, I thought that I could register a "backup" event timer for
>>> Long.MAX_VALUE-1 to make sure that my process function gets notified when
>>> the stream ends to emit the in-flight statistics.
>>>
>>> However, now my simple test case (with a data source fromCollection of 4
>>> elements) keeps iterating over the same 4 elements in an infinite loop.
>>>
>>> I don't know how to make sense of this and would appreciate your help.
>>> Is there a better way to set a timer that gets triggered at the end of
>>> stream?
>>> And for my education: Why does registering an event timer cause an
>>> infinite loop over the source elements?
>>>
>>> Thanks a lot and have a wonderful weekend,
>>> Matthias
>>>
>>
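
The end-of-stream behaviour described in the original question (pending event-time timers fire once the final Long.MAX_VALUE watermark arrives) can be modelled in plain Java. This is only a toy sketch under stated assumptions: the class and method names are hypothetical, it does not use the Flink API, and it does not reproduce or explain the infinite loop reported in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of an event-time timer service for a single key (hypothetical names).
class EndOfStreamTimerSketch {
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();

    // Registering the same timestamp twice keeps a single timer.
    void registerEventTimeTimer(long timestamp) {
        if (!eventTimeTimers.contains(timestamp)) {
            eventTimeTimers.add(timestamp);
        }
    }

    // Fires every pending timer whose timestamp is <= the new watermark.
    void advanceWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            fired.add(eventTimeTimers.poll());
        }
    }

    List<Long> fired() {
        return fired;
    }

    public static void main(String[] args) {
        EndOfStreamTimerSketch timers = new EndOfStreamTimerSketch();
        // "Backup" timer just below the final watermark, as in the question.
        timers.registerEventTimeTimer(Long.MAX_VALUE - 1);

        timers.advanceWatermark(1_000L);          // ordinary watermark
        System.out.println("fired after normal watermark: " + timers.fired());

        timers.advanceWatermark(Long.MAX_VALUE);  // final watermark at end of stream
        System.out.println("fired after final watermark:  " + timers.fired());
    }
}
```

In this model the backup timer stays pending during normal processing and fires exactly once when the final watermark arrives.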


Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread JING ZHANG
Hi Suman,
Please try copying `MapBundleOperator` and changing the `HashMap` to a
`LinkedHashMap`, so that the output sequence stays consistent with the input
sequence.
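
The difference between the two map types can be seen in a small plain-Java sketch. It is independent of Flink; the class name, the driver ids and the tip values are made up for illustration. A `LinkedHashMap` iterates in insertion order, while a `HashMap` gives no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BundleOrderDemo {
    // Buffers (key -> summed tip) into the given map and returns the keys in
    // the order the map iterates them, i.e. the order a bundle would be flushed.
    static List<Long> flushOrder(Map<Long, Double> bundle, long[] keys, double[] tips) {
        for (int i = 0; i < keys.length; i++) {
            bundle.merge(keys[i], tips[i], Double::sum);
        }
        return new ArrayList<>(bundle.keySet());
    }

    public static void main(String[] args) {
        long[] driverIds = {30L, 10L, 20L, 10L};
        double[] tips = {1.0, 2.0, 3.0, 4.0};
        // HashMap: iteration order is unspecified and may differ from arrival order.
        System.out.println("HashMap order:       "
                + flushOrder(new HashMap<>(), driverIds, tips));
        // LinkedHashMap: iteration follows first insertion of each key.
        System.out.println("LinkedHashMap order: "
                + flushOrder(new LinkedHashMap<>(), driverIds, tips));
    }
}
```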

Best,
JING ZHANG

suman shil wrote on Fri, Aug 20, 2021 at 2:23 AM:

> Hi Jing,
> Thanks for looking into this. Here is the code of `TaxiFareStream`. I was
> following this link
> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
> . Please let me know if there is any other way of aggregating elements
> locally.
>
> public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>     private KeySelector<TaxiFare, Long> keySelector;
>
>     public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>                           BundleTrigger<TaxiFare> bundleTrigger,
>                           KeySelector<TaxiFare, Long> keySelector) {
>         super(userFunction, bundleTrigger, keySelector);
>         this.keySelector = keySelector;
>     }
>
>     @Override
>     protected Long getKey(TaxiFare input) throws Exception {
>         return keySelector.getKey(input);
>     }
> }
>
> Thanks
>
> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG  wrote:
>
>> Hi Suman,
>> Could you please provide the code of `TaxiFareStream`? It seems we
>> could use `MapBundleOperator` directly here.
>> BTW, I have some concerns about using this solution to do local
>> aggregation for a window aggregation, because `MapBundleOperator`
>> saves input data in a bundle that is a HashMap, which does not preserve
>> the input order. I'm afraid the data within a bundle (10 elements in your
>> case) may come out unordered, and I'm not sure whether it is reasonable to
>> assign watermarks based on unordered timestamps.
>>
>> Best,
>> JING ZHANG
>>
>>
>>
>> suman shil wrote on Thu, Aug 19, 2021 at 12:43 PM:
>>
>>> I am trying to do pre-shuffle aggregation in Flink. The following is the
>>> MapBundle implementation.
>>>
>>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>     @Override
>>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>         if (value == null) {
>>>             return input;
>>>         }
>>>         value.tip = value.tip + input.tip;
>>>         return value;
>>>     }
>>>
>>>     @Override
>>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>             out.collect(entry.getValue());
>>>         }
>>>     }
>>> }
>>>
>>> I am using "CountBundleTrigger.java", but the pre-shuffle aggregation
>>> is not working because the "count" variable is always 0. Please let me
>>> know if I am missing something.
>>>
>>> @Override
>>> public void onElement(T element) throws Exception {
>>>     count++;
>>>     if (count >= maxCount) {
>>>         callback.finishBundle();
>>>         reset();
>>>     }
>>> }
>>>
>>> Here is the main code.
>>>
>>> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
>>> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>     @Override
>>>     public Long getKey(TaxiFare value) throws Exception {
>>>         return value.driverId;
>>>     }
>>> };
>>> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>> //      fares.keyBy((TaxiFare fare) -> fare.driverId)
>>> //              .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>>>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>                         new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>                 .assignTimestampsAndWatermarks(
>>>                         new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>                             @Override
>>>                             public long extractTimestamp(TaxiFare element) {
>>>                                 return element.startTime.getEpochSecond();
>>>                             }
>>>                         })
>>>                 .keyBy((TaxiFare fare) -> fare.driverId)
>>>                 .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>                 .process(new AddTips());
>>> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>
>>> Thanks
>>> Suman
>>>
>>
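
The pre-shuffle aggregation pattern discussed in this thread (buffer one partial result per key, flush the buffer after a fixed number of inputs) can be sketched without Flink as follows. The class `CountBundleSketch` and its methods are hypothetical stand-ins for the bundle function and count trigger, not the actual operator, and the sketch does not explain why `count` stays at 0 in the original job.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Count-triggered local aggregation of tips per driver (hypothetical names).
class CountBundleSketch {
    private final int maxCount;
    // LinkedHashMap keeps keys in first-arrival order when the bundle is flushed.
    private final Map<Long, Float> buffer = new LinkedHashMap<>();
    private final List<Map.Entry<Long, Float>> emitted = new ArrayList<>();
    private int count = 0;

    CountBundleSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    // Adds one input to the bundle and flushes once maxCount inputs were seen.
    void onElement(long driverId, float tip) {
        buffer.merge(driverId, tip, Float::sum);
        count++;
        if (count >= maxCount) {
            finishBundle();
        }
    }

    // Emits one pre-aggregated record per key, then resets the bundle.
    void finishBundle() {
        for (Map.Entry<Long, Float> e : buffer.entrySet()) {
            emitted.add(new SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        buffer.clear();
        count = 0;
    }

    List<Map.Entry<Long, Float>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CountBundleSketch bundle = new CountBundleSketch(3);
        bundle.onElement(1L, 1.0f);
        bundle.onElement(2L, 2.0f);
        bundle.onElement(1L, 3.0f); // third input triggers the flush
        System.out.println(bundle.emitted());
    }
}
```

Note that elements still buffered when the input ends are only emitted if `finishBundle()` is called explicitly, which is a design point any count-based trigger has to handle.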


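Re: Can a single Flink history server support multiple Flink application clusters?

2021-08-19 Thread Chenyu Zheng
The History Server API also uses the jobid to tell jobs apart:

  *   /config
  *   /jobs/overview
  *   /jobs/<jobid>
  *   /jobs/<jobid>/vertices
  *   /jobs/<jobid>/config
  *   /jobs/<jobid>/exceptions
  *   /jobs/<jobid>/accumulators
  *   /jobs/<jobid>/vertices/<vertexid>
  *   /jobs/<jobid>/vertices/<vertexid>/subtasktimes
  *   /jobs/<jobid>/vertices/<vertexid>/taskmanagers
  *   /jobs/<jobid>/vertices/<vertexid>/accumulators
  *   /jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators
  *   /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>
  *   /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>
  *   /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators
  *   /jobs/<jobid>/plan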




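Can a single Flink history server support multiple Flink application clusters?

2021-08-19 Thread Chenyu Zheng
Hello,

We currently run jobs on k8s in Flink application mode, and would like to
deploy a history server to make debugging easier. According to the
documentation, however, the Flink history server seems to support only
different jobs within a single cluster; with multiple clusters, identical job
IDs will cause errors.

What is the best way to use the history server with multiple application
clusters?

Thanks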


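Re:Re: Re:Re: Insert problem with upsert() mode in the flink-connector-redis connector

2021-08-19 Thread 李航飞
Hello,

SELECT window_start, window_end, SUM(price), item
FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '10' HOUR))
GROUP BY window_start, window_end, item

The statement itself is fine and produces output once per minute as expected.
The expiration-time code has been commented out.

public ChangelogMode getChangelogMode(ChangelogMode arg0) {
    return ChangelogMode.upsert();
}

I implemented the RedisMapper methods and added a print statement where data
is written to Redis. A write happens every minute, and I am sure the data is
identical each time.
This does not seem reasonable for upsert.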


Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Jingsong Li
Hi Yik,

The **batch** Hive sink does not support `sink.partition-commit.policy.kind`.

Default **batch** Hive sink will commit metastore without success-file.

You can create a JIRA for this.

Best,
Jingsong




-- 
Best, Jingsong Lee


Unsubscribe

2021-08-19 Thread Bruce Zhang
Unsubscribe from the mailing list

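Re: Re:Re: Insert problem with upsert() mode in the flink-connector-redis connector

2021-08-19 Thread Caizhi Weng
Hi!

I had not noticed earlier that this is a cumulate window. What does the actual
group by statement look like? The description says "the small window is
computed once per minute", so the sink should indeed receive one message per
minute.

In a streaming scenario a sink can receive upsert messages, as long as
getChangelogMode returns upsert.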
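
Upsert mode describes how the sink interprets records (insert or update by key); it does not by itself suppress repeated identical values. If skipping identical writes matters, one option is to deduplicate inside the sink. This is an assumption for illustration and was not proposed in this thread. The sketch below is plain Java with a hypothetical `DedupingUpsertWriter` class, and the in-memory counter stands in for the real Redis write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Remembers the last value written per key and skips writes that would not
// change it (hypothetical helper, not part of flink-connector-redis).
class DedupingUpsertWriter {
    private final Map<String, String> lastWritten = new HashMap<>();
    private int writes = 0;

    // Returns true if the value was actually written, false if it was skipped.
    boolean upsert(String key, String value) {
        if (Objects.equals(lastWritten.get(key), value)) {
            return false; // identical to the previous value for this key
        }
        lastWritten.put(key, value);
        writes++; // stand-in for the real Redis SET call
        return true;
    }

    int writes() {
        return writes;
    }

    public static void main(String[] args) {
        DedupingUpsertWriter writer = new DedupingUpsertWriter();
        System.out.println(writer.upsert("item-a", "10.0")); // first write
        System.out.println(writer.upsert("item-a", "10.0")); // same value again
        System.out.println(writer.upsert("item-a", "12.5")); // changed value
        System.out.println("writes: " + writer.writes());
    }
}
```

One caveat of this design: if keys are written with an expiration time, skipping identical writes also skips refreshing that expiration.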

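李航飞 wrote on Fri, Aug 20, 2021 at 10:03 AM:

> Hello,
> I read that a sink in a streaming scenario can only receive insert-only
> messages. Is that correct?
>
> At 2021-08-20 09:10:44, "李航飞" wrote:
> >Hello,
> >I did set an expiration time. I have commented out the expiration code,
> >but data is still written every minute.
> >
> >I implemented the open method in the RichMapFunction interface and set a
> >StateTtlConfig there;
> >after that I set the expiration time with RedisCommand.SETEX.
> >Both are commented out now, but the upsert() method still has no effect.
> >
> >At 2021-08-19 17:44:02, "Caizhi Weng" wrote:
> >>Hi!
> >>
> >>You can add a System.out.println in the method that receives the sink data
> >>to check whether data really arrives once per minute.
> >>
> >>If so, it is probably because a state TTL is set. To prevent the data from
> >>expiring, the group agg still sends data downstream even if it receives the
> >>same key and value each time (see GroupAggFunction#processElement), so this
> >>is expected behavior.
> >>
> >>李航飞 wrote on Thu, Aug 19, 2021 at 5:03 PM:
> >>
> >>> Version: Flink 1.13.2
> >>> Scenario:
> >>> A custom flink-connector-redis connector. When implementing the
> >>> DynamicTableSink interface, I override the getChangelogMode method and
> >>> set ChangelogMode.upsert() so that data is written as updates.
> >>> The window uses the cumulate function with a 1-day window, and the small
> >>> window is computed once per minute.
> >>>
> >>> Problem:
> >>> Testing shows that output is produced every minute and the written data
> >>> is identical.
> >>> With upsert() mode, identical data should normally not trigger a Redis
> >>> write. What is going on here?
> >>>
>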


Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Caizhi Weng
Hi!

As far as I know Flink batch jobs will not add the _SUCCESS file. However
for batch jobs you can register a JobListener and add the _SUCCESS file by
yourself in JobListener#onJobExecuted. See registerJobListener method in
StreamExecutionEnvironment.
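
The marker-writing part of that suggestion can be sketched in plain Java as below. Assumptions, stated loudly: the helper `SuccessMarker.writeSuccessMarker` is hypothetical, and it uses the local file system through `java.nio.file` only for illustration; for a partition directory on HDFS the same step would have to go through the Hadoop file system API instead. In a real job the helper would be called from `JobListener#onJobExecuted` when the passed throwable is null, with the listener registered through `StreamExecutionEnvironment#registerJobListener`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class SuccessMarker {
    // Creates an empty _SUCCESS file in the given directory if it is missing
    // and returns its path. Local file system only (illustrative assumption).
    static Path writeSuccessMarker(Path partitionDir) throws IOException {
        Path marker = partitionDir.resolve("_SUCCESS");
        if (Files.notExists(marker)) {
            Files.createFile(marker);
        }
        return marker;
    }

    public static void main(String[] args) throws IOException {
        // A temporary directory stands in for the partition directory.
        Path partitionDir = Files.createTempDirectory("partition");
        Path marker = writeSuccessMarker(partitionDir);
        System.out.println("wrote " + marker + " (" + Files.size(marker) + " bytes)");
    }
}
```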

Yik San Chan wrote on Fri, Aug 20, 2021 at 10:26 AM:

> Hi community,
>
> According to the [docs](
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy),
> if I create a Hive table with config
> sink.partition-commit.policy.kind="metastore,success-file", once the write
> to the **streaming** Hive sink is finished:
>
> - The HDFS directory will be registered to the Hive metastore,
> - There will be a _SUCCESS file written to the directory when the job
> finishes.
>
> An example result directory on HDFS looks like this:
>
> [10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
> Found 9 items
> -rw-r-   2 basedata aiinfra          0 2021-08-20 08:56 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
> -rw-r-   2 basedata aiinfra   10684668 2021-08-20 08:49 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
> -rw-r-   2 basedata aiinfra   10712792 2021-08-20 08:48 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
> -rw-r-   2 basedata aiinfra   10759066 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
> -rw-r-   2 basedata aiinfra   10754886 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
> -rw-r-   2 basedata aiinfra   10681155 2021-08-20 08:45 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
> -rw-r-   2 basedata aiinfra   10725101 2021-08-20 08:46 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
> -rw-r-   2 basedata aiinfra   10717976 2021-08-20 08:56 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
> -rw-r-   2 basedata aiinfra   10585453 2021-08-20 08:45 /user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0
>
> There are 8 part-* files because I set the flink run parallelism to 8.
> After all part-* are written, a _SUCCESS file is added (see the timestamp
> 08:56, which is later than all the rest).
>
> I wonder: can I do the same with **batch** Hive sink as well? Ideally,
> after the job finishes, I would like to have a _SUCCESS file added to the
> directory. However, I haven't figured out how to do it yet.
>
> Any help? Thanks!
>
> Best,
> Yik San
>


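Re:Re: Re: Error when using the cumulate function together with comparison functions

2021-08-19 Thread 李航飞
Hello,
The specific scenario is filtering the data before the aggregation; I have now
done the extraction and filtering with CREATE VIEW.
Next, I would like to write to Redis in upsert mode via a DynamicTable.
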


Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi!

This is because TypeExtractor#getMapReturnTypes does not handle row types
(see that method and also TypeExtractor#privateGetForClass). You might want
to open a JIRA ticket for this.
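
Until the extractor handles this case, a commonly used workaround is to declare the row type explicitly on the map operator with `returns(...)`. The sketch below is not part of the original reply and makes some assumptions: it needs the Flink streaming and table bridge dependencies on the classpath, the field names `name` and `id` are made up, and the class name is hypothetical. With the explicit type, the resulting table is expected to expose two columns rather than a single RAW column.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

class RowTypeHintExample {
    // Explicit row type with two fields; the field names are illustrative.
    static TypeInformation<Row> rowType() {
        return Types.ROW_NAMED(new String[] {"name", "id"}, Types.STRING, Types.INT);
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> integers = env.fromElements(12, 5);

        // returns(...) overrides the type the extractor inferred for the lambda.
        DataStream<Row> rows = integers
                .map(i -> Row.of("Name" + i, i))
                .returns(rowType());

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(rows);
        table.printSchema();
    }
}
```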

Matthias Broecheler wrote on Fri, Aug 20, 2021 at 7:01 AM:

> Hey Flinkers,
>
> I am trying to follow the docs to convert a DataStream to a Table.
> Specifically, I have a DataStream of Row and want the columns of the row to
> become the columns of the resulting table.
>
> That works but only if I construct the Rows statically. If I construct
> them dynamically (in a map) then Flink turns the entire Row into one column
> of type "RAW('org.apache.flink.types.Row', '...')".
>
> Does anybody know why this is the case or how to fix it? Take a look at
> the simple Flink program below where I construct the DataStream "rows" in
> two different ways. I would expect those to be identical (and the sink does
> print identical information) but the inferred table schema is different.
>
> Thanks a ton,
> Matthias
>
> --
>
> StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>
> //  This alternative way of constructing this data stream produces the expected table schema
> //  DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), Row.of("Name5", 5));
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
> Table table = tableEnv.fromDataStream(rows);
> table.printSchema();
>
> rows.addSink(new PrintSinkFunction<>());
>
> flinkEnv.execute();
>
>


Re: Re: Error when using the cumulate function together with comparison functions

2021-08-19 Thread Caizhi Weng
Hi!

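What exactly does the condition look like? In theory, a statement equivalent to the original group by ... having ... should be supported.

李航飞 wrote on Wed, Aug 18, 2021 at 4:34 PM:

> Oh, OK, thank you. Comparison functions cannot be used together with it, but simpler conditions (where a = '2') work. Will this feature be adjusted later?
> At 2021-08-18 16:21:20, "Caizhi Weng" wrote:
> >Hi!
> >
> >Currently window TVFs can only be used in two scenarios: window agg and window top-n. If the where
> >condition is meant to filter the results of the window agg, you can use having instead of where; if it is
> >meant to filter the data before it enters the window, you can create a view.
> >
> >李航飞 wrote on Wed, Aug 18, 2021 at 3:55 PM:
> >
> >> I am building a data processing pipeline with Flink SQL: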
> >> SELECT window_start,window_end,SUM(price)
> >>
> >> FROM TABLE(
> >>
> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL
> '10'
> >> MINUTES))
> >>
> >> GROUP BY window_start,window_end;
> >>
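> >> The statement is roughly as above. Running it through the StreamTableEnvironment object with env.sqlQuery(sql) succeeds without problems.
> >> The critical step, executing it through the StatementSet object with sta.execute(), fails with: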
> >> java.lang.UnsupportedOperationException:
> >> Currently Flink doesn't support individual window table-valued function
> >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
> >>  Please use window table-valued function with aggregate together using
> >> window_start and window_end as group keys.
> >> The environment is Flink 1.13.1. Without the where condition it runs fine; with it, it fails.
> >>
> >>
>


How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Yik San Chan
Hi community,

According to the [docs](
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/filesystem/#partition-commit-policy),
if I create a Hive table with config
sink.partition-commit.policy.kind="metastore,success-file", once the write
to the **streaming** Hive sink is finished:

- The HDFS directory will be registered to the Hive metastore,
- There will be a _SUCCESS file written to the directory when the job
finishes.

An example result directory on HDFS looks like this:

[10.106.11.21:serv@cn-hz-wl-prod-data-stat00:~]$ hdfs dfs -ls
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819
Found 9 items
-rw-r-   2 basedata aiinfra  0 2021-08-20 08:56
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/_SUCCESS
-rw-r-   2 basedata aiinfra   10684668 2021-08-20 08:49
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-0-0
-rw-r-   2 basedata aiinfra   10712792 2021-08-20 08:48
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-1-0
-rw-r-   2 basedata aiinfra   10759066 2021-08-20 08:46
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-2-0
-rw-r-   2 basedata aiinfra   10754886 2021-08-20 08:46
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-3-0
-rw-r-   2 basedata aiinfra   10681155 2021-08-20 08:45
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-4-0
-rw-r-   2 basedata aiinfra   10725101 2021-08-20 08:46
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-5-0
-rw-r-   2 basedata aiinfra   10717976 2021-08-20 08:56
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-6-0
-rw-r-   2 basedata aiinfra   10585453 2021-08-20 08:45
/user/hive/warehouse/aiinfra.db/user_loss_predictions/p_day=20210819/part-3ee91bc0-a5f6-44c9-b2e5-3d50ee882028-7-0

There are 8 part-* files because I set the flink run parallelism to 8.
After all part-* are written, a _SUCCESS file is added (see the timestamp
08:56, which is later than all the rest).

I wonder: can I do the same with **batch** Hive sink as well? Ideally,
after the job finishes, I would like to have a _SUCCESS file added to the
directory. However, I haven't figured out how to do it yet.

Any help? Thanks!

Best,
Yik San


Re: Flink SQL API does not support columns of type TIMESTAMP(p) WITH TIME ZONE

2021-08-19 Thread changfeng
Hello,
  Thanks for the answer. I read the Flink Table API & SQL Data Types page carefully:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/
Is the table at the end of the section
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#data-types-in-the-table-api
the complete list of data types that Flink currently supports?


> On Aug 19, 2021, at 9:10 PM, Leonard Xu wrote:
> 
> Hello,
> 
> Flink does not support the TIMESTAMP WITH TIME ZONE type yet.
> 
> Currently supported are:
> TIMESTAMP WITHOUT TIME ZONE, abbreviated as TIMESTAMP
> TIMESTAMP WITH LOCAL TIME ZONE, abbreviated as TIMESTAMP_LTZ
> 
> Best,
> Leonard
> 
>> On Aug 19, 2021, at 20:51, changfeng wrote:
>> 
>> ` TIMESTAMP(6) WITH TIME ZONE
> 



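Re:Re:Re: flink-connector-redis connector: insert problem in upsert() mode

2021-08-19 Thread 李航飞
Hello,
I found that in streaming scenarios a sink can only accept insert-only data. Is that right?



At 2021-08-20 09:10:44, "李航飞" wrote:
>Hello,
>I did set an expiration time. I have commented out the expiration code, but the data is still written once a minute.
>
>
>I implemented the open method of the RichMapFunction interface
>and set a StateTtlConfig there;
>after that I set an expiration time with RedisCommand.SETEX.
>Both are commented out now, but upsert() still has no effect.
>
>
>At 2021-08-19 17:44:02, "Caizhi Weng" wrote:
>>Hi!
>>
>>You can add a System.out.println in the method that receives the sink data to check whether data really arrives once a minute.
>>
>>If so, it is probably because state TTL is set. To keep the data from expiring, group agg still sends
>>data downstream even when it receives the same key and value every time (see GroupAggFunction#processElement), so this is expected behavior.
>>
>>李航飞 wrote on Thu, Aug 19, 2021 at 5:03 PM:
>>
>>> Version: Flink 1.13.2
>>> Scenario:
>>> A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
>>> I override the getChangelogMode method and set ChangelogMode.upsert() so that data is written as updates.
>>> The window is a cumulate function with a 1-day window and a 1-minute step.
>>>
>>>
>>> Problem:
>>> Testing shows that output is produced every minute, and the written data is identical.
>>> With upsert() mode, identical data normally should not trigger a Redis write. What is going on here?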
>>>
>>>


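Re:Re: flink-connector-redis connector: insert problem in upsert() mode

2021-08-19 Thread 李航飞
Hello,
I did set an expiration time. I have commented out the expiration code, but the data is still written once a minute.


I implemented the open method of the RichMapFunction interface
and set a StateTtlConfig there;
after that I set an expiration time with RedisCommand.SETEX.
Both are commented out now, but upsert() still has no effect.


At 2021-08-19 17:44:02, "Caizhi Weng" wrote:
>Hi!
>
>You can add a System.out.println in the method that receives the sink data to check whether data really arrives once a minute.
>
>If so, it is probably because state TTL is set. To keep the data from expiring, group agg still sends
>data downstream even when it receives the same key and value every time (see GroupAggFunction#processElement), so this is expected behavior.
>
>李航飞 wrote on Thu, Aug 19, 2021 at 5:03 PM:
>
>> Version: Flink 1.13.2
>> Scenario:
>> A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
>> I override the getChangelogMode method and set ChangelogMode.upsert() so that data is written as updates.
>> The window is a cumulate function with a 1-day window and a 1-minute step.
>>
>>
>> Problem:
>> Testing shows that output is produced every minute, and the written data is identical.
>> With upsert() mode, identical data normally should not trigger a Redis write. What is going on here?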
>>
>>


Kafka Metrics

2021-08-19 Thread Mason Chen
Hi all,

We have found that the per-partition Kafka metrics contribute a large
share of the metrics indexed by our metrics system.

We would still like to have the proxied kafka metrics from the kafka
clients library. Is there a flag to only exclude Flink's additional Kafka
metrics?

Best,
Mason


Looking for suggestions about multithreaded CEP to be used with flink

2021-08-19 Thread Tejas B
Hi,
Here's our use case:
We are planning to build a rule-based engine on top of Flink with a huge number 
of rules (1000s). The rules could be stateless or stateful. 
An example stateless rule is: A.id = 3 && A.name = 'abc' || A.color = red. 
An example stateful rule is: A is event.id = 3, B is event.name = 'abc', C is 
event.color = red, and we are looking for the pattern AB*C over a time window of 1 
hour.

We have tried to use Flink CEP for this purpose, and the program crashed because 
of the large number of threads. The explanation is: every application of CEP.pattern 
creates a new operator in the graph, and Flink can't support that many vertices 
in a graph.

Another approach could be to use a ProcessFunction in Flink, but to run the 
rules on the event stream you'd still have to use some sort of CEP or write your own.

My question is: does anybody have any other suggestions on how to achieve this? 
Are there other CEP engines that integrate and work better with Flink (siddhi, jasper, 
drools)? Any experience would be helpful.


DataStream to Table API

2021-08-19 Thread Matthias Broecheler
Hey Flinkers,

I am trying to follow the docs to
convert a DataStream to a Table. Specifically, I have a DataStream of Row
and want the columns of the row to become the columns of the resulting
table.

That works but only if I construct the Rows statically. If I construct them
dynamically (in a map) then Flink turns the entire Row into one column of
type "RAW('org.apache.flink.types.Row', '...')".

Does anybody know why this is the case or how to fix it? Take a look at the
simple Flink program below where I construct the DataStream "rows" in two
different ways. I would expect those to be identical (and the sink does
print identical information) but the inferred table schema is different.

Thanks a ton,
Matthias

--

StreamExecutionEnvironment flinkEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();
flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);

DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

DataStream<Row> rows = integers.map(i -> Row.of("Name" + i, i));

// This alternative way of constructing this data stream produces the
// expected table schema:
// DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12),
//     Row.of("Name5", 5));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
Table table = tableEnv.fromDataStream(rows);
table.printSchema();

rows.addSink(new PrintSinkFunction<>());

flinkEnv.execute();


RE: failures during job start

2021-08-19 Thread Colletta, Edward
Thank you. I am going to try the first option for now, but I do need to 
figure out why deployment takes so long.
Are there any metrics or log patterns that would indicate which task is waiting 
and which task is being waited on?


From: Chesnay Schepler 
Sent: Thursday, August 19, 2021 2:23 PM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: failures during job start

This exception means that a task was deployed, but the task that produces the 
data it wants to consume was not available yet (even after waiting for a while).

Your case sounds similar to https://issues.apache.org/jira/browse/FLINK-9413, 
where this happens because the deployment of the producing task takes too long.

You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate why 
the deployment takes so long in the first place.

On 19/08/2021 07:15, Colletta, Edward wrote:
Any help with this would be appreciated.   Is it possible that this is a 
data/application issue or a flink config/resource issue?

Using flink 11.2, java 11, session cluster, 5 nodes 32 cores each node.

I have an issue where starting a job takes a long time, and sometimes fails 
with PartitionNotFoundException, but succeeds on restart.   The job has 10 
kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the kafka logs are empty.

Note during below scenario, cpu usage on task manager and job managers is low 
(below 30%)

The scenario we see

  1.  run request to load and run a jar, job appears on dashboard with all 160 
subtasks in Deploying state
  2.  after 2 minutes some subtasks start transitioning to running.
  3.  after another 30 seconds failure occurs and job goes into Restarting state
  4.  after another minute, restart completes all nodes running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)








Re: Process suspend when get Hana connection in open method of sink function

2021-08-19 Thread Chesnay Schepler
If the Hana driver cannot be loaded then the most likely reason is that 
the dependency is not actually on the classpath.

Please double-check that your user jar bundles the dependency.
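
One way to check this from inside the job is to probe for the driver class before opening the connection and to fail loudly when it is missing, rather than logging the exception and returning null. The following is only a minimal sketch using plain JDK calls: DriverClasspathCheck and isOnClasspath are made-up names, and com.sap.db.jdbc.Driver is my assumption for what HANA_DRIVER_CLASS contains.

```java
// Hypothetical helper for illustration; not part of Flink or the HANA driver.
final class DriverClasspathCheck {

    /** Returns true if the named class can be loaded by the given class loader. */
    static boolean isOnClasspath(String className, ClassLoader loader) {
        try {
            // initialize=false: only probe for the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed driver class name; replace with the value of HANA_DRIVER_CLASS.
        String driver = args.length > 0 ? args[0] : "com.sap.db.jdbc.Driver";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (isOnClasspath(driver, loader)) {
            System.out.println(driver + " is on the classpath");
        } else {
            System.out.println(driver + " is NOT on the classpath; check that the user jar bundles it");
        }
    }
}
```

Calling such a check at the start of open() with the thread's context class loader should tell you quickly whether the driver made it into the user jar.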

On 18/08/2021 15:05, Chenzhiyuan(HR) wrote:


Dear all:

I have a problem when I want to sink data to Hana database.

The process hangs when getting the Hana connection in the open method of 
the sink function, as shown below.


My flink version is 1.10.

public class HrrmPayValueSumToHana extends RichSinkFunction {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = HrrmUtils.getHanaConnection();    // process is suspended here
    }

    @Override
    public void invoke() {
        …….
    }

    @Override
    public void close() throws Exception {
        ……….
    }
}

public static Connection getHanaConnection() {
    Connection con = null;
    try {
        Class.forName(HrrmConstants.HANA_DRIVER_CLASS);
        con = DriverManager.getConnection(HrrmConstants.HANA_SOURCE_DRIVER_URL,
            HrrmConstants.HANA_SOURCE_USER,
            HrrmConstants.HANA_SOURCE_PASSWORD);
    } catch (Exception e) {
        LOG.error("---hana get connection has exception , msg = ", e);
    }
    return con;
}


Hana driver dependency as below:

<dependency>
    <groupId>com.sap.cloud.db.jdbc</groupId>
    <artifactId>ngdbc</artifactId>
    <version>2.3.62</version>
</dependency>






Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread suman shil
Hi Jing,
Thanks for looking into this. Here is the code of `TaxiFareStream`. I was
following this link:
http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
Please let me know if there is any other way of aggregating elements
locally.

public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
    private KeySelector<TaxiFare, Long> keySelector;

    public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
                          BundleTrigger<TaxiFare> bundleTrigger,
                          KeySelector<TaxiFare, Long> keySelector) {
        super(userFunction, bundleTrigger, keySelector);
        this.keySelector = keySelector;
    }

    @Override
    protected Long getKey(TaxiFare input) throws Exception {
        return keySelector.getKey(input);
    }
}

Thanks

On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG  wrote:

> Hi Suman,
> Would you please provide the code of `TaxiFareStream`? It seems we
> could use `MapBundleOperator` directly here.
> BTW, I have some concerns about using this solution to do local aggregation
> for window aggregation, because `MapBundleOperator`
> saves input data in a bundle, which is a HashMap object and therefore does
> not keep the input order. I'm afraid the data within a bundle
> (in your case, 10 elements) may be out of order. I'm not sure whether it is
> reasonable to assign a watermark based on unordered
> timestamps.
>
> Best,
> JING ZHANG
>
>
>
> suman shil wrote on Thu, Aug 19, 2021 at 12:43 PM:
>
>> I am trying to do pre shuffle aggregation in flink. Following is the
>> MapBundle implementation.
>>
>> public class TaxiFareMapBundleFunction
>>         extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>
>>     @Override
>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>         if (value == null) {
>>             return input;
>>         }
>>         value.tip = value.tip + input.tip;
>>         return value;
>>     }
>>
>>     @Override
>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out)
>>             throws Exception {
>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>             out.collect(entry.getValue());
>>         }
>>     }
>> }
>>
>> I am using "CountBundleTrigger.java". But the pre-shuffle aggregation is
>> not working, as the "count" variable is always 0. Please let me know if
>> I am missing something.
>>
>> @Override
>> public void onElement(T element) throws Exception {
>>     count++;
>>     if (count >= maxCount) {
>>         callback.finishBundle();
>>         reset();
>>     }
>> }
>>
>> Here is the main code.
>>
>> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>>         new TaxiFareMapBundleFunction();
>> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>> KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
>>         new KeySelector<TaxiFare, Long>() {
>>             @Override
>>             public Long getKey(TaxiFare value) throws Exception {
>>                 return value.driverId;
>>             }
>>         };
>>
>> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>         // fares.keyBy((TaxiFare fare) -> fare.driverId)
>>         //     .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
>>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>                 new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>             .assignTimestampsAndWatermarks(
>>                 new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>                     @Override
>>                     public long extractTimestamp(TaxiFare element) {
>>                         return element.startTime.getEpochSecond();
>>                     }
>>                 })
>>             .keyBy((TaxiFare fare) -> fare.driverId)
>>             .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>             .process(new AddTips());
>>
>> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>
>> Thanks
>> Suman
>>
>


Re: failures during job start

2021-08-19 Thread Chesnay Schepler
This exception means that a task was deployed, but the task that 
produces the data it wants to consume was not available yet (even after 
waiting for a while).


Your case sounds similar to 
https://issues.apache.org/jira/browse/FLINK-9413, where this happens 
because the deployment of the producing task takes too long.


You have 2 options to solve this:
a) Have Flink wait longer for the partition to be created by increasing 
taskmanager.network.request-backoff.max
b) Speed up the deployment; for this you'd naturally have to investigate 
why the deployment takes so long in the first place.
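
To get a feel for option a): as far as I know, the consumer retries the partition request with a backoff that starts at taskmanager.network.request-backoff.initial and doubles until it reaches taskmanager.network.request-backoff.max, after which the request fails with the exception above. Treat that doubling, and the defaults of 100 ms and 10000 ms used below, as assumptions to verify against your Flink version. Under those assumptions the total time Flink waits can be estimated with a small sketch like this (BackoffEstimate is just an illustrative name, not Flink code):

```java
// Illustrative estimate only; the doubling behaviour is an assumption, not Flink code.
final class BackoffEstimate {

    /** Sums the waits initial, 2*initial, 4*initial, ... capped at max, ending with one wait of max. */
    static long totalWaitMillis(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initial <= max");
        }
        long total = 0;
        long current = initialMs;
        while (true) {
            total += current;
            if (current >= maxMs) {
                return total; // the backoff cannot grow any further, so the request would fail here
            }
            current = Math.min(current * 2, maxMs);
        }
    }

    public static void main(String[] args) {
        System.out.println("max=10000 ms -> " + totalWaitMillis(100, 10_000) + " ms in total");
        System.out.println("max=60000 ms -> " + totalWaitMillis(100, 60_000) + " ms in total");
    }
}
```

If the estimate comes out well below the time your producing tasks need to deploy, that would be consistent with the failure you see, and the maximum would have to be raised accordingly.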


On 19/08/2021 07:15, Colletta, Edward wrote:


Any help with this would be appreciated.   Is it possible that this is 
a data/application issue or a flink config/resource issue?


Using flink 11.2, java 11, session cluster, 5 nodes 32 cores each node.

I have an issue where starting a job takes a long time, and sometimes 
fails with PartitionNotFoundException, but succeeds on restart.   The 
job has 10 kafka sources (10 partitions for each topic) and parallelism 5.


The failure does not happen when the kafka logs are empty.

Note during below scenario, cpu usage on task manager and job managers 
is low (below 30%)


The scenario we see

  * run request to load and run a jar, job appears on dashboard with
all 160 subtasks in Deploying state
  * after 2 minutes some subtasks start transitioning to running.
  * after another 30 seconds failure occurs and job goes into
Restarting state
  * after another minute, restart completes all nodes running.

Exception history shows

2021-08-15 07:55:02

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 
205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.


    at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)


    at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)


    at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)


    at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)


    at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)


    at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)


    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)

    at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)


    at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)


    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)


    at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)


    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






Re: Just failed while starting

2021-08-19 Thread Chesnay Schepler
Can you share the logs with us (ideally on DEBUG if available) from the 
affected TaskManager and JobManager?


On 19/08/2021 08:29, Ivan Yang wrote:

Dear Flink community,

I recently ran into this issue at job startup. It happens from time to 
time. Here is the exception from the job manager:

2021-08-17 01:21:01,944 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Defence raw 
event prod05_analytics_output -> String to JSON -> (Sink: Malformed JSON Sink, ThreatSight Filter -> Raw Json 
to ThreatSight Json -> ThreatSight Json to String -> Sink: ThreatSight Sink, Json to CDCA -> (mssp eventlog 
filter 1 -> mssp eventlog filter 2 -> CDCA to eventlog json -> Flat Map, Sink: Parquet Sink Event Time), mssp 
json filter -> action filter -> raw json to action json, Filter -> Json to ResponseAlarm, Filter -> json 
to SecurityEvent) (542/626) (a7be17221c0726a67679091062cfa8dc) switched from DEPLOYING to FAILED on 
172.1.200.173:6122-856ad2 @ ip-172-1-200-173.ec2.internal (dataPort=6121).
org.apache.flink.util.FlinkException: Could not mark slot 
58af05c3109a0fe8f96ea8936c0783a4 active.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$handleAcceptedSlotOffers$18(TaskExecutor.java:1561)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_302]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_302]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
 ~[?:1.8.0_302]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Discarding 
the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Discarding 
the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:01,999 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 3150 tasks should be restarted to recover the failed task 
7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:02,012 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Event 
Router prod05 

Re: Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-19 Thread Chesnay Schepler
How do you deploy Flink on Kubernetes? Do you use the standalone or native 
mode?


Is it really just task managers going down? It seems unlikely that the 
loss of a TM could have such an effect.


Can you provide us with the JobManager logs at the time the TM crash 
occurred? They should contain some hints as to how Flink handled the TM 
failure.



On 19/08/2021 16:06, Kevin Lam wrote:

Hi all,

I've noticed that sometimes when task managers go down, it looks like 
the job is not restored from a checkpoint but instead restarted from a 
fresh state (when I go to the job's checkpoint tab in the UI, I don't 
see the restore, and the numbers in the job overview all get reset). 
Under what circumstances does this happen?


I've been trying to debug and we really want the job to restore from 
the checkpoint at all times for our use case.


We're running Apache Flink 1.13 on Kubernetes in a high availability 
set-up.


Thanks in advance!





Re: map concurrent modification exception analysis when checkpoint

2021-08-19 Thread Chesnay Schepler
Essentially this exception means that the state was modified while a 
snapshot was being taken.


We usually see this when users hold on to some state value beyond a 
single call to a user-defined function, particularly from different threads.


We may be able to pinpoint the issue if you were to provide us with the 
functions.
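
For illustration, the failure mode can be reproduced without Flink: iterating over a HashMap, which is what the HashMap$HashIterator frame in your trace suggests the snapshot is doing, while the same map is being modified triggers this exception. The sketch below uses only JDK classes. SnapshotRaceDemo and its methods are made-up names, and a real job would typically hit this from a second thread (for example an async callback) rather than from the same thread as shown here.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; it only mimics the "modify while iterating" pattern.
final class SnapshotRaceDemo {

    /** Iterates the map like a snapshot would, while adding keys mid-iteration. */
    static boolean snapshotWhileModifying(Map<String, Integer> state) {
        try {
            int i = 0;
            for (Map.Entry<String, Integer> e : state.entrySet()) {
                // Structural modification during iteration: inserts a new key.
                state.put("added-" + (i++), e.getValue());
            }
            return false; // no exception (happens if the map has fewer than two entries)
        } catch (ConcurrentModificationException ex) {
            return true; // the fail-fast iterator noticed the modification
        }
    }

    /** Safer variant: work on a private copy instead of the live map. */
    static int sumOverCopy(Map<String, Integer> state) {
        Map<String, Integer> copy = new HashMap<>(state);
        int sum = 0;
        for (Map.Entry<String, Integer> e : copy.entrySet()) {
            state.put("seen-" + e.getKey(), e.getValue()); // modifying the original is fine now
            sum += e.getValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        state.put("b", 2);
        System.out.println("exception while modifying: " + snapshotWhileModifying(new HashMap<>(state)));
        System.out.println("sum over copy: " + sumOverCopy(state));
    }
}
```

The second method hints at the usual way out: code that runs outside the function call, such as an async callback, gets its own copy of the value instead of a reference to the live state object.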


On 19/08/2021 16:59, yidan zhao wrote:

Flink web ui shows the exception as follows.
In the task (ual_transform_UserLogBlackUidJudger ->
ual_transform_IpLabel ), the first one is a broadcast process
function, and the second one is an async function. I do not know
whether the issues have some relation to it.

And the issues not occurred before, it occurred after I upgraded to
flink 1.13.2.



_exception info from flink web ui:_
java.io.IOException: Could not perform checkpoint 58 for operator
ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
(29/60)#0.

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)

at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)

at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)

at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)

at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)

at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)

at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)

at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
Could not complete snapshot 58 for operator
ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
(29/60)#0. Failure reason: Checkpoint was declined.

at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)

at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)

... 20 more

Caused by: java.util.ConcurrentModificationException

at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)

at 

Re: Metrics outside of RichFunction context

2021-08-19 Thread Chesnay Schepler
I don't believe there are other options, outside of creating a ticket to 
have Flink extend the API accordingly.


On 19/08/2021 16:40, John Karp wrote:

Hi,

I'm using StreamingFileSink to collect records into avro files. Inside 
of the BulkWriter implementation, I have to do some operations (such 
as dynamic schema lookup) which I want to record metrics about. 
However, the BulkWriter API, as it is defined, does not accept a 
RuntimeContext or MetricsGroup from the StreamingFileSink, so I 
seemingly don't have access to the metrics API. And it doesn't look 
like StreamingFileSink is particularly extensible. What alternatives 
do I have besides forking StreamingFileSink, or bypassing the Flink 
metrics API entirely?


Thanks,
John





Re: Apache Flink matrics are not alligned in the reporter

2021-08-19 Thread Chesnay Schepler

What reporter interval do you have configured?

On 19/08/2021 13:31, Jawad Tahir wrote:

Hi,

I have configured Graphite as the metrics reporter for my Flink 
(v1.13.2) cluster. My pipeline is pretty simple. It consists of one source, 
one stateful operator (simple window aggregation), and one sink 
(operations-playground, basically). I have set the parallelism to 
2. The graph of the pipeline is as follows:


[Flink pipeline][1]

The program is running well and producing the correct results. 
However, when I look at the metrics, I see that the source started sending 
records long after the system had started, even though my sink was 
producing correct results since the start of the job. Here is the 
[graph][2] of the job's uptime and the source's numRecordsOut. As 
far as I understand, the source's numRecordsOut should start 
together with uptime, as my sink was producing correct results since the start.



  [1]: https://i.stack.imgur.com/rZm5h.png 

  [2]: https://i.stack.imgur.com/nlBoS.png 






Re: Pre shuffle aggregation in flink is not working

2021-08-19 Thread JING ZHANG
Hi Suman,
Would you please provide the code of `TaxiFareStream`? It seems we
could use `MapBundleOperator` directly here.
BTW, I have some concerns about using this solution to do local aggregation
for window aggregation, because `MapBundleOperator`
saves input data in a bundle, which is a HashMap object that does not
keep the input order of the data. I'm afraid the records within a bundle
(10 records in your case) can come out unordered. I'm not sure whether it is
reasonable to assign a watermark based on unordered
timestamps.
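
To make the ordering concern concrete, here is a minimal plain-Java sketch of a count-triggered bundle. It is not Flink's `MapBundleOperator`; the class `CountBundleSketch`, the `Fare` record and the choice to keep the first-seen timestamp per key are assumptions made only for illustration. The bundle is a HashMap, so the pre-aggregated records are emitted in HashMap iteration order, which is not guaranteed to match arrival order or timestamp order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for a count-triggered bundle. It is not Flink's MapBundleOperator. */
class CountBundleSketch {

    /** Hypothetical record: driver id, event timestamp and tip. */
    static final class Fare {
        final long driverId;
        final long timestamp;
        final float tip;

        Fare(long driverId, long timestamp, float tip) {
            this.driverId = driverId;
            this.timestamp = timestamp;
            this.tip = tip;
        }
    }

    private final int maxCount;
    private final Map<Long, Fare> bundle = new HashMap<>();
    private final List<Fare> emitted = new ArrayList<>();
    private int count = 0;

    CountBundleSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Adds one record to the bundle and flushes once maxCount records have arrived. */
    void onElement(Fare input) {
        // Per key: keep the first-seen timestamp and add up the tips.
        bundle.merge(input.driverId, input,
                (current, next) -> new Fare(current.driverId, current.timestamp, current.tip + next.tip));
        count++;
        if (count >= maxCount) {
            finishBundle();
        }
    }

    /** Emits one pre-aggregated record per key, in HashMap iteration order. */
    void finishBundle() {
        emitted.addAll(bundle.values());
        bundle.clear();
        count = 0;
    }

    List<Fare> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CountBundleSketch sketch = new CountBundleSketch(3);
        sketch.onElement(new Fare(2L, 100L, 1.0f));
        sketch.onElement(new Fare(1L, 200L, 2.0f));
        sketch.onElement(new Fare(2L, 300L, 3.0f)); // third record triggers the flush
        for (Fare fare : sketch.emitted()) {
            System.out.println(fare.driverId + " @" + fare.timestamp + " tip=" + fare.tip);
        }
    }
}
```

In this sketch the record for driver 2 arrived first, but nothing forces it to be emitted first, so a watermark assigner placed after the bundle may see timestamps that go backwards.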

Best,
JING ZHANG



suman shil wrote on Thu, Aug 19, 2021 at 12:43 PM:

> I am trying to do pre-shuffle aggregation in Flink. The following is the
> MapBundle implementation.
>
> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>     @Override
>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>         if (value == null) {
>             return input;
>         }
>         value.tip = value.tip + input.tip;
>         return value;
>     }
>
>     @Override
>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>             out.collect(entry.getValue());
>         }
>     }
> }
>
> I am using "CountBundleTrigger.java", but the pre-shuffle aggregation is
> not working because the "count" variable is always 0. Please let me know if
> I am missing something.
>
> @Override
> public void onElement(T element) throws Exception {
>     count++;
>     if (count >= maxCount) {
>         callback.finishBundle();
>         reset();
>     }
> }
>
> Here is the main code.
>
> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>     @Override
>     public Long getKey(TaxiFare value) throws Exception {
>         return value.driverId;
>     }
> };
>
> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
> //      fares.keyBy((TaxiFare fare) -> fare.driverId)
> //          .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>                 new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>             .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>                     @Override
>                     public long extractTimestamp(TaxiFare element) {
>                         return element.startTime.getEpochSecond();
>                     }
>                 })
>             .keyBy((TaxiFare fare) -> fare.driverId)
>             .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>             .process(new AddTips());
>
> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>
> Thanks
> Suman
>


map concurrent modification exception analysis when checkpoint

2021-08-19 Thread yidan zhao
The Flink web UI shows the exception below.
In the task (ual_transform_UserLogBlackUidJudger ->
ual_transform_IpLabel), the first operator is a broadcast process
function and the second one is an async function. I do not know
whether the issue is related to that.

The issue did not occur before; it started after I upgraded to
Flink 1.13.2.
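
For readers unfamiliar with the exception at the bottom of the trace, the following plain-Java sketch shows one common way a `ConcurrentModificationException` arises: a HashMap is structurally modified while something else iterates over it to copy it. It uses neither Flink nor Kryo, the class and method names are hypothetical, and whether this is what actually happens in the job above is not established in this thread. The mutation is run inline in the middle of the iteration to stand in for a second thread, which keeps the example deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration; it does not use Flink or Kryo. */
class MapCopyDuringMutation {

    /** Copies source entry by entry and runs the mutation once, right after the first entry. */
    static Map<String, Integer> copyEntryByEntry(Map<String, Integer> source, Runnable mutation) {
        Map<String, Integer> copy = new HashMap<>();
        boolean mutated = false;
        for (Map.Entry<String, Integer> entry : source.entrySet()) {
            copy.put(entry.getKey(), entry.getValue());
            if (!mutated) {
                mutation.run(); // stands in for another thread touching a map
                mutated = true;
            }
        }
        return copy;
    }

    /** Returns true if the copy fails with a ConcurrentModificationException. */
    static boolean copyFails(Map<String, Integer> mapToIterate, Runnable mutation) {
        try {
            copyEntryByEntry(mapToIterate, mutation);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        state.put("b", 2);

        // Unsafe: the map being iterated gains a new key during the iteration.
        boolean unsafeFailed = copyFails(state, () -> state.put("c", 3));

        // Safer: iterate over a snapshot taken before the mutation happens.
        Map<String, Integer> snapshot = new HashMap<>(state);
        boolean safeFailed = copyFails(snapshot, () -> state.put("d", 4));

        System.out.println("iterating live map failed: " + unsafeFailed);
        System.out.println("iterating snapshot failed: " + safeFailed);
    }
}
```

The sketch only demonstrates the mechanism. In a real multi-threaded job the snapshot itself would also have to be taken while no other thread is writing to the map.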



_exception info from flink web ui:_
java.io.IOException: Could not perform checkpoint 58 for operator
ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
(29/60)#0.

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)

at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)

at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)

at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)

at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)

at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)

at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)

at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)

at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
Could not complete snapshot 58 for operator
ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel
(29/60)#0. Failure reason: Checkpoint was declined.

at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)

at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)

at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)

... 20 more

Caused by: java.util.ConcurrentModificationException

at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)

at java.util.HashMap$EntryIterator.next(HashMap.java:1479)

at java.util.HashMap$EntryIterator.next(HashMap.java:1477)

at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)

at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)

at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)

at 

Metrics outside of RichFunction context

2021-08-19 Thread John Karp
Hi,

I'm using StreamingFileSink to collect records into avro files. Inside of
the BulkWriter implementation, I have to do some operations (such as
dynamic schema lookup) which I want to record metrics about. However, the
BulkWriter API, as it is defined, does not accept a RuntimeContext or
MetricsGroup from the StreamingFileSink, so I seemingly don't have access
to the metrics API. And it doesn't look like StreamingFileSink is
particularly extensible. What alternatives do I have besides forking
StreamingFileSink, or bypassing the Flink metrics API entirely?

Thanks,
John


Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-19 Thread Kevin Lam
Hi all,

I've noticed that sometimes, when task managers go down, it looks like the
job is not restored from a checkpoint but instead restarted from a fresh
state (when I go to the job's checkpoint tab in the UI, I don't see the
restore, and the numbers in the job overview all get reset). Under what
circumstances does this happen?

I've been trying to debug and we really want the job to restore from the
checkpoint at all times for our use case.

We're running Apache Flink 1.13 on Kubernetes in a high availability
set-up.

Thanks in advance!


Re: Flink SQL API does not support columns of type TIMESTAMP(p) WITH TIME ZONE

2021-08-19 Thread Leonard Xu
Hello,

Flink does not support the TIMESTAMP WITH TIME ZONE type yet.

The currently supported types are:
TIMESTAMP WITHOUT TIME ZONE, abbreviated as TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE, abbreviated as TIMESTAMP_LTZ

Best,
Leonard

> On Aug 19, 2021, at 20:51, changfeng wrote:
> 
> ` TIMESTAMP(6) WITH TIME ZONE



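Re: Flink SQL API does not support columns of type TIMESTAMP(p) WITH TIME ZONE

2021-08-19 Thread Caizhi Weng
Hi!

Currently only two timestamp-related types are supported: plain timestamp and
timestamp with local time zone. If you need to convert a string that carries a
time zone, such as Fri Mar 26 12:27:05 IST 2021, into a timestamp, I suggest
using the date_format function.

changfeng wrote on Thu, Aug 19, 2021 at 8:52 PM:

> Hello, while using the SQL API of Flink 1.13.1 I recently ran into the
> problem that data of type TIMESTAMP(p) WITH TIME ZONE is not supported:
>  Creating a table with the SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH TIME ZONE)
> WITH ('connector' = 'print') fails with: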
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "TIME" at line 1, column 55.
> Was expecting:
> "LOCAL" ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
> at
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
> at
> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
> at
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
> at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
> ... 30 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered
> "TIME" at line 1, column 55.
> Was expecting:
> "LOCAL" ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TimeZoneOpt(FlinkSqlParserImpl.java:25946)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.DateTimeTypeName(FlinkSqlParserImpl.java:25892)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlTypeName(FlinkSqlParserImpl.java:25168)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypeName(FlinkSqlParserImpl.java:24787)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ExtendedDataType(FlinkSqlParserImpl.java:4990)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypedColumn(FlinkSqlParserImpl.java:4866)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4491)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5197)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)
> at
> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
> ... 32 more
>
> Creating the table with the SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH LOCAL TIME
> ZONE) WITH ('connector' = 'print') works without problems. Does the Flink SQL
> API currently support the TIMESTAMP(p) WITH TIME ZONE type, or is there a related bug?
>
>


Flink SQL API does not support columns of type TIMESTAMP(p) WITH TIME ZONE

2021-08-19 Thread changfeng
Hello, while using the SQL API of Flink 1.13.1 I recently ran into the problem that data of type TIMESTAMP(p) WITH TIME ZONE is not supported:
 Creating a table with the SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH TIME ZONE) WITH 
('connector' = 'print') fails with:
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "TIME" 
at line 1, column 55.
Was expecting:
"LOCAL" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
at 
org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at 
org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at 
org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
... 30 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" 
at line 1, column 55.
Was expecting:
"LOCAL" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TimeZoneOpt(FlinkSqlParserImpl.java:25946)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.DateTimeTypeName(FlinkSqlParserImpl.java:25892)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlTypeName(FlinkSqlParserImpl.java:25168)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypeName(FlinkSqlParserImpl.java:24787)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ExtendedDataType(FlinkSqlParserImpl.java:4990)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypedColumn(FlinkSqlParserImpl.java:4866)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4491)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5197)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)
at 
org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 32 more

Creating the table with the SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE) 
WITH ('connector' = 'print') works without problems. Does the Flink SQL API currently support the 
TIMESTAMP(p) WITH TIME ZONE type, or is there a related bug?



Re: How can I build the flink docker image from source code?

2021-08-19 Thread Caizhi Weng
Hi!

If you only modified Java code, use mvn clean package to build Flink from
source code. After that COPY all jars in
flink-dist/target/flink-/lib to the lib directory of the latest
Flink image.

Chenyu Zheng wrote on Thu, Aug 19, 2021 at 7:36 PM:

> Hi contributors,
>
>
>
> I've changed a little bit of code in Flink and want to build a Docker image
> to test it. Could you tell me how I can build the image from source code?
>
>
>
> Thx!
>


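Re: How can I build the Flink Docker image from source code?

2021-08-19 Thread Caizhi Weng
Hi!

If you only changed Java code, you can build Flink first and then, starting
from the latest image, COPY all jars from the lib directory of your build
(located under flink-dist/target/flink-) into the image's lib directory.

Chenyu Zheng wrote on Thu, Aug 19, 2021 at 7:38 PM:

> Hi,
>
> I recently made a few changes to my copy of the source code. How can I build a Docker image from source? That would make my next round of testing easier.
>
> Thanks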
>


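How can I build the Flink Docker image from source code?

2021-08-19 Thread Chenyu Zheng
Hi,

I recently made a few changes to my copy of the source code. How can I build a Docker image from source? That would make my next round of testing easier.

Thanks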


How can I build the flink docker image from source code?

2021-08-19 Thread Chenyu Zheng
Hi contributors,

I've changed a little bit of code in Flink and want to build a Docker image to 
test it. Could you tell me how I can build the image from source code?

Thx!


Apache Flink matrics are not alligned in the reporter

2021-08-19 Thread Jawad Tahir
Hi,

I have configured Graphite as the metrics reporter for my Flink (v1.13.2)
cluster. My pipeline is pretty simple. It consists of one source, one stateful
operator (simple window aggregation), and one sink (operations-playground,
basically). I have set the parallelism to 2. The graph of the pipeline
is as follows:

[Flink pipeline][1]

The program is running well and producing the correct results. However,
when I look at the metrics, I see that the source started sending records long
after the system had started, even though my sink was producing correct
results since the start of the job. Here is the [graph][2] of the job's uptime
and the source's numRecordsOut. As far as I understand, the source's
numRecordsOut should start together with uptime, as my sink was producing
correct results since the start.


  [1]: https://i.stack.imgur.com/rZm5h.png
  [2]: https://i.stack.imgur.com/nlBoS.png


Re: Theory question on process_continously processing mode and watermarks

2021-08-19 Thread Arvid Heise
I think what you are seeing is that the files contain records with similar
timestamps. That means that after reading file1 your watermark has already
progressed to the end of your time range. When Flink picks up file2, all
of its records are considered late and no windows fire anymore.

See [1] for a possible solution on DataStream. The Table API deals much
better with that if you use upserts [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#allowed-lateness
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/versioned_tables/
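
The effect described above can be modelled in a few lines of plain Java. This is a simplified sketch of the lateness check for tumbling event-time windows, assuming non-negative millisecond timestamps and a window offset of 0; the class and method names are illustrative and are not Flink API. A record is treated as late once the watermark has reached the last timestamp of its window plus the allowed lateness.

```java
/** Plain-Java model of the lateness check; names are illustrative, not Flink API. */
class LatenessSketch {

    /** Start of the tumbling window containing the timestamp (offset 0, timestamp >= 0). */
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    /** True if the record's window has already expired with respect to the watermark. */
    static boolean isDroppedAsLate(long timestamp, long windowSizeMs,
                                   long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowStart(timestamp, windowSizeMs) + windowSizeMs - 1;
        return maxTimestamp + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long windowSizeMs = 60_000L;           // 1-minute tumbling windows
        long watermarkAfterFile1 = 3_599_000L; // file1 covered roughly one hour of event time

        // A record from file2 that carries an early timestamp again.
        long file2Timestamp = 1_000L;

        System.out.println("no allowed lateness: dropped = "
                + isDroppedAsLate(file2Timestamp, windowSizeMs, 0L, watermarkAfterFile1));
        System.out.println("1h allowed lateness: dropped = "
                + isDroppedAsLate(file2Timestamp, windowSizeMs, 3_600_000L, watermarkAfterFile1));
    }
}
```

With no allowed lateness the record from the second file falls into a window that expired long ago, while an allowed lateness that spans the whole time range keeps that window open.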

On Thu, Aug 19, 2021 at 11:48 AM Caizhi Weng  wrote:

> Hi!
>
> FileProcessingMode.PROCESS_CONTINUOUSLY means that the file is scanned
> continuously for updates; it should have nothing to do with stopping the
> streaming job.
>
> I suspect that the column you defined the watermark on contains some
> data which exceeds Long.MAX_VALUE. A Long.MAX_VALUE watermark signals the
> job to stop. You might also want to share your code on the mailing list so
> others can look into this problem more deeply.
>
> Fra wrote on Thu, Aug 19, 2021 at 5:26 PM:
>
>> Hello, while developing a Flink streaming platform on my own I
>> found something that perplexes me.
>>
>> Using FileProcessingMode.PROCESS_CONTINUOUSLY in a streaming job that
>> uses tumbling windows and watermarks causes my streaming process to stop
>> at the file-reading phase.
>>
>> Meanwhile, if I delete my window and watermark declarations, the
>> program works as expected.
>>
>> Is there some meaning behind this behaviour? My theory is that
>> PROCESS_CONTINUOUSLY re-reads the file and that this contradicts the
>> watermarks created during the first reading of the files, causing it to stop.
>>
>> Sent from Mail for Windows
>>
>


Re: Theory question on process_continously processing mode and watermarks

2021-08-19 Thread Caizhi Weng
Hi!

FileProcessingMode.PROCESS_CONTINUOUSLY means that the file is scanned
continuously for updates; it should have nothing to do with stopping the
streaming job.

I suspect that the column you defined the watermark on contains some
data which exceeds Long.MAX_VALUE. A Long.MAX_VALUE watermark signals the
job to stop. You might also want to share your code on the mailing list so
others can look into this problem more deeply.

Fra wrote on Thu, Aug 19, 2021 at 5:26 PM:

> Hello, while developing a Flink streaming platform on my own I
> found something that perplexes me.
>
> Using FileProcessingMode.PROCESS_CONTINUOUSLY in a streaming job that
> uses tumbling windows and watermarks causes my streaming process to stop
> at the file-reading phase.
>
> Meanwhile, if I delete my window and watermark declarations, the program
> works as expected.
>
> Is there some meaning behind this behaviour? My theory is that
> PROCESS_CONTINUOUSLY re-reads the file and that this contradicts the
> watermarks created during the first reading of the files, causing it to stop.
>
> Sent from Mail for Windows
>


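Re: Insert problem with upsert() mode in the flink-connector-redis connector

2021-08-19 Thread Caizhi Weng
Hi!

You can add a System.out.println in the method that receives the sink data to
check whether it really receives data once per minute.

If so, the cause is probably that a state TTL is configured. To keep the data
from expiring, group agg still emits records downstream even when it receives
the same key and value every time (see GroupAggFunction#processElement), so
this is expected behavior.

李航飞 wrote on Thu, Aug 19, 2021 at 5:03 PM:

> Version: Flink 1.13.2
> Scenario:
> A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
> I override getChangelogMode and return ChangelogMode.upsert() so that data is written as updates.
> The window is a CUMULATE function with a 1-day window and a 1-minute step.
>
>
> Problem:
> In testing, output is produced every minute and the persisted data is identical each time.
> With upsert() mode, identical data should normally not trigger a write to Redis. What is going on here?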
>


Re: Timer Service vs Custom Triggers

2021-08-19 Thread Caizhi Weng
Hi!

If you'd like to aggregate something over the records before the timeout, then
you may want to consider using a session window (instead of writing your own
trigger). However, if aggregation is not needed, I would prefer using a
process function and handling the watermark myself, as the timers registered
by the window are stored in memory for each record, and if you have a very
large data flow this might cause a shortage of heap memory.
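
As a rough illustration of the process-function approach, the following plain-Java sketch keeps a single deadline per key and fires it when the watermark passes. It is a model only and does not use Flink's timer service; the class `KeyedTimeoutSketch` and its methods are hypothetical, and it assumes that every new record for a key pushes that key's deadline back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of per-key timeouts driven by a watermark; not Flink API. */
class KeyedTimeoutSketch {

    private final long timeoutMs;
    // One pending deadline per key, no matter how many records the key has seen.
    private final Map<String, Long> deadlineByKey = new HashMap<>();

    KeyedTimeoutSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Registers or replaces the deadline for the key. */
    void onElement(String key, long timestamp) {
        deadlineByKey.put(key, timestamp + timeoutMs);
    }

    /** Fires and removes every key whose deadline is at or before the watermark. */
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlineByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                fired.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        Collections.sort(fired); // deterministic order for the caller
        return fired;
    }

    int pendingTimers() {
        return deadlineByKey.size();
    }

    public static void main(String[] args) {
        KeyedTimeoutSketch timers = new KeyedTimeoutSketch(60_000L);
        timers.onElement("a", 0L);
        timers.onElement("a", 10_000L); // replaces the earlier deadline for "a"
        timers.onElement("b", 5_000L);
        System.out.println("pending: " + timers.pendingTimers());
        System.out.println("fired at 65000: " + timers.onWatermark(65_000L));
        System.out.println("fired at 70000: " + timers.onWatermark(70_000L));
    }
}
```

The point of the design is that the amount of timer state grows with the number of active keys rather than with the number of records.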

Aeden Jameson wrote on Thu, Aug 19, 2021 at 8:07 AM:

> My use case is that I'm producing a set of measurements by key every
> 60 seconds. Currently, this is handled with the usual pattern of
> keyBy().window(Tumbling...(60)).process(...). I need to provide the same
> output, but as the result of a timeout. The data needed for the timeout
> summary will be in the global state for that key. This seems possible by
> either using the timer service in the process function without a window
> (e.g. keyBy(..).process(..)) or by using a custom trigger. Why choose one
> or the other?
>
> --
> Thanks,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
>
>


Theory question on process_continously processing mode and watermarks

2021-08-19 Thread Fra
Hello, while developing a Flink streaming platform on my own I found something that perplexes me.

Using FileProcessingMode.PROCESS_CONTINUOUSLY in a streaming job that uses tumbling windows and watermarks causes my streaming process to stop at the file-reading phase.

Meanwhile, if I delete my window and watermark declarations, the program works as expected.

Is there some meaning behind this behaviour? My theory is that PROCESS_CONTINUOUSLY re-reads the file and that this contradicts the watermarks created during the first reading of the files, causing it to stop.

Sent from Mail for Windows


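Insert problem with upsert() mode in the flink-connector-redis connector

2021-08-19 Thread 李航飞
Version: Flink 1.13.2
Scenario:
A custom flink-connector-redis connector. When implementing the DynamicTableSink interface,
I override getChangelogMode and return ChangelogMode.upsert() so that data is written as updates.
The window is a CUMULATE function with a 1-day window and a 1-minute step.


Problem:
In testing, output is produced every minute and the persisted data is identical each time.
With upsert() mode, identical data should normally not trigger a write to Redis. What is going on here?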



Re: Setting S3 parameters in a K8 jobmanager deployment

2021-08-19 Thread Yang Wang
I am afraid jobmanager.sh [1] currently cannot parse the "-D" options correctly.

[1].
https://github.com/apache/flink/blob/master/flink-dist/src/main/flink-bin/bin/jobmanager.sh


Best,
Yang

Robert Cullen wrote on Wed, Aug 18, 2021 at 10:21 PM:

> I have a kubernetes jobmanager deployment that requires parameters be
> passed as command line rather than retrieving values from the flink-config
> map. Is there a way to do this?
>
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
> spec:
>   replicas: 1 # Set the value to greater than 1 to start standby JobManagers
>   selector:
>     matchLabels:
>       app: flink
>       component: jobmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       containers:
>       - name: jobmanager
>         image: apache/flink:1.13.0-scala_2.11
>         args: ["jobmanager", "-Ds3.endpoint=https://192.173.0.0:9000",
>                "-Ds3.access-key=key", "-Ds3.secret-key=secret"]
>         ports
>
> Robert Cullen
> 240-475-4490
>


Many monitoring metrics in InfluxDB have a count of 0

2021-08-19 Thread Camile Sing
Hi!
I have recently been setting up monitoring for a Flink cluster with InfluxDB + Grafana. To track the number of rows synced per second and the amount of data synced per second, I used the following metrics:

taskmanager_job_task_numRecordsIn, taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond


What puzzles me is that when I query InfluxDB, the count of these metrics is always empty:

> select * from taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond where count != 0;
> select * from taskmanager_job_task_numRecordsIn where count != 0;

The count of every metric is empty:

> select count(*) from taskmanager_job_task_numRecordsIn where count = 0;

name: taskmanager_job_task_numRecordsIn
time count_count
---- -----------
0    325289

> select count(*) from taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond where count = 0;

name: taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond
time count_count count_rate
---- ----------- ----------
0    325328      325328

Is this normal? If it is, how can I display rows synced per second and bytes synced per second on the dashboard? Right now everything shows up as a flat line at 0.
[image: image.png]
[image: image.png]


Just failed while starting

2021-08-19 Thread Ivan Yang
Dear Flink community,

I have recently been running into this issue at job startup. It happens from time to 
time. Here is the exception from the job manager:

2021-08-17 01:21:01,944 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
Defence raw event prod05_analytics_output -> String to JSON -> (Sink: Malformed 
JSON Sink, ThreatSight Filter -> Raw Json to ThreatSight Json -> ThreatSight 
Json to String -> Sink: ThreatSight Sink, Json to CDCA -> (mssp eventlog filter 
1 -> mssp eventlog filter 2 -> CDCA to eventlog json -> Flat Map, Sink: Parquet 
Sink Event Time), mssp json filter -> action filter -> raw json to action json, 
Filter -> Json to ResponseAlarm, Filter -> json to SecurityEvent) (542/626) 
(a7be17221c0726a67679091062cfa8dc) switched from DEPLOYING to FAILED on 
172.1.200.173:6122-856ad2 @ ip-172-1-200-173.ec2.internal (dataPort=6121).
org.apache.flink.util.FlinkException: Could not mark slot 
58af05c3109a0fe8f96ea8936c0783a4 active.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$handleAcceptedSlotOffers$18(TaskExecutor.java:1561)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 ~[?:1.8.0_302]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 ~[?:1.8.0_302]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
 ~[?:1.8.0_302]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.13.1.jar:1.13.1]
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Discarding 
the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Discarding 
the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:01,999 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 3150 tasks should be restarted to recover the failed task 
7ac3b50525c642dc419b976cfaf0ee0e_541. 
2021-08-17 01:21:02,012 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Event 
Router prod05 (0f10bafc07e48918f955fb22cbaa5735) switched from state RUNNING to 
RESTARTING.

I am using kubernetes deployment session mode. We have 4 jobs running in the 
cluster, 3