Re: 关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?

2021-03-31 Thread Lin Hou
你好,请问一下,这个问题是怎么解决的啊?

赵一旦  于2021年2月3日周三 下午1:59写道:

> 我看Flink的要求是public,每个属性要么public,要么有getter/setter。估计内嵌的属性也会递归检查的。
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com> 于2021年2月3日周三 下午1:52写道:
>
> > 你好,我们是否可以通过对该类LinkedHashMap进行包装来实现当前功能呢?如果你需要PojoSerializer来序列化数据的话。
> >
> >
> >
> >
> > --原始邮件--
> > 发件人: "赵一旦" > 发送时间: 2021年2月3日(星期三) 中午1:24
> > 收件人: "user-zh" > 主题: 关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?
> >
> >
> >
> > 如题,按照flink对POJO的定义,感觉还是比较严格的。
> >
> >
> 我有个类是继承了LinkedHashMap的,就被当作GenericType了。像这种情况,我没办法去修改LinkedHashMap实现,同时也不好不继承。因为我一个实体是动态扩展,不清楚有多少属性的,需要json方式反序列化到Map类型上。
>


Re: PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Dian Fu
Hi Sumeet,

I think it should be a bug and I have created a ticket 
https://issues.apache.org/jira/browse/FLINK-22082 
 as the following up.

Regards,
Dian


> 2021年4月1日 下午12:25,Guowei Ma  写道:
> 
> Hi, Sumeet
> 
> I am not an expert about PyFlink. But I think @Dian Fu 
>   might give some insight about this problem.
> 
> Best,
> Guowei
> 
> 
> On Thu, Apr 1, 2021 at 12:12 AM Sumeet Malhotra  > wrote:
> Cross posting from StackOverlow here:
> 
> https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array
>  
> 
> 
> Any pointers are appreciated!
> 
> Thanks,
> Sumeet



Flink Taskmanager failure recovery and large state

2021-03-31 Thread Yaroslav Tkachenko
Hi everyone,

I'm wondering if people have experienced issues with Taskmanager failure
recovery when dealing with a lot of state.

I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints and
checkpoints. ~150 task managers with 4 slots each.

When I run a pipeline without much state and kill one of the
taskmanagers, it takes a few minutes to recover (I see a few restarts), but
eventually when a new replacement taskmanager is registered with the
jobmanager things go back to healthy.

But when I run a pipeline with a lot of state (1TB+) and kill one of the
taskmanagers, the pipeline never recovers, even after the replacement
taskmanager has joined. It just enters an infinite loop of restarts and
failures.

On the jobmanager, I see an endless loop of state transitions: RUNNING
-> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
It stays in RUNNING for a few seconds, but then transitions into FAILED
with a message like this:


22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph - 
(569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection reset by peer (connection to '
10.30.10.53/10.30.10.53:45789')
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection reset by peer


Which, I guess, means a failed Taskmanager. And since there are not enough
task slots to run it goes into this endless loop again. It's never the same
Taskmanager that fails.



On the Taskmanager side, things look more interesting. I see a variety of
exceptions:


org.apache.flink.runtime.taskmanager.Task -  (141/624)#7
(6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution
attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.


also


WARNING: Failed read retry #1/10 for
'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
Sleeping...
java.nio.channels.ClosedByInterruptException
at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
Source)
at
java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
Source)
at
com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
at
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
at java.base/java.io.DataInputStream.read(Unknown Source)
at
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.base/java.io.InputStream.read(Unknown Source)
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
...


and


SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
retries for
'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
20:52:46.894 [ (141/624)#7] ERROR
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
~[flink-dist_2.12-1.12.0.jar:1.12.0]


also


20:52:46.895 [ (141/624)#7] WARN
 org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
Exception while restoring keyed state backend for
KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected
exception.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...


and a few of


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download
data for state handles.
at
org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: 

Re: [讨论] Flink Connector 并行写入数据方案

2021-03-31 Thread Shengkai Fang
Hi jie.

User mail list 更多是用来讨论使用中的问题,请将关于dev相关的问题转发到d...@flink.apache.org

详情可以参考[1]

[1] https://flink.apache.org/community.html

jie mei  于2021年3月31日周三 下午3:03写道:

> Hi, Community
>
> 我想发起一个初步的讨论,关于如何扩大 Flink 写入数据的并行度,并且能够分表,可选事务支持的方案。
>
> 该方案应该支持三种场景:
>
> 1) 至少一次: 不支持或者有限支持 update 的存储,通常通过查询去重。 例如 ClickHouse
> 2) 相同主键或分区内有序: 支持 Upsert,但不支持事务或者跨行事务的存储,例如 ElasticSearch, MongoDB
> 3) 事务:支持跨行事务的存储,例如 MySQL。
>
> 另外说一下,第二种情况和第三种情况的一个重要区别是,当 CheckPoint 失败,第二种情况会从上一个快照重新执行,
> 那么会存在旧的数据可能覆盖新的数据的情况。举个例子: 假设正常情况下记录A在某个快照区间取值为
> A1, A2, A3。假如在写入 A2 后快照失败,当重新执行的时候,会短暂的存在这种情况,A1 覆盖了 A2 的值。
>
> 下面是不同场景扩大并行度的方案
> 1) 至少一次:
> 在这种场景下,数据乱顺是可容忍的,只要保证最少一次,就能达到最终一致性。可以考虑多线程异步写入数据,
> 当异步任务过多,则等待有异步任务完成,再执行新的异步写入任务。CheckPoint需要保证所有异步任务完成
>
> 2) 相同主键或分区内有序,最少一次:
> 在这种场景下,如果指定了分区字段,可以将相同分区的数据放到一个 Buffer 里,相同 Buffer 的数据有序,
> 不同 Buffer的数据并行写入,CheckPoint的时候需要保证所有数据写入;如果没有分区,单指定了主键,可以
> 根据主键的 Hash Code 对 Sink 并行读取模,得到的值用于决定数据缓存到哪一个 Buffer,同样相同的 Buffer
> 内有序,不同的 Buffer 并行。
>
> 3) 事务:
> 由于已经有了通用的 Sink API,可以考虑把数据缓存到 Buffer, 在 CheckPoint 的时候,开启事务,完成写入数据,并提交。
> [FLIP-143]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
>
> 分表:
> 对于 MySQL, MongoDB 这类存储,可以通过分区键来定义分表规则,假如表 A 定义了分区键 B,B 有 B1, B2 两个取值,
> 那么得到两个分表 A_B1, A_B2.
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


Re: PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Guowei Ma
Hi, Sumeet

I am not an expert about PyFlink. But I think @Dian Fu
  might give some insight about this problem.

Best,
Guowei


On Thu, Apr 1, 2021 at 12:12 AM Sumeet Malhotra 
wrote:

> Cross posting from StackOverlow here:
>
>
> https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array
>
> Any pointers are appreciated!
>
> Thanks,
> Sumeet
>


Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Kurt Young
Hi Guowei and Dawid,

I want to request the permission to merge this feature [1], it's a useful
improvement to sql client and won't affect
other components too much. We were plan to merge it yesterday but met some
tricky multi-process issue which
has a very high possibility hanging the tests. It took us a while to find
out the root cause and fix it.

Since it's not too far away from feature freeze and RC0 also not created
yet, thus I would like to include this
in 1.13.

[1] https://issues.apache.org/jira/browse/FLINK-20320

Best,
Kurt


On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:

> Hi, community:
>
> Friendly reminder that today (3.31) is the last day of feature
> development. Under normal circumstances, you will not be able to submit new
> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
> testing, welcome to help test together.
> After the test is relatively stable, we will cut the release-1.13 branch.
>
> Best,
> Dawid & Guowei
>
>
> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
> wrote:
>
>> +1 for the 31st of March for the feature freeze.
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>> wrote:
>>
>> > +1 for March 31st for the feature freeze.
>> >
>> >
>> >
>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>> dwysakow...@apache.org>
>> > wrote:
>> >
>> > > Thank you Thomas! I'll definitely check the issue you linked.
>> > >
>> > > Best,
>> > >
>> > > Dawid
>> > >
>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>> > > > Hi Dawid,
>> > > >
>> > > > Thanks for the heads up.
>> > > >
>> > > > Regarding the "Rebase and merge" button. I find that merge option
>> > useful,
>> > > > especially for small simple changes and for backports. The following
>> > > should
>> > > > help to safeguard from the issue encountered previously:
>> > > > https://github.com/jazzband/pip-tools/issues/1085
>> > > >
>> > > > Thanks,
>> > > > Thomas
>> > > >
>> > > >
>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>> > dwysakow...@apache.org
>> > > >
>> > > > wrote:
>> > > >
>> > > >> Hi devs, users!
>> > > >>
>> > > >> 1. *Feature freeze date*
>> > > >>
>> > > >> We are approaching the end of March which we agreed would be the
>> time
>> > > for
>> > > >> a Feature Freeze. From the knowledge I've gather so far it still
>> seems
>> > > to
>> > > >> be a viable plan. I think it is a good time to agree on a
>> particular
>> > > date,
>> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
>> > > >> (Wednesday next week) as the feature freeze time.
>> > > >>
>> > > >> Similarly as last time, we want to create RC0 on the day after the
>> > > feature
>> > > >> freeze, to make sure the RC creation process is running smoothly,
>> and
>> > to
>> > > >> have a common testing reference point.
>> > > >>
>> > > >> Having said that let us remind after Robert & Dian from the
>> previous
>> > > >> release what it a Feature Freeze means:
>> > > >>
>> > > >> *B) What does feature freeze mean?*After the feature freeze, no new
>> > > >> features are allowed to be merged to master. Only bug fixes and
>> > > >> documentation improvements.
>> > > >> The release managers will revert new feature commits after the
>> feature
>> > > >> freeze.
>> > > >> Rational: The goal of the feature freeze phase is to improve the
>> > system
>> > > >> stability by addressing known bugs. New features tend to introduce
>> new
>> > > >> instabilities, which would prolong the release process.
>> > > >> If you need to merge a new feature after the freeze, please open a
>> > > >> discussion on the dev@ list. If there are no objections by a PMC
>> > member
>> > > >> within 48 (workday)hours, the feature can be merged.
>> > > >>
>> > > >> 2. *Merge PRs from the command line*
>> > > >>
>> > > >> In the past releases it was quite frequent around the Feature
>> Freeze
>> > > date
>> > > >> that we ended up with a broken main branch that either did not
>> compile
>> > > or
>> > > >> there were failing tests. It was often due to concurrent merges to
>> the
>> > > main
>> > > >> branch via the "Rebase and merge" button. To overcome the problem
>> we
>> > > would
>> > > >> like to suggest only ever merging PRs from a command line. Thank
>> you
>> > > >> Stephan for the idea! The suggested workflow would look as follows:
>> > > >>
>> > > >>1. Pull the change and rebase on the current main branch
>> > > >>2. Build the project (e.g. from IDE, which should be faster than
>> > > >>building entire project from cmd) -> this should ensure the
>> project
>> > > compiles
>> > > >>3. Run the tests in the module that the change affects -> this
>> > should
>> > > >>greatly minimize the chances of failling tests
>> > > >>4. Push the change to the main branch
>> > > >>
>> > > >> Let us know what you think!
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Guowei & Dawid
>> > > >>
>> > > >>
>> > > >>
>> > >
>> > >
>> >
>>
>


Re: Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Guowei Ma
Hi, Kevin

If you use the RocksDB and want to know the data on the disk I think that
is the right metric. But the SST files might include some expired data.
Some data in memory is not included in the SST files yet. In general I
think it could reflect the state size of your application.

I think that there is no metric for the time that spends on restoring from
a savepoint.

As for why there is a huge difference between the size of sst and the size
of savepoint, I think @Yun can give some detailed insights.

Best,
Guowei


On Thu, Apr 1, 2021 at 1:38 AM Kevin Lam  wrote:

> Hi all,
>
> We're interested in doing some analysis on how the size of our savepoints
> and state affects the time it takes to restore from a savepoint. We're
> running Flink 1.12 and using RocksDB as a state backend, on Kubernetes.
>
> What is the best way to measure the size of a Flink Application's state?
> Is state.backend.rocksdb.metrics.total-sst-files-size
> 
> the right thing to look at?
>
> We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for
> all our operators, after restoring from a savepoint, and we noticed that
> the sum of all the sst files sizes is much much smaller than the total size
> of our savepoint (7GB vs 10TB).  Where does that discrepancy come from?
>
> Do you have any general advice on correlating savepoint size with restore
> times?
>
> Thanks in advance!
>


Re: How to specific key serializer

2021-03-31 Thread 陳昌倬
On Wed, Mar 31, 2021 at 05:33:19PM +0800, Tzu-Li (Gordon) Tai wrote:
> You can try using TypeInfo annotations to specify a TypeInformationFactory
> for your key class [1].
> This allows you to "plug-in" the TypeInformation extracted by Flink for a
> given class. In that custom TypeInformation, you should let it return the
> correct serializer.

Hi Gordon,

Thanks for the tip. We have solve the problem by specific
TypeInformation in readKeyedState.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Flink 1.12.2 sql api 使用parquet格式报错

2021-03-31 Thread Luna Wong
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html
Parquet你要下这个Jar包放在你flink/lib目录的。

Luna Wong  于2021年4月1日周四 上午10:45写道:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html
>
> 太平洋 <495635...@qq.com> 于2021年4月1日周四 上午10:26写道:
> >
> > 使用 parquet 还需要手段添加其他相关的依赖吗?
> >
> >
> > 环境和报错信息如下:
> >
> >
> > Flink 版本: 1.12.2
> > 部署方式: standalone kubernetes session
> > 添加的相关依赖
> > > 
> >  > 
> >  >  >  >
> >
> >
> > 错误信息:
> > Caused by: org.apache.flink.table.api.ValidationException: Could not find 
> > any format factory for identifier 'parquet' in the classpath. at 
> > org.apache.flink.table.filesystem.FileSystemTableSink. >  at 
> > org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)


Re: Flink 1.12.2 sql api 使用parquet格式报错

2021-03-31 Thread Luna Wong
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/parquet.html

太平洋 <495635...@qq.com> 于2021年4月1日周四 上午10:26写道:
>
> 使用 parquet 还需要手段添加其他相关的依赖吗?
>
>
> 环境和报错信息如下:
>
>
> Flink 版本: 1.12.2
> 部署方式: standalone kubernetes session
> 添加的相关依赖
> 
>  
>   
>
>
> 错误信息:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
> format factory for identifier 'parquet' in the classpath. at 
> org.apache.flink.table.filesystem.FileSystemTableSink.  at 
> org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)


Flink 1.12.2 sql api ????parquet????????

2021-03-31 Thread ??????
 parquet 





Flink ?? 1.12.2
?? standalone kubernetes session
??
   

Flink 1.12.2 sql api 使用parquet格式报错

2021-03-31 Thread 霍米会
使用 parquet 还需要手段添加其他相关的依赖吗?


环境和报错信息如下:


Flink 版本: 1.12.2
部署方式: standalone kubernetes session
添加的相关依赖

  org.apache.flink
flink-parquet_2.11
1.12.2



错误信息:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
format factory for identifier 'parquet' in the classpath. at 
org.apache.flink.table.filesystem.FileSystemTableSink.(FileSystemTableSink.java:124)
 at 
org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:85)

退订

2021-03-31 Thread zhaorui_9...@163.com
退订


zhaorui_9...@163.com


Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-03-31 Thread Guowei Ma
Hi, Robert
I think you could try to change the "s3://argo-artifacts/" to "
s3a://argo-artifacts/".
It is because that currently `StreamingFileSink` only supports Hadoop based
s3 but not Presto based s3. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations

Best,
Guowei


On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen  wrote:

> I’m using a local instance of MINIO on my kubernetes cluster for
> checkpoint/savepoint storage. I’m using this StreamingFileSync
> configuration:
>
>
> final StreamingFileSink> sink =
> StreamingFileSink.forRowFormat(
> new Path("s3://argo-artifacts/"),
> new SimpleStringEncoder Long>>("UTF-8"))
> .withBucketAssigner(new KeyBucketAssigner())
> .withRollingPolicy(OnCheckpointRollingPolicy.build())
> .withOutputFileConfig(config)
> .build();
>
> Anyone know how to fix this exception?
>
> java.lang.UnsupportedOperationException: This s3 file system implementation 
> does not support recoverable writers.
> at 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
>  ~[?:?]
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>
> --
> Robert Cullen
> 240-475-4490
>


Re: IO benchmarking

2021-03-31 Thread deepthi Sridharan
Thanks, Matthias. This is very helpful.

Regarding the checkpoint documentation, I was mostly looking for
information on how states from various tasks get serialized into one (or
more?) files on persistent storage. I'll check out the code pointers!

On Wed, Mar 31, 2021 at 7:07 AM Matthias Pohl 
wrote:

> Hi Deepthi,
> 1. Have you had a look at flink-benchmarks [1]? I haven't used it but it
> might be helpful.
> 2. Unfortunately, Flink doesn't provide metrics like that. But you might
> want to follow FLINK-21736 [2] for future developments.
> 3. Is there anything specific you are looking for? Unfortunately, I don't
> know any blogs for a more detailed summary. If you plan to look into the
> code CheckpointCoordinator [3] might be a starting point. Alternatively,
> something like MetadataV2V3SerializerBase [4] offers insights into how the
> checkpoints' metadata is serialized.
>
> Best,
> Matthias
>
> [1] https://github.com/apache/flink-benchmarks
> [2] https://issues.apache.org/jira/browse/FLINK-21736
> [3]
> https://github.com/apache/flink/blob/11550edbd4e1874634ec441bde4fe4952fc1ec4e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1493
> [4]
> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>
> On Tue, Mar 30, 2021 at 8:37 PM deepthi Sridharan <
> deepthi.sridha...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to set up some benchmarking with a couple of IO options for
>> saving checkpoints and have a couple of questions :
>>
>> 1. Does flink come with any IO benchmarking tools? I couldn't find any. I
>> was hoping to use those to derive some insights about the storage
>> performance and extrapolate it for the checkpoint use case.
>>
>> 2. Are there any metrics pertaining to restore from checkpoints? The only
>> metric I can find is the last restore time, but neither the time it took to
>> read the checkpoints, nor the time it took to restore the operator/task
>> states seem to be covered. I am using RocksDB, but couldn't find any
>> metrics relating to how much time it took to restore the state backend from
>> rocksdb either.
>>
>> 3. I am trying to find documentation on how the states are serialized
>> into the checkpoint files from multiple operators and tasks to tailor the
>> testing use case, but can't seem to find any. Are there any bogs that go
>> into this detail or would reading the code be the only option?
>>
>> --
>> Thanks,
>> Deepthi
>>
>

-- 
Regards,
Deepthi


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-03-31 Thread Lu Niu
Hi, Colletta

Thanks for sharing! Do you mind share one stacktrace for that error as
well? Thanks!

Best
Lu

On Sat, Mar 27, 2021 at 5:36 AM Colletta, Edward 
wrote:

>
>
> FYI, we experience a similar error again, lost leadership but not due to
> timeout but a disconnect from zookeeper.  This time I examined logs for
> other errors related to zookeeper and found the kafka cluster that uses the
> same zookeeper also was disconnected.
>
>
>
> We run on AWS and this seems to be AWS related.
>
>
>
>
>
> *From:* Xintong Song 
> *Sent:* Sunday, January 31, 2021 9:23 PM
> *To:* user 
> *Subject:* Re: Flink 1.11 job hit error "Job leader lost leadership" or
> "ResourceManager leader changed to new address null"
>
>
>
> *This email is from an external source - **exercise caution regarding
> links and attachments. *
>
>
>
> Hi Colletta,
>
>
>
> This error is kind of expected if the JobMaster / ResourceManager does not
> maintain a stable connection to the ZooKeeper service, which may be caused
> by network issues, GC pause, or unstable ZK service availability.
>
>
>
> By "similar issue", what I meant is I'm not aware of any issue related to
> the upgrading of the ZK version that may cause the leadership loss.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Sun, Jan 31, 2021 at 4:14 AM Colletta, Edward 
> wrote:
>
> “but I'm not aware of any similar issue reported since the upgrading”
>
> For the record, we experienced this same error on Flink 1.11.2 this past
> week.
>
>
>
> *From:* Xintong Song 
> *Sent:* Friday, January 29, 2021 7:34 PM
> *To:* user 
> *Subject:* Re: Flink 1.11 job hit error "Job leader lost leadership" or
> "ResourceManager leader changed to new address null"
>
>
>
> *This email is from an external source - **exercise caution regarding
> links and attachments. *
>
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Sat, Jan 30, 2021 at 8:27 AM Xintong Song 
> wrote:
>
> There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not
> aware of any similar issue reported since the upgrading.
>
> I would suggest the following:
>
> - Turn on the DEBUG log see if there's any valuable details
>
> - Maybe try asking in the Apache Zookeeper community, see if this is a
> known issue.
>
>
> Thank you~
> Xintong Song
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Jan 30, 2021 at 6:47 AM Lu Niu  wrote:
>
> Hi, Xintong
>
>
>
> Thanks for replying. Could it relate to the zk version? We are a platform
> team at Pinterest in the middle of migrating form 1.9.1 to 1.11. Both 1.9
> and 1.11 jobs talking to the same ZK for JM HA. This problem only surfaced
> in 1.11 jobs. That's why we think it is related to version upgrade.
>
>
>
> Best
>
> Lu
>
>
>
> On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
> wrote:
>
> The ZK client side uses 15s connection timeout and 60s session timeout
> in Flink. There's nothing similar to a heartbeat interval configured, which
> I assume is up to ZK's internal implementation. These things have not
> changed in FLink since at least 2017.
>
>
>
> If both ZK client and server complain about timeout, and there's no gc
> issue spotted, I would consider a network instability.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:
>
> After checking the log I found the root cause is zk client timeout on TM:
>
> ```
>
> 2021-01-25 14:01:49,600 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f
> 2021-01-25 14:01:49,610 INFO
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
> session timed out, have not heard from server in 40020ms for sessionid
> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
> 2021-01-25 14:01:49,711 INFO
> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
> - State change: SUSPENDED
> 2021-01-25 14:01:49,711 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
> 27ac39342913d29baac4cde13062c4a4 with leader id
> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
> 2021-01-25 14:01:49,712 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2021-01-25 14:01:49,712 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
> connection for job 27ac39342913d29baac4cde13062c4a4.
> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Sink:
> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
> (d5b5887e639874cb70d7fef939b957b7).
> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task -
> 

ARM support

2021-03-31 Thread Rex Fenley
Hello,

We would like to run Flink on ARM yet haven't found any resources
indicating that this is yet possible. We are wondering what the timeline is
for Flink supporting ARM. Given that all Mac Books are moving to ARM and
that AWS is excitedly supporting ARM, it seems important that Flink also
supports running on ARM.

Thank you

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-03-31 Thread Robert Cullen
I’m using a local instance of MINIO on my kubernetes cluster for
checkpoint/savepoint storage. I’m using this StreamingFileSync
configuration:


final StreamingFileSink> sink =
StreamingFileSink.forRowFormat(
new Path("s3://argo-artifacts/"),
new SimpleStringEncoder>("UTF-8"))
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build();

Anyone know how to fix this exception?

java.lang.UnsupportedOperationException: This s3 file system
implementation does not support recoverable writers.
at 
org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
~[?:?]
at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]

-- 
Robert Cullen
240-475-4490


Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Kevin Lam
Hi all,

We're interested in doing some analysis on how the size of our savepoints
and state affects the time it takes to restore from a savepoint. We're
running Flink 1.12 and using RocksDB as a state backend, on Kubernetes.

What is the best way to measure the size of a Flink Application's state? Is
state.backend.rocksdb.metrics.total-sst-files-size

the right thing to look at?

We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for
all our operators, after restoring from a savepoint, and we noticed that
the sum of all the sst files sizes is much much smaller than the total size
of our savepoint (7GB vs 10TB).  Where does that discrepancy come from?

Do you have any general advice on correlating savepoint size with restore
times?

Thanks in advance!


PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Sumeet Malhotra
Cross posting from StackOverlow here:

https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array

Any pointers are appreciated!

Thanks,
Sumeet


Re: clear() in a ProcessWindowFunction

2021-03-31 Thread Vishal Santoshi
I had a query Say I have a single key with 2 live sessions ( A and B )
with a configured lateness .

Do these invariants hold?

* The state is scoped to the key (created per key in the
ProcessWindowFunction with a ttl )
* The state will remain alive irrespective of whether the Window is closed
or not (a TTL timer does the collection )
*  The execution on a key is sequential , as in if 2 events arrive for the
2 Sessions they happen sequentially ( or in any order but without the need
of synchronization )
* The state mutated by an event in Session A, will be visible to Session B
if an event incident on Session B was to happen subsequently.  There is no
need of synchronizing access to the state as it for the same key.

What I am not sure about is what happens when session A merge with session
B. I would assume that it just is defining new start and end of the merged
window, Gcing the old ones ( or at least one of them ) and assigning that
even to that new window. What one does with the custom state in
ProcessWindowFunction ( there is a CountTrigger of 1 ) ,  really what is
done in the process method above,  As in this state is 1 degree removed
from what ever flink does internally with it's merges given that the state
is scoped to the key.







On Fri, Mar 12, 2021 at 12:37 PM Vishal Santoshi 
wrote:

> Yep, makes sense.
>
> On Fri, Mar 12, 2021 at 10:12 AM Roman Khachatryan 
> wrote:
>
>> > Want to confirm that the keys are GCed ( along with state ) once the
>> (windows close + lateness ) ?
>> Window state is cleared (as well as the window itself), but global
>> state is not (unless you use TTL).
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>
>> Regards,
>> Roman
>>
>> On Fri, Mar 12, 2021 at 2:16 PM Vishal Santoshi
>>  wrote:
>> >
>> > Sometimes writing it down makes you think. I now realize that this is
>> not the right approach, given that merging windows will have their own
>> states..and how the merge happens is really at the key level
>> >
>> >
>> >
>> > On Fri, Mar 12, 2021 at 6:27 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> >>
>> >> I intend to augment every event in a session  with a unique ID.  To
>> keep the session lean, there is a PurgingTrigger on this aggregate that
>> fires on a count of 1.
>> >>
>> >> >> (except that the number of keys can grow).
>> >>
>> >> Want to confirm that the keys are GCed ( along with state ) once the
>> (windows close + lateness ) ?
>> >>
>> >>
>> >>
>> >> On Fri, Mar 12, 2021 at 5:32 AM Roman Khachatryan 
>> wrote:
>> >>>
>> >>> Hi Vishal,
>> >>>
>> >>> There is no leak in the code you provided (except that the number of
>> >>> keys can grow).
>> >>> But as you figured out the state is scoped to key, not to window+key.
>> >>>
>> >>> Could you explain what you are trying to achieve and why do you need
>> to combine
>> >>> sliding windows with state scoped to window+key?
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>> On Fri, Mar 12, 2021 at 5:13 AM Vishal Santoshi
>> >>>  wrote:
>> >>> >
>> >>> > Essentially, Does this code leak state
>> >>> >
>> >>> > private static class SessionIdProcessWindowFunction> java.io.Serializable, VALUE extends java.io.Serializable>
>> >>> > extends
>> >>> > ProcessWindowFunction,
>> KeyedSessionWithSessionID, KEY, TimeWindow> {
>> >>> > private static final long serialVersionUID = 1L;
>> >>> > private final static ValueStateDescriptor sessionId = new
>> ValueStateDescriptor("session_uid",
>> >>> > String.class);
>> >>> >
>> >>> > @Override
>> >>> > public void process(KEY key,
>> >>> > ProcessWindowFunction,
>> KeyedSessionWithSessionID, KEY, TimeWindow>.Context context,
>> >>> > Iterable> elements,
>> Collector> out)
>> >>> > throws Exception {
>> >>> > // I need this scoped to key/window
>> >>> > if (getRuntimeContext().getState(sessionId).value() == null) {
>> >>> > UUID uuid = UUID.randomUUID();
>> >>> > getRuntimeContext().getState(sessionId).update(uuid.toString());
>> >>> > }
>> >>> > String uuid = getRuntimeContext().getState(sessionId).value();
>> >>> > out.collect(new
>> KeyedSessionWithSessionID<>(elements.iterator().next(), uuid));
>> >>> > }
>> >>> > }
>> >>> >
>> >>> > On Thu, Mar 11, 2021 at 11:09 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> >>> >>
>> >>> >> Hello folks,
>> >>> >>   The suggestion is to use windowState() for a key
>> key per window state and clear the state explicitly.  Also it seems that
>> getRuntime().getState() will return a globalWindow() where state is shared
>> among windows with the same key. I desire of course to have state scoped to
>> a key per window and was wanting to use windowState().. The caveat is that
>> my window is a Session Window and when I try to use clear()  I am thrown
>> this exception  ( Session Windows are Merging Windows )
>> >>> >>
>> >>> >> Caused by: java.lang.UnsupportedOperationException: Per-window
>> state is not allowed when using merging 

退订

2021-03-31 Thread 孙晨瞳
退订

Re: Restoring from Flink Savepoint in Kubernetes not working

2021-03-31 Thread Claude M
Thanks for your reply.  I'm using the flink docker
image flink:1.12.2-scala_2.11-java8.  Yes, the folder was created in S3.  I
took a look at the UI and it showed the following:

*Latest Restore ID: 49Restore Time: 2021-03-31 09:37:43Type:
CheckpointPath:
s3:fcc82deebb4565f31a7f63989939c463/chk-49*

However, this is different from the savepoint path I specified.  I
specified the following:

*s3:savepoint2/savepoint-9fe457-504c312ffabe*

Is there anything specific you're looking for in the logs?  I did not find
any exceptions and there is a lot of sensitive information I would have to
extract from it.

Also, this morning, I tried creating another savepoint.  It first showed it
was In Progress.

curl 
http://localhost:8081/jobs/fcc82deebb4565f31a7f63989939c463/savepoints/4d19307dd99337257c4738871b1c63d8
{"status":{"id":"IN_PROGRESS"},"operation":null}

Then later when I tried to check the status, I saw the attached exception.

In the UI, I see the following:

*Latest Failed Checkpoint ID: 50Failure Time: 2021-03-31 09:34:43Cause:
Asynchronous task checkpoint failed.*

What does this failure mean?


On Wed, Mar 31, 2021 at 9:22 AM Matthias Pohl 
wrote:

> Hi Claude,
> thanks for reaching out to the Flink community. Could you provide the
> Flink logs for this run to get a better understanding of what's going on?
> Additionally, what exact Flink 1.12 version are you using? Did you also
> verify that the snapshot was created by checking the actual folder?
>
> Best,
> Matthias
>
> On Wed, Mar 31, 2021 at 4:56 AM Claude M  wrote:
>
>> Hello,
>>
>> I have Flink setup as an Application Cluster in Kubernetes, using Flink
>> version 1.12.  I created a savepoint using the curl command and the status
>> indicated it was completed.  I then tried to relaunch the job from that
>> save point using the following arguments as indicated in the doc found
>> here:
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>>
>> args: ["standalone-job", "--job-classname", "", "--job-id",
>> "", "--fromSavepoint", "s3:///",
>> "--allowNonRestoredState"]
>>
>> After the job launches, I check the offsets and they are not the same as
>> when the savepoint was created.  The job id passed in also does not match
>> the job id that was launched.  I even put an incorrect savepoint path to
>> see what happens and there were no errors in the logs and the job still
>> launches.  It seems these arguments are not even being evaluated.  Any
>> ideas about this?
>>
>>
>> Thanks
>>
>
{"errors":["org.apache.flink.runtime.rest.NotFoundException: Operation not 
found under key: 
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@4b261c41\n\tat
 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest
(AbstractAsynchronousOperationHandlers.java:182)\n\tat 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest
(SavepointHandlers.java:219)\n\tat 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest
(AbstractRestHandler.java:83)\n\tat 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader
(AbstractHandler.java:195)\n\tat 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0
(LeaderRetrievalHandler.java:83)\n\tat 
java.util.Optional.ifPresent(Optional.java:159)\n\tat 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat
 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat
 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat
 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat
 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead
(SimpleChannelInboundHandler.java:99)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
 

Re: IO benchmarking

2021-03-31 Thread Matthias Pohl
Hi Deepthi,
1. Have you had a look at flink-benchmarks [1]? I haven't used it but it
might be helpful.
2. Unfortunately, Flink doesn't provide metrics like that. But you might
want to follow FLINK-21736 [2] for future developments.
3. Is there anything specific you are looking for? Unfortunately, I don't
know any blogs for a more detailed summary. If you plan to look into the
code CheckpointCoordinator [3] might be a starting point. Alternatively,
something like MetadataV2V3SerializerBase [4] offers insights into how the
checkpoints' metadata is serialized.

Best,
Matthias

[1] https://github.com/apache/flink-benchmarks
[2] https://issues.apache.org/jira/browse/FLINK-21736
[3]
https://github.com/apache/flink/blob/11550edbd4e1874634ec441bde4fe4952fc1ec4e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1493
[4]
https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83

On Tue, Mar 30, 2021 at 8:37 PM deepthi Sridharan <
deepthi.sridha...@gmail.com> wrote:

> Hi,
>
> I am trying to set up some benchmarking with a couple of IO options for
> saving checkpoints and have a couple of questions :
>
> 1. Does flink come with any IO benchmarking tools? I couldn't find any. I
> was hoping to use those to derive some insights about the storage
> performance and extrapolate it for the checkpoint use case.
>
> 2. Are there any metrics pertaining to restore from checkpoints? The only
> metric I can find is the last restore time, but neither the time it took to
> read the checkpoints, nor the time it took to restore the operator/task
> states seem to be covered. I am using RocksDB, but couldn't find any
> metrics relating to how much time it took to restore the state backend from
> rocksdb either.
>
> 3. I am trying to find documentation on how the states are serialized into
> the checkpoint files from multiple operators and tasks to tailor the
> testing use case, but can't seem to find any. Are there any bogs that go
> into this detail or would reading the code be the only option?
>
> --
> Thanks,
> Deepthi
>


转发:FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 Thread 张立志



退订

- 转发的邮件 -

发件人: Luna Wong
发送日期: 2021年03月31日 21:45
收件人: user-zh
主题: FLIP-146中TableSource并行度设置预计哪个版本做?
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


Re: FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 Thread Luna Wong
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。
问题1:ScanTable 并行度在FLIP-146中有提及, LookupTable的并行度设置有FLIP或者issue吗?
问题2:这两类Table的并行度设置,预计在Flink哪个版本推出。

Luna Wong  于2021年3月31日周三 下午9:46写道:
>
> DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。
>
> Luna Wong  于2021年3月31日周三 下午9:45写道:
> >
> > DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


Re: FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 Thread Luna Wong
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。

Luna Wong  于2021年3月31日周三 下午9:45写道:
>
> DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


FLIP-146中TableSource并行度设置预计哪个版本做?

2021-03-31 Thread Luna Wong
DynamicTableSource,目前ScanTable和LookupTable都不可自定义并行度。


Re: Restoring from Flink Savepoint in Kubernetes not working

2021-03-31 Thread Matthias Pohl
Hi Claude,
thanks for reaching out to the Flink community. Could you provide the Flink
logs for this run to get a better understanding of what's going on?
Additionally, what exact Flink 1.12 version are you using? Did you also
verify that the snapshot was created by checking the actual folder?

Best,
Matthias

On Wed, Mar 31, 2021 at 4:56 AM Claude M  wrote:

> Hello,
>
> I have Flink setup as an Application Cluster in Kubernetes, using Flink
> version 1.12.  I created a savepoint using the curl command and the status
> indicated it was completed.  I then tried to relaunch the job from that
> save point using the following arguments as indicated in the doc found
> here:
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>
> args: ["standalone-job", "--job-classname", "", "--job-id",
> "", "--fromSavepoint", "s3:///",
> "--allowNonRestoredState"]
>
> After the job launches, I check the offsets and they are not the same as
> when the savepoint was created.  The job id passed in also does not match
> the job id that was launched.  I even put an incorrect savepoint path to
> see what happens and there were no errors in the logs and the job still
> launches.  It seems these arguments are not even being evaluated.  Any
> ideas about this?
>
>
> Thanks
>


Re: DataStream from kafka topic

2021-03-31 Thread Arian Rohani
The issue at hand is that the record contains an unmodifiable collection
which the kryo serialiser attempts to modify by first initialising the
object and then adding items to the collection (iirc).

Caused by: java.lang.UnsupportedOperationException
> at
> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)


Without knowing the specifics of what it is exactly you are trying to
deserialise I can only attempt to give a generic answer which is to try
something like:


> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
> Class unmodColl =
> Class.forName("java.util.Collections$UnmodifiableCollection");
> see.getConfig().addDefaultKryoSerializer(unmodColl,
> UnmodifiableCollectionsSerializer.class);


An even better approach is to set-up a local sandbox environment in docker
with Kafka and a sink of your choice and simply running the application
form the main method in debug mode and setting a breakpoint right before it
throws the exception.

Kind regards,
Arian Rohani


Den ons 31 mars 2021 kl 13:27 skrev Matthias Pohl :

> Hi Maminspapin,
> I haven't worked with Kafka/Flink, yet. But have you had a look at the
> docs about the DeserializationSchema [1]? It
> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
> you're looking for?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:
>
>> I tried this:
>>
>> 1. Schema (found in stackoverflow)
>>
>> class GenericRecordSchema implements
>> KafkaDeserializationSchema {
>>
>> private String registryUrl;
>> private transient KafkaAvroDeserializer deserializer;
>>
>> public GenericRecordSchema(String registryUrl) {
>> this.registryUrl = registryUrl;
>> }
>>
>> @Override
>> public boolean isEndOfStream(GenericRecord nextElement) {
>> return false;
>> }
>>
>> @Override
>> public GenericRecord deserialize(ConsumerRecord
>> consumerRecord) throws Exception {
>> checkInitialized();
>> return (GenericRecord)
>> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>> }
>>
>> @Override
>> public TypeInformation getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> private void checkInitialized() {
>> if (deserializer == null) {
>> Map props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> deserializer = new KafkaAvroDeserializer(client, props);
>> }
>> }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer getConsumer(String
>> topic) {
>>
>> return new FlinkKafkaConsumer<>(
>> topic,
>> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081;),
>> getConsumerProperties());
>> }
>>
>> But when I start the app, the following error is happen:
>>
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>> at
>>
>> 

Re: Proper way to get DataStream

2021-03-31 Thread Matthias Pohl
Hi Maminspapin again,
have you checked whether your topic actually contains data that matches
your schema specified through cep.model.User?

Best,
Matthias

On Tue, Mar 30, 2021 at 3:39 PM Maminspapin  wrote:

> Hi,
>
> I'm trying to solve a task with getting data from topic. This topic keeps
> avro format data.
>
> I wrote next code:
>
>  public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> Schema schema = ReflectData.get().getSchema(User.class);
> FlinkKafkaConsumer userConsumer = new
> FlinkKafkaConsumer<>(
>"test_topic",
> *// First*
> AvroDeserializationSchema.forGeneric(schema),
> *// Second*
> //
> ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081;),
> getConsumerProperties());
>
> DataStream userStream =
> env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
> userStream.print("users");
>
> env.execute();
> }
>
> So, as I think right, there are two ways to get the result:
> 1. AvroDeserializationSchema.forGeneric(schema)
> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081;)
>
> And I use ReflectData.get().getSchema(User.class) to get schema.
>
>
> Please, Flink guru, tell me if I am on the right way or not.
>
>
> If I use First way, there is next error:
>
> java.io.EOFException
> at org.apache.avro.io
> .BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
> at org.apache.avro.io
> .BinaryDecoder.readInt(BinaryDecoder.java:150)
> at org.apache.avro.io
> .ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>
> If I use Second way, there is next error:
>
> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
> expecting cep.model.User, missing required field userId
> at org.apache.avro.io
> .ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>
> How can I get the correct result?
>
> Sorry, if duplicated:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>
> Today is third day I'm working with this issue (((
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Ok, it looks like you've found that solution already based on your question
in [1].

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html

On Wed, Mar 31, 2021 at 1:26 PM Matthias Pohl 
wrote:

> Hi Maminspapin,
> I haven't worked with Kafka/Flink, yet. But have you had a look at the
> docs about the DeserializationSchema [1]? It
> mentions ConfluentRegistryAvroDeserializationSchema. Is this something
> you're looking for?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:
>
>> I tried this:
>>
>> 1. Schema (found in stackoverflow)
>>
>> class GenericRecordSchema implements
>> KafkaDeserializationSchema {
>>
>> private String registryUrl;
>> private transient KafkaAvroDeserializer deserializer;
>>
>> public GenericRecordSchema(String registryUrl) {
>> this.registryUrl = registryUrl;
>> }
>>
>> @Override
>> public boolean isEndOfStream(GenericRecord nextElement) {
>> return false;
>> }
>>
>> @Override
>> public GenericRecord deserialize(ConsumerRecord
>> consumerRecord) throws Exception {
>> checkInitialized();
>> return (GenericRecord)
>> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>> }
>>
>> @Override
>> public TypeInformation getProducedType() {
>> return TypeExtractor.getForClass(GenericRecord.class);
>> }
>>
>> private void checkInitialized() {
>> if (deserializer == null) {
>> Map props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> deserializer = new KafkaAvroDeserializer(client, props);
>> }
>> }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer getConsumer(String
>> topic) {
>>
>> return new FlinkKafkaConsumer<>(
>> topic,
>> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081;),
>> getConsumerProperties());
>> }
>>
>> But when I start the app, the following error is happen:
>>
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>> at
>>
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>> at
>>
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> at
>>
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> at
>>
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> at
>>
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> at
>>
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>> at
>>
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>> at
>>
>> 

Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Hi Maminspapin,
I haven't worked with Kafka/Flink, yet. But have you had a look at the docs
about the DeserializationSchema [1]? It
mentions ConfluentRegistryAvroDeserializationSchema. Is this something
you're looking for?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema

On Tue, Mar 30, 2021 at 6:55 AM Maminspapin  wrote:

> I tried this:
>
> 1. Schema (found in stackoverflow)
>
> class GenericRecordSchema implements
> KafkaDeserializationSchema {
>
> private String registryUrl;
> private transient KafkaAvroDeserializer deserializer;
>
> public GenericRecordSchema(String registryUrl) {
> this.registryUrl = registryUrl;
> }
>
> @Override
> public boolean isEndOfStream(GenericRecord nextElement) {
> return false;
> }
>
> @Override
> public GenericRecord deserialize(ConsumerRecord
> consumerRecord) throws Exception {
> checkInitialized();
> return (GenericRecord)
> deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
> }
>
> @Override
> public TypeInformation getProducedType() {
> return TypeExtractor.getForClass(GenericRecord.class);
> }
>
> private void checkInitialized() {
> if (deserializer == null) {
> Map props = new HashMap<>();
>
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> registryUrl);
>
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
> SchemaRegistryClient client =
> new CachedSchemaRegistryClient(
> registryUrl,
> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
> deserializer = new KafkaAvroDeserializer(client, props);
> }
> }
> }
>
> 2. Consumer
>
> private static FlinkKafkaConsumer getConsumer(String topic)
> {
>
> return new FlinkKafkaConsumer<>(
> topic,
> new GenericRecordSchema("http://xxx.xx.xxx.xx:8081;),
> getConsumerProperties());
> }
>
> But when I start the app, the following error is happen:
>
> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> at
>
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
>
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> at
>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>  

yarn-per-job模式下,sql-client如何指定提交yarn的资源队列

2021-03-31 Thread 姚波旭



yarn-per-job模式下,sql-client如何指定提交yarn的资源队列

2021-03-31 Thread 姚波旭



Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Yik San Chan
Thank you, Till!

Actually I find I can access this via `Table.getSchema.getFieldNames` in
version 1.12.0

Best,
Yik San

On Wed, Mar 31, 2021 at 4:26 PM Till Rohrmann  wrote:

> You are right Yik San. This feature has only been introduced in the
> upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that
> you have to either use 1.13-SNAPSHOT or wait for the 1.13 release which
> should happen in a couple of weeks if you really need this feature.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19981
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan 
> wrote:
>
>> Hi Till,
>>
>> From the version I am using (1.12.0), getFieldNames is not available in
>> Row ... See
>> https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
>> .
>>
>> Is there any workaround for this in version 1.12.0? Thanks.
>>
>> Best,
>> Yik San
>>
>> On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann 
>> wrote:
>>
>>> There is a method Row.getFieldNames.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan 
>>> wrote:
>>>
 Hi Till,

 I look inside the Row class, it does contain a member `private final
 Object[] fields;` though I wonder how to get column names out of the
 member?

 Thanks!

 Best,
 Yik San

 On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann 
 wrote:

> Hi Yik San,
>
> by converting the rows to a Tuple3 you effectively lose the
> information about the column names. You could also call
> `toRetractStream[Row]` which will give you a `DataStream[Row]` where you
> keep the column names.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <
> evan.chanyik...@gmail.com> wrote:
>
>> The question is cross-posted on Stack Overflow
>> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
>> .
>>
>> I want to consume a Kafka topic into a table using Flink SQL, then
>> convert it back to a DataStream.
>>
>> Here is the `SOURCE_DDL`:
>>
>> ```
>> CREATE TABLE kafka_source (
>> user_id BIGINT,
>> datetime TIMESTAMP(3),
>> last_5_clicks STRING
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'aiinfra.fct.userfeature.0',
>> 'properties.bootstrap.servers' = 'localhost:9092',
>> 'properties.group.id' = 'test-group',
>> 'format' = 'json'
>> )
>> ```
>>
>> With Flink, I execute the DDL.
>>
>> ```scala
>> val settings = EnvironmentSettings.newInstance.build
>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
>> tableEnv.executeSql(SOURCE_DDL)
>> val table = tableEnv.from("kafka_source")
>> ```
>>
>> Then, I convert it into DataStream, and do downstream logic in the
>> `map(e => ...)` part.
>>
>> ```scala
>> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
>> String)](table).map(e => ...)
>> ```
>>
>> Inside the `map(e => ...)` part, I would like to access the column
>> name, in this case, `last_5_clicks`. Why? Because I may have different
>> sources with different columns names (such as `last_10min_page_view`), 
>> but
>> I would like to reuse the code in `map(e => ...)`.
>>
>> Is there a way to do this? Thanks.
>>
>> Best,
>> Yik San
>>
>


Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Guowei Ma
Hi, community:

Friendly reminder that today (3.31) is the last day of feature development.
Under normal circumstances, you will not be able to submit new features
from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for testing,
welcome to help test together.
After the test is relatively stable, we will cut the release-1.13 branch.

Best,
Dawid & Guowei


On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann  wrote:

> +1 for the 31st of March for the feature freeze.
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
> wrote:
>
> > +1 for March 31st for the feature freeze.
> >
> >
> >
> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz  >
> > wrote:
> >
> > > Thank you Thomas! I'll definitely check the issue you linked.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 23/03/2021 20:35, Thomas Weise wrote:
> > > > Hi Dawid,
> > > >
> > > > Thanks for the heads up.
> > > >
> > > > Regarding the "Rebase and merge" button. I find that merge option
> > useful,
> > > > especially for small simple changes and for backports. The following
> > > should
> > > > help to safeguard from the issue encountered previously:
> > > > https://github.com/jazzband/pip-tools/issues/1085
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
> > dwysakow...@apache.org
> > > >
> > > > wrote:
> > > >
> > > >> Hi devs, users!
> > > >>
> > > >> 1. *Feature freeze date*
> > > >>
> > > >> We are approaching the end of March which we agreed would be the
> time
> > > for
> > > >> a Feature Freeze. From the knowledge I've gather so far it still
> seems
> > > to
> > > >> be a viable plan. I think it is a good time to agree on a particular
> > > date,
> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
> > > >> (Wednesday next week) as the feature freeze time.
> > > >>
> > > >> Similarly as last time, we want to create RC0 on the day after the
> > > feature
> > > >> freeze, to make sure the RC creation process is running smoothly,
> and
> > to
> > > >> have a common testing reference point.
> > > >>
> > > >> Having said that let us remind after Robert & Dian from the previous
> > > >> release what it a Feature Freeze means:
> > > >>
> > > >> *B) What does feature freeze mean?*After the feature freeze, no new
> > > >> features are allowed to be merged to master. Only bug fixes and
> > > >> documentation improvements.
> > > >> The release managers will revert new feature commits after the
> feature
> > > >> freeze.
> > > >> Rational: The goal of the feature freeze phase is to improve the
> > system
> > > >> stability by addressing known bugs. New features tend to introduce
> new
> > > >> instabilities, which would prolong the release process.
> > > >> If you need to merge a new feature after the freeze, please open a
> > > >> discussion on the dev@ list. If there are no objections by a PMC
> > member
> > > >> within 48 (workday)hours, the feature can be merged.
> > > >>
> > > >> 2. *Merge PRs from the command line*
> > > >>
> > > >> In the past releases it was quite frequent around the Feature Freeze
> > > date
> > > >> that we ended up with a broken main branch that either did not
> compile
> > > or
> > > >> there were failing tests. It was often due to concurrent merges to
> the
> > > main
> > > >> branch via the "Rebase and merge" button. To overcome the problem we
> > > would
> > > >> like to suggest only ever merging PRs from a command line. Thank you
> > > >> Stephan for the idea! The suggested workflow would look as follows:
> > > >>
> > > >>1. Pull the change and rebase on the current main branch
> > > >>2. Build the project (e.g. from IDE, which should be faster than
> > > >>building entire project from cmd) -> this should ensure the
> project
> > > compiles
> > > >>3. Run the tests in the module that the change affects -> this
> > should
> > > >>greatly minimize the chances of failling tests
> > > >>4. Push the change to the main branch
> > > >>
> > > >> Let us know what you think!
> > > >>
> > > >> Best,
> > > >>
> > > >> Guowei & Dawid
> > > >>
> > > >>
> > > >>
> > >
> > >
> >
>


Re:Re: How does Flink SQL read Avro union?

2021-03-31 Thread Vincent Dong
Hi Arvid,

I cannot decide the schema of the Kafka source topic since others also consume 
this topic.
I use Flink DataStream to consume the topic and then transform it to schema 
without union field in it, to avoid the Flink SQL issue.

Cheers,
Vincent



At 2021-03-22 22:04:53, "Arvid Heise"  wrote:

Hi Vincent,


I'm not well into Flink SQL, so I'm pulling in Jark.


I have stopped using union records in your way and instead only use nullable 
fields (technically also a union field but much easier to handle in all 
languages).


So if you have a way to change the schema, maybe try it out:

  record RowEvent {
union { null, ItemRow } item_row default null;
union { null, RefundRow } refund_row default null;
  }





On Thu, Mar 18, 2021 at 7:35 AM Vincent Dong  wrote:

Hi All, 
  How does Flink SQL read Kafka Avro message which has union field?
  For me,  avro schema is defined as following, 
```
  record ItemRow {
string num_iid;
string has_showcase;
string jdp_created;
  }


  record RefundRow {
string refund_id;
string status;
string jdp_created;
  }


  record RowEvent {
union { ItemRow, RefundRow } item_row;
  }
```
Now I'm sure that for a specific kafka topic, the item_row in all messages is 
RefundRow, but I don't know how to define source table and query the table. 
Can I define the table to force Flink SQL converts all messages to RefundRow? 
Then I can `select status, refund_id from the_converted_table`.




Thanks
Vincent Dong




 

Re: How to specific key serializer

2021-03-31 Thread Tzu-Li (Gordon) Tai
Hi CZ,

The issue here is that the Scala DataStream API uses Scala macros to decide
the serializer to be used. Since that recognizes Scala case classes, the
CaseClassSerializer will be used.
However, in the State Processor API, those Scala macros do not come into
play, and therefore it directly goes to Flink's type extraction for Java
classes, which recognizes this as a Avro generated class.
In general, currently the State Processor API doesn't support savepoints
written by Scala DataStream jobs that well.

You can try using TypeInfo annotations to specify a TypeInformationFactory
for your key class [1].
This allows you to "plug-in" the TypeInformation extracted by Flink for a
given class. In that custom TypeInformation, you should let it return the
correct serializer.

Best,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory

On Mon, Mar 29, 2021 at 2:42 PM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> Currently we use sbt-avrohugger [0] to generate key class for keyed
> state.  The key class generated by sbt-avrohugger is both case class,
> and AVRO specific record. However, in the following scenarons, Flink
> uses different serializers:
>
>
> * In streaming application, Flink uses CaseClassSerializer for key
>   class.
> * In state processor API application, Flink uses AvroSerializer for key
>   class.
>
>
> Since they use different serializers for key, they are not compatible.
> Is there any way to specific key serializer so that both applications
> use the same serializer?
>
>
>
> [0] https://github.com/julianpeeters/sbt-avrohugger
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Fw:A question about flink watermark illustration in official documents

2021-03-31 Thread Matthias Pohl
Hi 罗昊,
the 2nd picture is meant to visualize the issue of out-of-orderness in
general. I'd say it's not referring to a specific strategy.

But one way to interpret the image is using the BoundedOutOfOrderness
strategy for watermark generation [1]: You can define an upper bound B for
the out-of-orderness. The watermark generator assumes that there's a delay
of B, i.e. for an event with timestamp T, no events older than {@code T -
B} will follow any more. The delayed watermarks you see in image 2 could be
achieved using this bounded out-of-orderness strategy.

The usage of watermark strategies is also addressed in the docs [2].

I hope this helps.
Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/eventtime/BoundedOutOfOrdernessWatermarks.java#L37
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#generating-watermarks

On Tue, Mar 30, 2021 at 6:26 AM 罗昊  wrote:

> Recently I read flink official documents for something about watermarks。
> url:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
> there are two pictures illustrating flink watermark mechanism, which
> puzzle me mush:
>
>
> The first picture is easy to understand, But in the second, I wonder how
> do we get w(11) and w(17)?
> As we know, we can define how to generate watermark in the flink job, in
> other words, watermarks are generated by certain rules. So what are the
> rules that the watermarks are generated in the second pic.
>
> I look up for almost  all offficial documents of different version flink
> and they use the same pictures.
> It puzzled me much。Is there any explaination?
> waiting for your answers ,Thx!
>


对于多张亿级事实表的历史全量数据 regular join,通过mysql cdc怎样避免OOM,以及优化

2021-03-31 Thread 王敏超
对于多张亿级事实表的历史全量数据 regular join,通过mysql cdc怎样避免OOM,以及优化?

求大佬们给点建议



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Till Rohrmann
You are right Yik San. This feature has only been introduced in the
upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that
you have to either use 1.13-SNAPSHOT or wait for the 1.13 release which
should happen in a couple of weeks if you really need this feature.

[1] https://issues.apache.org/jira/browse/FLINK-19981

Cheers,
Till

On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan 
wrote:

> Hi Till,
>
> From the version I am using (1.12.0), getFieldNames is not available in
> Row ... See
> https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java
> .
>
> Is there any workaround for this in version 1.12.0? Thanks.
>
> Best,
> Yik San
>
> On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann 
> wrote:
>
>> There is a method Row.getFieldNames.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan 
>> wrote:
>>
>>> Hi Till,
>>>
>>> I look inside the Row class, it does contain a member `private final
>>> Object[] fields;` though I wonder how to get column names out of the
>>> member?
>>>
>>> Thanks!
>>>
>>> Best,
>>> Yik San
>>>
>>> On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Yik San,

 by converting the rows to a Tuple3 you effectively lose the information
 about the column names. You could also call `toRetractStream[Row]` which
 will give you a `DataStream[Row]` where you keep the column names.

 Cheers,
 Till

 On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan 
 wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
> .
>
> I want to consume a Kafka topic into a table using Flink SQL, then
> convert it back to a DataStream.
>
> Here is the `SOURCE_DDL`:
>
> ```
> CREATE TABLE kafka_source (
> user_id BIGINT,
> datetime TIMESTAMP(3),
> last_5_clicks STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'aiinfra.fct.userfeature.0',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'test-group',
> 'format' = 'json'
> )
> ```
>
> With Flink, I execute the DDL.
>
> ```scala
> val settings = EnvironmentSettings.newInstance.build
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
> tableEnv.executeSql(SOURCE_DDL)
> val table = tableEnv.from("kafka_source")
> ```
>
> Then, I convert it into DataStream, and do downstream logic in the
> `map(e => ...)` part.
>
> ```scala
> tableEnv.toRetractStream[(Long, java.sql.Timestamp,
> String)](table).map(e => ...)
> ```
>
> Inside the `map(e => ...)` part, I would like to access the column
> name, in this case, `last_5_clicks`. Why? Because I may have different
> sources with different columns names (such as `last_10min_page_view`), but
> I would like to reuse the code in `map(e => ...)`.
>
> Is there a way to do this? Thanks.
>
> Best,
> Yik San
>



flink 小文件合并及分区没数据无法提交问题

2021-03-31 Thread kandy.wang
1.flink 小文件合并
测试下来发现,同一分区在不同时期去查询数据量不断增长,直到分区下所有的文件都compact完成,才定下来 ? 
这个是什么原因。目前看起来像是分区提交没有等到分区下所有文件compact完成
2. 某些分区没数据时无法触发分区提交问题
我们实现了自定分区提交策略,为了通知离线下游。这样如果分区没数据,不能提交的话,就会把下游调度hang住。这种问题,怎么解决




 

flink 小文件合并及分区没数据无法提交问题

2021-03-31 Thread kandy.wang
1.flink 小文件合并
测试下来发现,同一分区在不同时期去查询数据量不断增长,直到分区下所有的文件都compact完成,才定下来 ? 
这个是什么原因。目前看起来像是分区提交没有等到分区下所有文件compact完成
2. 某些分区没数据时无法触发分区提交问题
我们实现了自定分区提交策略,为了通知离线下游。这样如果分区没数据,不能提交的话,就会把下游调度hang住。这种问题,怎么解决

yarn-per-job????????sql-client????????????yarn??????????

2021-03-31 Thread ????


Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-31 Thread Sumeet Malhotra
Thanks Dawid. This looks like what I needed :-)

On Tue, Mar 30, 2021 at 12:28 PM Dawid Wysakowicz 
wrote:

> Hey,
>
> I am not sure which format you use, but if you work with JSON maybe this
> option[1] could help you.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> On 30/03/2021 06:45, Sumeet Malhotra wrote:
>
> Thanks. Yes, that's a possibility. I'd still prefer something that can be
> done within the Table API. If it's not possible, then there's no other
> option but to use the DataStream API to read from Kafka, do the time
> conversion and create a table from it.
>
> ..Sumeet
>
> On Mon, Mar 29, 2021 at 10:41 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I hope someone else might have a better answer, but one thing that would
>> most likely work is to convert this field and define even time during
>> DataStream to table conversion [1]. You could always pre process this field
>> in the DataStream API.
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>>
>> pon., 29 mar 2021 o 18:07 Sumeet Malhotra 
>> napisał(a):
>>
>>> Hi,
>>>
>>> Might be a simple, stupid question, but I'm not able to find how to
>>> convert/interpret a UTC datetime string like
>>> *2021-03-23T07:37:00.613910Z* as event-time using a DDL/Table API. I'm
>>> ingesting data from Kafka and can read this field as a string, but would
>>> like to mark it as event-time by defining a watermark.
>>>
>>> I'm able to achieve this using the DataStream API, by defining my own
>>> TimestampAssigner that converts the datetime string to milliseconds since
>>> epoch. How can I do this using a SQL DDL or Table API?
>>>
>>> I tried to directly interpret the string as TIMESTAMP(3) but it fails
>>> with the following exception:
>>>
>>> java.time.format.DateTimeParseException: Text
>>> '2021-03-23T07:37:00.613910Z' could not be parsed...
>>>
>>> Any pointers?
>>>
>>> Thanks!
>>> Sumeet
>>>
>>>


flink1.12.0 python udf任务,集群可正常执行,本地执行报错:java.lang.RuntimeException: Failed to create stage bundle factory!

2021-03-31 Thread xiaoyue
使用python flink1.12 写了UDAF的处理函数,local执行的时候会报错:
已确定当前py3环境下安装了apache-flink1.12.0
希望路过的大佬,能帮忙分析一下~ 感谢!
Traceback (most recent call last):
  File "C:/projects/virtual_pyflink1.12/TestScript/udaf_timeWeightedReturn.py", 
line 199, in udaf_p_case
env.execute('UDAF_timeWeightReturn_p')
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\table\table_environment.py",
 line 1276, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\java_gateway.py", line 
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\projects\virtual_pyflink1.12\lib\site-packages\pyflink\util\exceptions.py", 
line 147, in deco
return f(*a, **kw)
  File "C:\projects\virtual_pyflink1.12\lib\site-packages\py4j\protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at 

[讨论] Flink Connector 并行写入数据方案

2021-03-31 Thread jie mei
Hi, Community

我想发起一个初步的讨论,关于如何扩大 Flink 写入数据的并行度,并且能够分表,可选事务支持的方案。

该方案应该支持三种场景:

1) 至少一次: 不支持或者有限支持 update 的存储,通常通过查询去重。 例如 ClickHouse
2) 相同主键或分区内有序: 支持 Upsert,但不支持事务或者跨行事务的存储,例如 ElasticSearch, MongoDB
3) 事务:支持跨行事务的存储,例如 MySQL。

另外说一下,第二种情况和第三种情况的一个重要区别是,当 CheckPoint 失败,第二种情况会从上一个快照重新执行,
那么会存在旧的数据可能覆盖新的数据的情况。举个例子: 假设正常情况下记录A在某个快照区间取值为
A1, A2, A3。假如在写入 A2 后快照失败,当重新执行的时候,会短暂的存在这种情况,A1 覆盖了 A2 的值。

下面是不同场景扩大并行度的方案
1) 至少一次:
在这种场景下,数据乱顺是可容忍的,只要保证最少一次,就能达到最终一致性。可以考虑多线程异步写入数据,
当异步任务过多,则等待有异步任务完成,再执行新的异步写入任务。CheckPoint需要保证所有异步任务完成

2) 相同主键或分区内有序,最少一次:
在这种场景下,如果指定了分区字段,可以将相同分区的数据放到一个 Buffer 里,相同 Buffer 的数据有序,
不同 Buffer的数据并行写入,CheckPoint的时候需要保证所有数据写入;如果没有分区,单指定了主键,可以
根据主键的 Hash Code 对 Sink 并行读取模,得到的值用于决定数据缓存到哪一个 Buffer,同样相同的 Buffer
内有序,不同的 Buffer 并行。

3) 事务:
由于已经有了通用的 Sink API,可以考虑把数据缓存到 Buffer, 在 CheckPoint 的时候,开启事务,完成写入数据,并提交。
[FLIP-143]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

分表:
对于 MySQL, MongoDB 这类存储,可以通过分区键来定义分表规则,假如表 A 定义了分区键 B,B 有 B1, B2 两个取值,
那么得到两个分表 A_B1, A_B2.


-- 

*Best Regards*
*Jeremy Mei*


Re: flink-1.12.2 TM无法使用自定的serviceAccount访问configmap

2021-03-31 Thread Yang Wang
你只配置了JM的service account,-Dkubernetes.jobmanager
.service-account=flink-service-account

你试试改成-Dkubernetes.service-account=flink-service-account

Best,
Yang

1120344670 <1120344...@qq.com> 于2021年3月31日周三 下午2:26写道:

> 您好, 这是TM的报错,
> 
>
> 启动的命令如下:
> ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=tuiwen-flink
> -Dtaskmanager.memory.process.size=2200m -Dkubernetes.taskmanager.cpu=0.3
> -Dkubernetes.jobmanager.cpu=0.3 -Dtaskmanager.numberOfTaskSlots=2
> -Dkubernetes.rest-service.exposed.type=ClusterIP
> -Dkubernetes.jobmanager.service-account=flink-service-account
> -Dresourcemanager.taskmanager-timeout=345600   -Dkubernetes.namespace=flink
>
> 镜像使我们根据: apache/flink:1.12.2-scala_2.12 自己做的。
>
>
>
> Yang Wang wrote
> > 我可以确认1.12.1和1.12.2已经修复,如果还是不能正常使用,麻烦发一下启动命令以及对应的TM报错日志
> >
> > Best,
> > Yang
> >
> > 1120344670 <
>
> > 1120344670@
>
> >> 于2021年3月29日周一 下午5:09写道:
> >
> >> 您好:
> >>之前提交过一个关于这方面的issue,链接如下:
> >>
> http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
> >>目前看还是没有fix对应的issue。
> >>
> >>报错如下:
> >>
> >>
> >> 目前看jira上的issue已经关闭了, 请确认是否修复。
> >>
>
>
> Yang Wang wrote
> > 我可以确认1.12.1和1.12.2已经修复,如果还是不能正常使用,麻烦发一下启动命令以及对应的TM报错日志
> >
> > Best,
> > Yang
> >
> > 1120344670 <
>
> > 1120344670@
>
> >> 于2021年3月29日周一 下午5:09写道:
> >
> >> 您好:
> >>之前提交过一个关于这方面的issue,链接如下:
> >>
> http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
> >>目前看还是没有fix对应的issue。
> >>
> >>报错如下:
> >>
> >>
> >> 目前看jira上的issue已经关闭了, 请确认是否修复。
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink-1.12.2 TM无法使用自定的serviceAccount访问configmap

2021-03-31 Thread 1120344670
您好, 这是TM的报错, 
 

启动的命令如下:
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=tuiwen-flink
-Dtaskmanager.memory.process.size=2200m -Dkubernetes.taskmanager.cpu=0.3
-Dkubernetes.jobmanager.cpu=0.3 -Dtaskmanager.numberOfTaskSlots=2 
-Dkubernetes.rest-service.exposed.type=ClusterIP
-Dkubernetes.jobmanager.service-account=flink-service-account
-Dresourcemanager.taskmanager-timeout=345600   -Dkubernetes.namespace=flink

镜像使我们根据: apache/flink:1.12.2-scala_2.12 自己做的。



Yang Wang wrote
> 我可以确认1.12.1和1.12.2已经修复,如果还是不能正常使用,麻烦发一下启动命令以及对应的TM报错日志
> 
> Best,
> Yang
> 
> 1120344670 <

> 1120344670@

>> 于2021年3月29日周一 下午5:09写道:
> 
>> 您好:
>>之前提交过一个关于这方面的issue,链接如下:
>> http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
>>目前看还是没有fix对应的issue。
>>
>>报错如下:
>>
>>
>> 目前看jira上的issue已经关闭了, 请确认是否修复。
>>


Yang Wang wrote
> 我可以确认1.12.1和1.12.2已经修复,如果还是不能正常使用,麻烦发一下启动命令以及对应的TM报错日志
> 
> Best,
> Yang
> 
> 1120344670 <

> 1120344670@

>> 于2021年3月29日周一 下午5:09写道:
> 
>> 您好:
>>之前提交过一个关于这方面的issue,链接如下:
>> http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
>>目前看还是没有fix对应的issue。
>>
>>报错如下:
>>
>>
>> 目前看jira上的issue已经关闭了, 请确认是否修复。
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


退订

2021-03-31 Thread 小鲲鹏
退订


| |
小鲲鹏
|
|
邮箱:xxpe...@163.com
|

签名由 网易邮箱大师 定制

退订

2021-03-31 Thread kk wi
退订