Re: Can we use CheckpointedFunction with the new Source api?

2022-05-31 Thread Qingsheng Ren
Hi Qing, 

Thanks for the input. I think having a stateful function accumulate the tree 
after the source is a reasonable solution. Under your design a split maps to a 
znode, so the state persisted in the source reader would be per-node 
information, and it’s hard to accumulate that under the current source 
abstraction. I’m also a little curious about the use case. If the downstream 
requires the whole tree, does that mean the parallelism of the accumulator has 
to be 1? Please forgive me if my understanding is incorrect. 
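
To illustrate the accumulation step discussed above, here is a minimal 
plain-Java sketch with no Flink dependency. The names NodeUpdate and 
TreeAccumulator are hypothetical, and treating a null payload as a znode 
deletion is an assumption. In a real job this logic would sit in a stateful 
function after the source, with the map kept in Flink state so that it is 
checkpointed.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-node update emitted by the source; null data means "znode deleted" (assumption). */
final class NodeUpdate {
    final String path;
    final String data;

    NodeUpdate(String path, String data) {
        this.path = path;
        this.data = data;
    }
}

/** Folds per-node updates into a snapshot of the whole tree, keyed by znode path. */
final class TreeAccumulator {
    // In a Flink job this map would be the state that is snapshotted on checkpoint.
    private final TreeMap<String, String> tree = new TreeMap<>();

    /** Applies one update and returns a read-only copy of the full tree. */
    Map<String, String> apply(NodeUpdate update) {
        if (update.data == null) {
            tree.remove(update.path);
        } else {
            tree.put(update.path, update.data);
        }
        return Collections.unmodifiableMap(new TreeMap<>(tree));
    }
}
```

Every update has to reach the same accumulator instance for the snapshot to 
be complete, which is why the parallelism question matters here.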

Another idea is that if you are also providing a reusable *table* source, you 
can wrap the source and the accumulating function together in a 
DataStreamScanProvider and offer them to users as a single table source. This 
might look a bit neater. 

Cheers, 

Qingsheng

> On May 31, 2022, at 16:04, Qing Lim  wrote:
> 
> Hi Qingsheng, thanks for getting back.
> 
> I managed to find a workaround, but if you can provide other suggestions it'd 
> be great too.
> 
> I followed the documentation here: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
> 
> I implemented a custom Source that emits all changes from a ZooKeeper node 
> (recursively). I modelled it such that every time there's a change in the 
> tree, it produces a split for the specific node; the reader is then 
> responsible for fetching the latest state of the node and emitting it.
> This works fine, but we have a use case to emit the whole tree (recursively). 
> Conceptually it is quite simple: I just need to accumulate the whole tree as 
> State, and then emit the State whenever it updates.
> 
> I was trying to achieve this inside the Source because this is meant to be 
> something reusable within our organization, which is why I asked the original 
> question. I have now solved it by implementing a stateful Map function 
> instead. It is a bit less ergonomic, but acceptable on my end. So if you have 
> an alternative, please share it with me, thank you.
> 
> Kind regards
> 
> -Original Message-
> From: Qingsheng Ren  
> Sent: 31 May 2022 03:57
> To: Qing Lim 
> Cc: user@flink.apache.org
> Subject: Re: Can we use CheckpointedFunction with the new Source api?
> 
> Hi Qing,
> 
> I’m afraid CheckpointedFunction cannot be applied to the new source API, but 
> could you share the abstractions of your source implementation, like which 
> component a split maps to etc.? Maybe we can try to do some workarounds. 
> 
> Best, 
> 
> Qingsheng
> 
>> On May 30, 2022, at 20:09, Qing Lim  wrote:
>> 
>> Hi, is it possible to use CheckpointedFunction with the new Source api? (The 
>> one in package org.apache.flink.api.connector.source)
>> 
>> My use case:
>> 
>> I have a custom source that emits individual node updates from a tree, and I 
>> wish to create a stream of whole-tree snapshots, so I will have to 
>> accumulate all updates and keep them as state. In addition, I wish to 
>> expose this functionality as a library to my organization.
>> 
>> The custom source is written using the new Source api, I wonder if we can 
>> attach state to it?
>> 
>> Kind regards
>> 

Re: [External] Re: Exception when running Java UDF with Blink table planner

2022-05-31 Thread Shengkai Fang
Hi, Tom.

I could not reproduce the exception on master. I am not sure whether the
problem has been fixed or I am missing something.

The only difference is my test udf extends ScalarFunction rather than
DPScalarFunction and I use String[] as the input type.

```
public static class ListToString extends ScalarFunction {
    public String eval(String[] arr) {
        return "foo";
    }
}
```

I think you can also debug it this way:
0. Open the Flink repo and check out the release-1.11 branch
1. Create the UDF in JavaUserDefinedScalarFunctions
2. Find a test in the table ITCase, e.g. TableSinkITCase.scala
3. Add a new test to verify the results. I just added the following code:
```
  @Test
  def test(): Unit = {
    // Register a single row whose only column is an array holding the string "3".
    val dataId = TestValuesTableFactory.registerRowData(
      Seq(GenericRowData.of(
        new GenericArrayData(Array(StringData.fromString("3")).toArray[Object]))))

    tEnv.executeSql(
      s"""
         |CREATE TABLE test2 (person ARRAY<STRING>) WITH (
         |  'connector' = 'values',
         |  'data-id' = '$dataId',
         |  'register-internal-data' = 'true'
         |)
         |""".stripMargin
    )
    tEnv.createFunction("ListToString", classOf[ListToString])
    tEnv.executeSql("SELECT ListToString(`person`) as col1_string FROM `test2`").print()
  }
```
4. Then you can debug the case in IntelliJ IDEA.

Considering that Flink 1.11 is no longer maintained by the community, would
you mind upgrading to a more recent version (1.13/1.14/1.15)?



Best,
Shengkai

Tom Thornton wrote on Wed, Jun 1, 2022 at 02:06:

> Hi all,
>
> Thank you for the help.
>
>> It seems an exception is thrown when Flink tries to deserialize the object
>> output by your UDF. So is the object produced by your UDF serializable?
>> Does it contain any lambda function in the object/class?
>
>
> The output object of the UDF is the string "foo" which should be
> serializable. This exception only occurs when the input to the UDF is not
> null. However, when the input is null, then the output object (which is
> still the string "foo") does not cause any error or exception (i.e. it is
> able to be serialized). There are no lambda functions in the output object
> (it is just a string object).
>
> Thanks,
> Tom
>
> On Thu, May 26, 2022 at 9:36 PM yuxia  wrote:
>
>> It seems an exception is thrown when Flink tries to deserialize the object
>> output by your UDF. So is the object produced by your UDF serializable?
>> Does it contain any lambda function in the object/class?
>>
>> Best regards,
>> Yuxia
>>
>> --
>> *From: *"Tom Thornton" 
>> *To: *"User" 
>> *Sent: *Friday, May 27, 2022 6:47:04 AM
>> *Subject: *Exception when running Java UDF with Blink table planner
>>
>> We are migrating from the legacy table planner to the Blink table
>> planner. Previously we had a UDF defined like this that worked without
>> issue:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(List list) {
>>         return "foo";
>>     }
>> }
>>
>> Since moving to the Blink table planner, we have been receiving this error:
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Given parameters 
>> of function 'ListToString' do not match any signature.
>> Actual: (java.lang.String[])
>> Expected: (java.util.List)
>>
>>
>> We refactored the UDF to take as input an Object[] to match what is
>> received from Blink:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(Object[] arr) {
>>         return "foo";
>>     }
>> }
>>
>> Now the UDF always fails (including for the simplified example above
>> where we return a constant string regardless of input). For example, when
>> we run on a query like this one:
>>
>> SELECT ListToString(`col1`) as col1_string FROM `table`
>>
>> Produces an IndexOutOfBoundsException:
>>
>> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for 
>> length 0
>>  at 
>> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>  at 
>> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>  at 
>> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>  at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>  at java.base/java.util.ArrayList.get(ArrayList.java:459)
>>  at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>>  at 
>> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>>  at 

Re: Flink DataStream and remote Stateful Functions interoperability

2022-05-31 Thread Himanshu Sareen
Thanks, Tymur, for the pointers.


I followed the GitHub link and now understand how to define and configure 
Remote Functions with the DataStream API.

However, I need help in understanding the following:

1. I didn't find the Stateful Function definition and server code.
2. How should we deploy the Stateful Function code?

Could you please share pointers on deploying the Stateful Function code and 
on the corresponding GitHub code?

Regards
Himanshu

From: Tymur Yarosh 
Sent: Wednesday, May 25, 2022 9:53:20 PM
To: user@flink.apache.org ; Himanshu Sareen 

Subject: Re: Flink DataStream and remote Stateful Functions interoperability

Hi Himanshu,

The short answer is you should configure Stateful Functions in your job. Here 
is an example 
https://github.com/f1xmAn/era-locator/blob/34dc4f77539195876124fe604cf64c61ced4e5da/src/main/java/com/github/f1xman/era/StreamingJob.java#L68.

Check out this article on Flink DataStream and Stateful Functions 
interoperability 
https://medium.com/devoops-and-universe/realtime-detection-of-russian-crypto-phone-era-with-flink-datastream-and-stateful-functions-e77794fedc2a.

Best,
Tymur Yarosh
On 24 May 2022, 21:16 +0300, Himanshu Sareen , 
wrote:
Team,

I'm working on a POC where our existing Stateful Functions ( remote ) can 
interact with Datastream API.
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/

I started a Flink cluster with ./bin/start-cluster.sh
and then submitted the .jar to Flink.

However, after submission only the embedded function is called by the DataStream code.

I'm unable to invoke stateful functions because module.yaml is not loaded.

Can someone help me understand how to deploy the Stateful Function code 
(module.yaml) and the DataStream API code in parallel on a Flink cluster?


Regards
Himanshu



Re: flink rest API returns no data for some endpoints (empty response), status is a normal 200.

2022-05-31 Thread yidan zhao
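OK, thanks. I will try that the next time I run into a similar problem.

Weihua Hu wrote on Tue, May 31, 2022 at 22:27:
>
> Hi, yidan
> /watermark is collected through Flink's internal metrics. To avoid running a
> metric query on every API request, Flink caches the results internally; the
> interval between actual queries is controlled by option [1] and defaults to 10s.
> When Flink queries the metrics internally and the query fails, it stores an empty
> record, which shows up in the API as an empty list. You can try enabling DEBUG
> logging to confirm whether a failed metric query is the cause.
>
> [1] metrics.fetcher.update-interval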
>
> Best,
> Weihua
>
>
> On Fri, May 20, 2022 at 12:54 PM yidan zhao  wrote:
>
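> > For some jobs, probably ones whose graph I had already opened in the UI, all the
> > related data is visible after opening, but the numbers never change. For example,
> > the input node of one job shows Records Sent 504,685,253, and this number stays
> > fixed (although the job is actually processing data); the network requests do
> > keep returning this same value.
> > The jobs that only show a spinner and no data are newly submitted jobs.
> >
> > Based on past experience, restarting the JM may resolve this.
> >
> > yidan zhao wrote on Fri, May 20, 2022 at 12:05:
> > >
> > > Web UI screenshot: https://s3.bmp.ovh/imgs/2022/05/20/dd142de9be3a2c99.png
> > > Network view: https://i.bmp.ovh/imgs/2022/05/20/f3c741b28bd208d4.png
> > >
> > > JM1 (rest server leader) exception log: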
> > > WARN  2022-05-20 12:02:12,523
> > > org.apache.flink.runtime.checkpoint.CheckpointsCleaner   - Could
> > > not properly discard completed checkpoint 22259.
> > > java.io.IOException: Directory
> > >
> > bos://flink-bucket/flink/default-checkpoints/bal_baiduid_ft_job/b03390c8295713fbd79f57f57a1e3bdb/chk-22259
> > > is not empty.
> > > at
> > org.apache.hadoop.fs.bos.BaiduBosFileSystem.delete(BaiduBosFileSystem.java:209)
> > > ~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
> > > at
> > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:160)
> > > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > > at
> > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:74)
> > > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > > at
> > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discard(CompletedCheckpoint.java:263)
> > > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > > at
> > org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
> > > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > > at
> > org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$2(CheckpointsCleaner.java:85)
> > > ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> > > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > > [?:1.8.0_251]
> > > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > > [?:1.8.0_251]
> > > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> > > INFO  2022-05-20 12:03:22,441
> > > org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
> > > Triggering checkpoint 21979 (type=CHECKPOINT) @ 1653019401517 for job
> > > 07950b109ab5c3a0ed8576673ab562f7.
> > > INFO  2022-05-20 12:03:31,061
> > > org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
> > > Completed checkpoint 21979 for job 07950b109ab5c3a0ed8576673ab562f7
> > > (1785911977 bytes in 9066 ms).
> > >
> > >
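> > > As shown above, my web UI is open, so requests are being sent continuously, yet
> > > there are no related exceptions (and judging by the 200 response code, it does
> > > not look like an error anyway).
> > >
> > > Shengkai Fang wrote on Fri, May 20, 2022 at 10:50:
> > > >
> > > > Hi, the images are broken; you probably need to use an image hosting service.
> > > >
> > > > Also, could you paste the relevant exception logs?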
> > > >
> > > > Best,
> > > > Shengkai
> > > >
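> > > > yidan zhao wrote on Fri, May 20, 2022 at 10:28:
> > > >
> > > > > UI view: [image: 1.png].
> > > > >
> > > > > Network view:
> > > > > [image: image.png]
> > > > >
> > > > >
> > > > > Some additional cluster deployment information:
> > > > > (1) Flink 1.13, standalone cluster, HA based on ZooKeeper. 3 JMs, several TMs.
> > > > > (2) The JM REST API has SSL enabled and is proxied through nginx (this
> > > > > mechanism is most likely not the cause, because the problem does not occur
> > > > > every time; the other jobs in my cluster are all fine, and it only appears
> > > > > after a job has been running for a while).
> > > > > Guess: could it be related to a JM process dying after running for a while,
> > > > > the jobs being recovered, and the REST JM leader changing?
> > > > > At the moment, the logs of some JMs occasionally contain SSL handshake
> > > > > errors, which is also rather odd. Note: with the web UI open and watching
> > > > > the JM log, no log entries appear (I look up the leader via ZK and read the
> > > > > leader JM's log). I keep refreshing the web UI, so in theory any failure
> > > > > should produce matching log entries, but there are none; the errors that do
> > > > > appear are unrelated and are all about checkpoints and the like.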
> > > > >
> >


Re: [External] Re: Exception when running Java UDF with Blink table planner

2022-05-31 Thread Tom Thornton
Hi all,

Thank you for the help.

> It seems an exception is thrown when Flink tries to deserialize the object
> output by your UDF. So is the object produced by your UDF serializable?
> Does it contain any lambda function in the object/class?


The output object of the UDF is the string "foo" which should be
serializable. This exception only occurs when the input to the UDF is not
null. However, when the input is null, then the output object (which is
still the string "foo") does not cause any error or exception (i.e. it is
able to be serialized). There are no lambda functions in the output object
(it is just a string object).

Thanks,
Tom

On Thu, May 26, 2022 at 9:36 PM yuxia  wrote:

> It seems an exception is thrown when Flink tries to deserialize the object
> output by your UDF. So is the object produced by your UDF serializable?
> Does it contain any lambda function in the object/class?
>
> Best regards,
> Yuxia
>
> --
> *From: *"Tom Thornton" 
> *To: *"User" 
> *Sent: *Friday, May 27, 2022 6:47:04 AM
> *Subject: *Exception when running Java UDF with Blink table planner
>
> We are migrating from the legacy table planner to the Blink table planner.
> Previously we had a UDF defined like this that worked without issue:
>
> public class ListToString extends DPScalarFunction {
>     public String eval(List list) {
>         return "foo";
>     }
> }
>
> Since moving to the Blink table planner, we have been receiving this error:
>
> Caused by: org.apache.flink.table.api.ValidationException: Given parameters 
> of function 'ListToString' do not match any signature.
> Actual: (java.lang.String[])
> Expected: (java.util.List)
>
>
> We refactored the UDF to take as input an Object[] to match what is
> received from Blink:
>
> public class ListToString extends DPScalarFunction {
>     public String eval(Object[] arr) {
>         return "foo";
>     }
> }
>
> Now the UDF always fails (including for the simplified example above where
> we return a constant string regardless of input). For example, when we run
> on a query like this one:
>
> SELECT ListToString(`col1`) as col1_string FROM `table`
>
> Produces an IndexOutOfBoundsException:
>
> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for 
> length 0
>   at 
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>   at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>   at 
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>   at java.base/java.util.Objects.checkIndex(Objects.java:372)
>   at java.base/java.util.ArrayList.get(ArrayList.java:459)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>   at 
> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>   at 
> org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>   at 
> org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>   at 
> org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>   at 
> org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>   at 
> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>   at 
> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>   at 
> org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>   at StreamExecCalc$337.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at SourceConversion$328.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>

Re: [External] Re: Exception when running Java UDF with Blink table planner

2022-05-31 Thread Tom Thornton
Hi,

Thank you for your help. Here's the requested info:

> Could you also tell us which Flink version you are using, the schema of the
> source table and some test data? With this info, we can debug in our local
> environment.

Flink version: 1.11.6.
Schema of the source table:
name | type | null | key | computed column | watermark |
col1 | ARRAY | true | | | |
Example input data to cause exception (i.e., anything that is not null):
{
  "col1": ["FooBar"]
}
If the input is instead null then there is no exception thrown:
{
  "col1": null
}

Thanks,
Tom

On Thu, May 26, 2022 at 7:47 PM Shengkai Fang  wrote:

> Hi.
>
> Could you also tell us which Flink version you are using, the schema of
> the source table and some test data? With these info, we can debug in our
> local environment.
>
> Best,
> Shengkai
>
> Tom Thornton wrote on Fri, May 27, 2022 at 06:47:
>
>> We are migrating from the legacy table planner to the Blink table
>> planner. Previously we had a UDF defined like this that worked without
>> issue:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(List list) {
>>         return "foo";
>>     }
>> }
>>
>> Since moving to the Blink table planner, we have been receiving this error:
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Given parameters 
>> of function 'ListToString' do not match any signature.
>> Actual: (java.lang.String[])
>> Expected: (java.util.List)
>>
>>
>> We refactored the UDF to take as input an Object[] to match what is
>> received from Blink:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(Object[] arr) {
>>         return "foo";
>>     }
>> }
>>
>> Now the UDF always fails (including for the simplified example above
>> where we return a constant string regardless of input). For example, when
>> we run on a query like this one:
>>
>> SELECT ListToString(`col1`) as col1_string FROM `table`
>>
>> Produces an IndexOutOfBoundsException:
>>
>> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for 
>> length 0
>>  at 
>> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>  at 
>> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>  at 
>> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>  at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>  at java.base/java.util.ArrayList.get(ArrayList.java:459)
>>  at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>>  at 
>> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>>  at 
>> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>>  at 
>> org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>>  at StreamExecCalc$337.processElement(Unknown Source)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>  at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>  at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>  at SourceConversion$328.processElement(Unknown Source)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>  at 
>> 

Re: flink rest API returns no data for some endpoints (empty response), status is a normal 200.

2022-05-31 Thread Weihua Hu
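Hi, yidan
/watermark is collected through Flink's internal metrics. To avoid running a
metric query on every API request, Flink caches the results internally; the
interval between actual queries is controlled by option [1] and defaults to 10s.
When Flink queries the metrics internally and the query fails, it stores an empty
record, which shows up in the API as an empty list. You can try enabling DEBUG
logging to confirm whether a failed metric query is the cause.

[1] metrics.fetcher.update-interval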
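
The caching behaviour described above can be sketched in plain Java as
follows. This is a simplified model and not Flink's actual implementation:
CachedMetricFetcher, its constructor parameters and the injectable clock are
all hypothetical names chosen for illustration. It shows why a failed query
surfaces as an empty list with a normal response until the next refresh.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical cache that runs the metric query at most once per update interval. */
final class CachedMetricFetcher {
    private final Supplier<List<String>> query;  // the expensive metric query
    private final LongSupplier clockMillis;      // injectable clock, eases testing
    private final long updateIntervalMillis;

    private List<String> cached = Collections.emptyList();
    private long lastQueryMillis;
    private boolean queriedOnce;

    CachedMetricFetcher(Supplier<List<String>> query,
                        LongSupplier clockMillis,
                        long updateIntervalMillis) {
        this.query = query;
        this.clockMillis = clockMillis;
        this.updateIntervalMillis = updateIntervalMillis;
    }

    /** Returns the cached metrics, refreshing them only when the interval has elapsed. */
    List<String> get() {
        long now = clockMillis.getAsLong();
        if (!queriedOnce || now - lastQueryMillis >= updateIntervalMillis) {
            queriedOnce = true;
            lastQueryMillis = now;
            try {
                cached = query.get();
            } catch (RuntimeException e) {
                // A failed query leaves an empty record; callers simply see an empty list.
                cached = Collections.emptyList();
            }
        }
        return cached;
    }
}
```

In this model a caller cannot tell "no metrics yet" from "query failed",
which matches the symptom in this thread and is why DEBUG logs are the place
to look.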

Best,
Weihua




Re: Problem using Flink async I/O

2022-05-31 Thread Weihua Hu
Hi,

I tried this in my IDE and could not reproduce the problem; using List works. How is goodsDetailPage defined in your code?

Best,
Weihua


On Thu, May 26, 2022 at 8:59 PM lxk7...@163.com  wrote:

> Resending the images
>
> https://sm.ms/image/12XQHAOZdYoqraC
> https://sm.ms/image/zJ2gfmxvSc85Xl7
>
>
>
> lxk7...@163.com
>
> From: lxk7...@163.com
> Sent: 2022-05-26 20:54
> To: user-zh
> Subject: Problem using Flink async I/O
>
> I am using async I/O in my program, but it does not seem to recognize this list-typed data.
>
> lxk7...@163.com
>


Re: Flink UI in Application Mode

2022-05-31 Thread Weihua Hu
Hi Zain,
You can view the list of flink applications on the yarn web ui and choose
to jump to the specified Flink web ui.

Best,
Weihua


On Mon, May 23, 2022 at 7:07 PM Zain Haider Nemati 
wrote:

> Hi David,
> Thanks for your response.
> When submitting a job in application mode it gives a url at the end but
> that is different i.e. on different ports when you submit different jobs in
> application mode. Is there a port/ui where I can see the consolidated list
> of jobs running instead of checking each on a different port?
>
> Noted regarding the mailing list concern, thank you for letting me know!
>
>
> On Mon, May 23, 2022 at 1:49 PM David Morávek  wrote:
>
>> Hi Zain,
>>
>> you can find a link to web-ui either in the CLI output after the job
>> submission or in the YARN ResourceManager web ui [1]. With YARN Flink needs
>> to choose the application master port at random (could be somehow
>> controlled by setting _yarn.application-master.port_) as there might be
>> multiple JMs running on the same NodeManager.
>>
>> OT: I've seen you've opened multiple threads on both dev and user mailing
>> list. As these are all "user" related questions, can you please focus them
>> on the user ML only? Separating user & development (the Flink
>> contributions) threads into separate lists allows community to work more
>> efficiently.
>>
>> Best,
>> D.
>>
>> On Sun, May 22, 2022 at 7:44 PM Zain Haider Nemati <
>> zain.hai...@retailo.co> wrote:
>>
>>> Hi,
>>> Which port does flink UI run on in application mode?
>>> If I am running 5 yarn jobs in application mode would the UI be same for
>>> each or different ports for each?
>>>
>>


Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-05-31 Thread Gorjan Todorovski
Hi,

I am running a TensorFlow Extended (TFX) pipeline which uses Apache Beam
for data processing which in turn has a Flink Runner (Basically a batch job
on a Flink Session Cluster on Kubernetes) version 1.13.6, but the job (for
gathering stats) gets stuck.

There is nothing significant in the Job Manager or Task Manager logs. The
only thing that possibly might tell why the task is stuck seems to be a
thread dump:

"MapPartition (MapPartition at [14]{TFXIORead[train],
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
...
I use a parallelism of 32. The task managers are configured so that each TM
runs in one container with 1 CPU and a total process memory of 20 GB. Each
TM runs 1 task slot.
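
For readers unfamiliar with this kind of thread dump: the WAITING state on
CompletableFuture$Signaller is what any thread looks like while it is blocked
in CompletableFuture.get() on a future that nobody has completed yet. The
sketch below reproduces only that generic JDK behaviour. It does not involve
Beam or Flink and does not diagnose why the bundle never finishes; the class
and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Toy reproduction of a thread parked in CompletableFuture.get(); not Beam or Flink code. */
class BlockedFutureSketch {

    /** Returns the worker's state while the future is pending and after it is completed. */
    static Thread.State[] observeStates() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                pending.get(); // blocks until some other thread completes the future
            } catch (Exception e) {
                // not expected in this sketch
            }
        }, "blocked-worker");
        worker.setDaemon(true);
        worker.start();
        try {
            // Poll for up to ~2 s until the worker has parked.
            for (int i = 0; i < 200 && worker.getState() != Thread.State.WAITING; i++) {
                Thread.sleep(10);
            }
            Thread.State whileBlocked = worker.getState();
            pending.complete("done"); // unblocks the worker
            worker.join(2000);
            return new Thread.State[] {whileBlocked, worker.getState()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Thread.State[] states = observeStates();
        System.out.println("while pending: " + states[0]);
        System.out.println("after complete: " + states[1]);
    }
}
```

In the dump above the future being waited on belongs to
SdkHarnessClient$BundleProcessor$ActiveBundle.close, so the task thread is
waiting for something on the SDK harness side to complete it.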

This is failing with ~100 files with a total size of about 100 GB. If I run
the pipeline with a smaller number of files to process, it runs ok.

I need Flink to be able to process different amounts of data, as it can
scale by automatically adding pods depending on the parallelism set for the
specific job (I set the parallelism to max(number of files, 32)).

Thanks,
Gorjan


Re: FileSource SourceReader failure scenario

2022-05-31 Thread Meghajit Mazumdar
Thanks, Qingsheng! This answers my question.

Regards,
Meghajit

On Tue, May 31, 2022 at 3:03 PM Qingsheng Ren  wrote:

> Hi Meghajit,
>
> Good question! The short answer: splits won’t be returned to the
> enumerator by the reader once they are assigned and *checkpointed*.
>
> As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:
>
> > Add a split back to the split enumerator. It will only happen when a
> SourceReader fails and there are splits assigned to it after the last
> successful checkpoint.
>
> Suppose we have split A and reader 0, and we have a flow like this:
>
> Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101
>
> After checkpoint 101 the state of split A will be managed by reader 0,
> which means if the reader fails and rolls back to checkpoint 101, the state
> of split A should be recovered by reader instead of returning to the
> enumerator because the split has been delivered to the reader and
> successfully stored into the reader’s checkpoint 101. But if reader 0 fails
> before checkpoint 101 and rolls back to 100, reader 0 is not aware of the
> assignment of split A, then A will be added back to the enumerator and be
> assigned again.
>
> In a nutshell, if a split is assigned to a reader and a checkpoint is made
> successfully, it is the reader’s responsibility to handle the state and
> recover, and the split won’t be returned to the enumerator. A split won’t
> be assigned or read twice under this pattern.
>
> Hope this is helpful!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html
>
> Cheers,
>
> Qingsheng
>
>
> > On May 31, 2022, at 16:29, Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
> >
> > Hello,
> >
> > I had a question with regards to the behaviour of FileSource and
> SourceReader in cases of failures. Let me know if I missed something
> conceptually.
> >
> > We are running a Parquet File Source. Let's say, we supply the source
> with a directory path containing 5 files and the Flink job is configured to
> run with a parallelism of 2.
> >
> > When the job starts, 2 SourceReaders are created and when they ask for
> splits, the split assigner assigns them one file each, which they start
> processing.
> >
> > Now, the documentation of FileSplitAssigner.addSplits method says the
> following:
> > Adds a set of splits to this assigner. This happens for example when
> some split processing failed and the splits need to be re-added, or when
> new splits got discovered.
> >
> > I understand this means that un-processed splits or splits that were not
> processed completely due to some error with the SourceReader get added back
> to the split assigner to be re-assigned to some other SourceReader.
> >
> > However, the documentation of FileRecordFormat.restoreReader has this
> statement written:
> > Restores a reader from a checkpointed position. This method is called
> when the reader is recovered from a checkpoint and the reader has
> previously stored an offset into the checkpoint, by returning from the
> FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative
> offset. That value is supplied as the restoredOffset.
> >
> > I am somewhat confused by these 2 documentation statements. If the split
> is added back to the split assigner when split processing got failed by a
> SourceReader (maybe due to some exception or fatal error), then the split
> could be re-assigned to any other SourceReader next. Even if the failed
> SourceReader comes back and starts processing the file from the last
> checkpointed offset, there would be duplicate processing as the file could
> have been assigned to somebody else in the meantime. Then what is the
> purpose of `restoreReader` ? Or, am I missing something ?
> >
> > --
> > Regards,
> > Meghajit
>
>

-- 
*Regards,*
*Meghajit*


Re: Status of File Sink Common (flink-file-sink-common)

2022-05-31 Thread Jing Ge
Hi,

Afaik, there are still a lot of unit tests depending on it. I don't think
we can drop it before dropping all of these unit tests.

Best regards,
Jing

On Tue, May 31, 2022 at 8:10 AM Yun Gao  wrote:

> Hi Jun,
>
> I think the release notes should only include the issues that cause changes
> visible to users. Also, I think by design flink-file-sink-common should
> not be used directly by users; it only serves as a shared module for the
> legacy StreamingFileSink and the new FileSink.
>
> Best,
> Yun
>
>
> --
> From:yuxia 
> Send Time:2022 May 31 (Tue.) 09:14
> To:Jun Qin 
> Cc:User 
> Subject:Re: Status of File Sink Common (flink-file-sink-common)
>
> I'm afraid not. I can still find it in main repository[1].
> [1]
> https://github.com/apache/flink/tree/master/flink-connectors/flink-file-sink-common
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Jun Qin" 
> To: "User" 
> Sent: Tuesday, May 31, 2022 5:24:10 AM
> Subject: Status of File Sink Common (flink-file-sink-common)
>
> Hi,
>
> Has File Sink Common (flink-file-sink-common) been dropped? If so, since
> which version? I do not seem to find anything related in the release notes
> of 1.13.x, 1.14.x and 1.15.0.
>
> Thanks
> Jun
>
>
>


Re: FileSource SourceReader failure scenario

2022-05-31 Thread Qingsheng Ren
Hi Meghajit,

Good question! The short answer: splits won’t be returned to the
enumerator by the reader once they are assigned and *checkpointed*.

As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:

> Add a split back to the split enumerator. It will only happen when a 
> SourceReader fails and there are splits assigned to it after the last 
> successful checkpoint.

Suppose we have split A and reader 0, and we have a flow like this:

Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101

After checkpoint 101 the state of split A will be managed by reader 0, which 
means if the reader fails and rolls back to checkpoint 101, the state of split 
A should be recovered by reader instead of returning to the enumerator because 
the split has been delivered to the reader and successfully stored into the 
reader’s checkpoint 101. But if reader 0 fails before checkpoint 101 and rolls 
back to 100, reader 0 is not aware of the assignment of split A, then A will be 
added back to the enumerator and be assigned again.

In a nutshell, if a split is assigned to a reader and a checkpoint is made
successfully, it is the reader’s responsibility to handle the state and
recover, and the split won’t be returned to the enumerator. A split won’t be
assigned or read twice under this pattern.
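
To make the two cases concrete, here is a toy model in plain Java. It is not
Flink code and uses none of Flink's classes: SplitRecoverySketch and its
methods are made up for illustration, it models a single reader only, and it
assumes the enumerator side is adjusted simply by handing back whatever the
reader's restored checkpoint does not contain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of split ownership across checkpoints; not Flink code. */
class SplitRecoverySketch {
    // Splits still held by the enumerator (not assigned to the reader).
    private final List<String> enumeratorSplits = new ArrayList<>();
    // Splits currently owned by the single reader in this model.
    private final List<String> readerSplits = new ArrayList<>();
    // Reader-side split state captured at each completed checkpoint.
    private final Map<Long, List<String>> readerCheckpoints = new HashMap<>();

    SplitRecoverySketch(List<String> initialSplits) {
        enumeratorSplits.addAll(initialSplits);
    }

    /** Moves a split from the enumerator to the reader. */
    void assign(String split) {
        if (enumeratorSplits.remove(split)) {
            readerSplits.add(split);
        }
    }

    /** Records which splits the reader owns at this checkpoint. */
    void checkpoint(long id) {
        readerCheckpoints.put(id, new ArrayList<>(readerSplits));
    }

    /** The reader fails and rolls back to the given completed checkpoint. */
    void failAndRestore(long checkpointId) {
        List<String> restored = readerCheckpoints.getOrDefault(checkpointId, new ArrayList<>());
        for (String split : readerSplits) {
            if (!restored.contains(split)) {
                // Assigned after the checkpoint: goes back to the enumerator
                // (the situation addSplitsBack is described for).
                enumeratorSplits.add(split);
            }
        }
        readerSplits.clear();
        readerSplits.addAll(restored); // the reader recovers checkpointed splits itself
    }

    List<String> splitsOnReader() { return new ArrayList<>(readerSplits); }

    List<String> splitsOnEnumerator() { return new ArrayList<>(enumeratorSplits); }

    public static void main(String[] args) {
        // Checkpoint 100 -> assignment (A -> 0) -> checkpoint 101 -> failure.
        SplitRecoverySketch afterCp = new SplitRecoverySketch(Arrays.asList("A"));
        afterCp.checkpoint(100L);
        afterCp.assign("A");
        afterCp.checkpoint(101L);
        afterCp.failAndRestore(101L);
        System.out.println("reader=" + afterCp.splitsOnReader()
                + " enumerator=" + afterCp.splitsOnEnumerator());

        // Checkpoint 100 -> assignment (A -> 0) -> failure before checkpoint 101.
        SplitRecoverySketch beforeCp = new SplitRecoverySketch(Arrays.asList("A"));
        beforeCp.checkpoint(100L);
        beforeCp.assign("A");
        beforeCp.failAndRestore(100L);
        System.out.println("reader=" + beforeCp.splitsOnReader()
                + " enumerator=" + beforeCp.splitsOnEnumerator());
    }
}
```

In the first scenario split A stays with the reader after recovery; in the
second it ends up back on the enumerator and can be assigned again. In neither
case does A exist in both places at once.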

Hope this is helpful!

[1] 
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html

Cheers, 

Qingsheng


> On May 31, 2022, at 16:29, Meghajit Mazumdar  
> wrote:
> 
> Hello,
> 
> I had a question with regards to the behaviour of FileSource and SourceReader 
> in cases of failures. Let me know if I missed something conceptually.
> 
> We are running a Parquet File Source. Let's say, we supply the source with a 
> directory path containing 5 files and the Flink job is configured to run with 
> a parallelism of 2.
> 
> When the job starts, 2 SourceReaders are created and when they ask for 
> splits, the split assigner assigns them one file each, which they start 
> processing.
> 
> Now, the documentation of FileSplitAssigner.addSplits method says the 
> following: 
> Adds a set of splits to this assigner. This happens for example when some 
> split processing failed and the splits need to be re-added, or when new 
> splits got discovered.
> 
> I understand this means that un-processed splits or splits that were not 
> processed completely due to some error with the SourceReader get added back 
> to the split assigner to be re-assigned to some other SourceReader.
> 
> However, the documentation of FileRecordFormat.restoreReader has this 
> statement written:
> Restores a reader from a checkpointed position. This method is called when 
> the reader is recovered from a checkpoint and the reader has previously 
> stored an offset into the checkpoint, by returning from the 
> FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative 
> offset. That value is supplied as the restoredOffset.
> 
> I am somewhat confused by these 2 documentation statements. If the split is 
> added back to the split assigner when split processing got failed by a 
> SourceReader (maybe due to some exception or fatal error), then the split 
> could be re-assigned to any other SourceReader next. Even if the failed 
> SourceReader comes back and starts processing the file from the last 
> checkpointed offset, there would be duplicate processing as the file could 
> have been assigned to somebody else in the meantime. Then what is the purpose 
> of `restoreReader` ? Or, am I missing something ?
> 
> -- 
> Regards,
> Meghajit



FileSource SourceReader failure scenario

2022-05-31 Thread Meghajit Mazumdar
Hello,

I had a question with regards to the behaviour of FileSource and
SourceReader in cases of failures. Let me know if I missed something
conceptually.

We are running a Parquet File Source. Let's say, we supply the source with
a directory path containing 5 files and the Flink job is configured to run
with a parallelism of 2.

When the job starts, 2 SourceReaders are created and when they ask for
splits, the split assigner assigns them one file each, which they start
processing.

Now, the documentation of the FileSplitAssigner.addSplits method says the
following:
*Adds a set of splits to this assigner. This happens for example when some
split processing failed and the splits need to be re-added, or when new
splits got discovered.*

I understand this means that un-processed splits or splits that were not
processed completely due to some error with the SourceReader get added back
to the split assigner to be re-assigned to some other SourceReader.

However, the documentation of FileRecordFormat.restoreReader has this
statement written:
*Restores a reader from a checkpointed position. This method is called when
the reader is recovered from a checkpoint and the reader has previously
stored an offset into the checkpoint, by returning from the
FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative
offset. That value is supplied as the restoredOffset.*

I am somewhat confused by these 2 documentation statements. If the split is
added back to the split assigner when split processing got failed by a
SourceReader (maybe due to some exception or fatal error), then the split
could be re-assigned to any other SourceReader next. Even if the failed
SourceReader comes back and starts processing the file from the last
checkpointed offset, there would be duplicate processing as the file could
have been assigned to somebody else in the meantime. Then what is the
purpose of `restoreReader` ? Or, am I missing something ?

-- 
*Regards,*
*Meghajit*


RE: Can we use CheckpointedFunction with the new Source api?

2022-05-31 Thread Qing Lim
Hi Qingsheng, thanks for getting back.

I managed to find a workaround, but if you can provide other suggestions it'd
be great too.

I followed the documentation here: 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

I implemented a custom Source that emits all changes from a Zookeeper node 
(recursively). I modelled it such that every time there's a change in the 
tree, it produces a split for the specific node; the reader is then 
responsible for fetching the latest state of the node and emitting it.
This works fine, but we also have a use case to emit the whole tree 
(recursively). Conceptually it is quite simple: I just need to accumulate the 
whole tree as state, and then emit the state whenever it updates.

I was trying to achieve this inside the Source because it is meant to be 
something reusable within our organization, which is why I asked the original 
question. I have now solved it by implementing a stateful Map function 
instead. It is a bit less ergonomic, but acceptable on my end. So if you have 
an alternative, please share it with me, thank you.
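
The accumulation step described above can be sketched in plain Java, leaving
Flink out entirely. TreeAccumulatorSketch is a made-up name, the map stands in
for whatever managed state the real Map function keeps, and treating a null
payload as "node deleted" is an assumption of this sketch rather than
something stated in this thread.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Toy accumulator that folds per-node updates into a whole-tree snapshot; not Flink code. */
class TreeAccumulatorSketch {
    // Node path -> latest data. In a Flink job this would live in checkpointed state.
    private final TreeMap<String, String> nodes = new TreeMap<>();

    /**
     * Applies one node update and returns a snapshot of the whole tree.
     * Assumption of this sketch: null data means the node was deleted.
     */
    Map<String, String> apply(String path, String data) {
        if (data == null) {
            nodes.remove(path);
        } else {
            nodes.put(path, data);
        }
        // Copy so that later updates do not mutate snapshots already emitted.
        return Collections.unmodifiableMap(new TreeMap<>(nodes));
    }

    public static void main(String[] args) {
        TreeAccumulatorSketch acc = new TreeAccumulatorSketch();
        acc.apply("/app", "root");
        acc.apply("/app/a", "1");
        System.out.println(acc.apply("/app/b", "2"));  // snapshot with three nodes
        System.out.println(acc.apply("/app/a", null)); // snapshot after deleting /app/a
    }
}
```

Each incoming node update yields a fresh, immutable snapshot, which is the
"emit the state whenever it updates" part.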

Kind regards

-Original Message-
From: Qingsheng Ren  
Sent: 31 May 2022 03:57
To: Qing Lim 
Cc: user@flink.apache.org
Subject: Re: Can we use CheckpointedFunction with the new Source api?

Hi Qing,

I’m afraid CheckpointedFunction cannot be applied to the new source API, but 
could you share the abstractions of your source implementation, like which 
component a split maps to etc.? Maybe we can try to do some workarounds. 

Best, 

Qingsheng

> On May 30, 2022, at 20:09, Qing Lim  wrote:
> 
> Hi, is it possible to use CheckpointedFunction with the new Source api? (The 
> one in package org.apache.flink.api.connector.source)
>  
> My use case:
>  
> I have a custom source that emit individual nodes update from a tree, and I 
> wish to create a stream of the whole Tree snapshots, so I will have to 
> accumulate all updates and keep it as state. In addition to this, I wish to 
> expose this functionality as a library to my organization.
>  
> The custom source is written using the new Source api, I wonder if we can 
> attach state to it?
>  
> Kind regards
>  



Re: Status of File Sink Common (flink-file-sink-common)

2022-05-31 Thread Yun Gao
Hi Jun,

I think the release notes should only include the issues that cause changes
visible to users. Also, I think by design flink-file-sink-common should not be
used directly by users; it only serves as a shared module for the legacy
StreamingFileSink and the new FileSink.

Best,
Yun



--
From:yuxia 
Send Time:2022 May 31 (Tue.) 09:14
To:Jun Qin 
Cc:User 
Subject:Re: Status of File Sink Common (flink-file-sink-common)

I'm afraid not. I can still find it in main repository[1].
[1] 
https://github.com/apache/flink/tree/master/flink-connectors/flink-file-sink-common

Best regards,
Yuxia

- Original Message -
From: "Jun Qin" 
To: "User" 
Sent: Tuesday, May 31, 2022 5:24:10 AM
Subject: Status of File Sink Common (flink-file-sink-common)

Hi,  

Has File Sink Common (flink-file-sink-common) been dropped? If so, since which 
version? I do not seem to find anything related in the release notes of 1.13.x, 
1.14.x and 1.15.0.

Thanks
Jun