Re: How to synchronously wait for execution results in batch mode

2022-11-09 Thread weijie guo
Which deployment mode are you using (per-job/session/application)? And are you submitting with the detached option, e.g. -d or -yd?

Best regards,

Weijie


唐世伟  wrote on Wed, Nov 9, 2022 at 10:00:

> Thanks for the reply. That should only work with the Table API or SQL; it probably won't work with the Stream API, right?
>
>
> > On Nov 8, 2022, at 8:10 PM, yuxia  wrote:
> >
> > Would it work to set table.dml-sync = true?
> > (A minimal sketch follows at the end of this message.)
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "唐世伟" 
> > To: "user-zh" 
> > Sent: Tuesday, November 8, 2022, 8:06:18 PM
> > Subject: How to synchronously wait for execution results in batch mode
> >
> > We submit a batch-mode job to a YARN cluster via ./bin/flink run. The command only tells us
> > whether the job was submitted successfully and returns the applicationId; it does not report
> > whether the job itself succeeded. When we schedule Flink batch jobs through an offline
> > scheduling platform, we cannot capture the job's execution result. Is there any way to
> > synchronously wait for the execution result?
>
>
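A minimal sketch of the table.dml-sync suggestion quoted above, assuming a Table API batch
program submitted in attached mode (the table names are hypothetical). With this option set,
executeSql() returns only once the INSERT job has finished, so a scheduler that runs the
client can observe success or failure:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SyncBatchInsert {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Make executeSql() block on INSERT statements until the job completes.
        tEnv.getConfig().getConfiguration().setString("table.dml-sync", "true");
        // Hypothetical source/sink tables; failures surface here as exceptions, so the
        // client's exit status reflects the job result.
        tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
    }
}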


Re: How to get checkpoint stats after job has terminated

2022-11-09 Thread yidan zhao
First of all, you should trigger a savepoint before stopping the job,
and then you can restart the job with the savepoint.

For checkpoints, you need to set
'execution.checkpointing.externalized-checkpoint-retention' to
'RETAIN_ON_CANCELLATION'. You can then get the checkpoint info via the
history server.
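A small sketch of the same retention setting done in code, assuming a Flink 1.15-era
DataStream job (on older versions the setter is CheckpointConfig#enableExternalizedCheckpoints
instead):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainCheckpointsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds
        // Keep externalized checkpoints when the job is cancelled, so their metadata and
        // stats stay available (e.g. through the History Server) after termination.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // ... define sources, operators and sinks here, then call env.execute(...)
    }
}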

Guojun Li  wrote on Thu, Nov 3, 2022 at 11:31:
>
> Hi, Flink User Group
>
> I want to retrieve the stats of the last few completed checkpoints even after the job 
> has terminated; these stats are useful for restarting the job manually, because we 
> prefer to restore the job from a retained checkpoint rather than from a savepoint. The 
> Monitoring Checkpointing doc says these stats are available after the job has terminated.
>
> So I'm wondering:
> 1. How can I get checkpoint stats after the job has terminated?
> 2. How long will these stats be retained?
>
> Thanks,
> Guojun
>
>


Re: FeatHub - an open-source real-time feature engineering platform built on Flink

2022-11-09 Thread Dong Lin
We've had limited time recently and are still working on the basic functionality, so
there is no Chinese documentation yet.

Since the Chinese community has developers who can read English docs, while the English
community has almost no developers who read Chinese, we prioritized the English
documentation.

We will provide Chinese documentation later on.

On Thu, Nov 10, 2022 at 11:11 AM kanata163  wrote:

> Since you're promoting the project in the Chinese community, you should at least
> provide a Chinese version of the documentation.
>
>
>
> On 2022-11-08 13:49:50, "Dong Lin" wrote:
> >Hi everyone,
> >
> >We (the Alibaba Cloud Flink team) recently open-sourced FeatHub, a feature engineering
> >platform (feature store) for doing real-time feature engineering with Flink. We hope this
> >project greatly simplifies defining, deploying, and monitoring features based on Flink, etc.
> >
> >The project is now open source at https://github.com/alibaba/feathub; the GitHub page has
> >more information. https://github.com/flink-extended/feathub-examples provides more code
> >samples, and we will present the project at this November's Flink Forward Asia conference.
> >
> >The project is still in early development. We are adding more features, such as support
> >for Redis. We hope you will follow FeatHub and give us feedback!
> >
> >Cheers,
> >Dong
>


Re: FeatHub - an open-source real-time feature engineering platform built on Flink

2022-11-09 Thread kanata163
Since you're promoting the project in the Chinese community, you should at least provide a Chinese version of the documentation.

On 2022-11-08 13:49:50, "Dong Lin" wrote:
>Hi everyone,
>
>We (the Alibaba Cloud Flink team) recently open-sourced FeatHub, a feature engineering
>platform (feature store) for doing real-time feature engineering with Flink. We hope this
>project greatly simplifies defining, deploying, and monitoring features based on Flink, etc.
>
>The project is now open source at https://github.com/alibaba/feathub; the GitHub page has
>more information. https://github.com/flink-extended/feathub-examples provides more code
>samples, and we will present the project at this November's Flink Forward Asia conference.
>
>The project is still in early development. We are adding more features, such as support for
>Redis. We hope you will follow FeatHub and give us feedback!
>
>Cheers,
>Dong


Any caveats about processing abstract classes ?

2022-11-09 Thread Davide Bolcioni via user
Greetings,
I am looking at a Flink pipeline processing events consumed from a Kafka
topic, which now needs to also consume events that have a different, but
related, schema. Traditional Java OOP would suggest transitioning from

class Dog { ... }
new FilterFunction<Dog>() { ... }

to

abstract class Animal { ... }
class Dog extends Animal { ... }
class Cat extends Animal { ... }
new FilterFunction<Animal>() { ... }

but I am wondering if there is anything that might surprise the unwary down
that road, considering that the code base also uses asynchronous functions
and the broadcast pattern.

Thank you in advance,
Davide Bolcioni
--
There is no place like /home
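A minimal, self-contained sketch of the pattern being asked about (Animal/Dog/Cat are
hypothetical stand-ins). One caveat worth checking: a stream declared with the abstract type
is not handled by Flink's POJO serializer and falls back to Kryo, which can matter for state,
async functions, and broadcast streams:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AnimalFilterSketch {
    public abstract static class Animal { public String name; }
    public static class Dog extends Animal { }
    public static class Cat extends Animal { }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Declaring the element type as Animal makes Flink treat it as a generic (Kryo) type.
        DataStream<Animal> animals = env.fromElements(Animal.class, new Dog(), new Cat());
        DataStream<Animal> dogsOnly = animals.filter(new FilterFunction<Animal>() {
            @Override
            public boolean filter(Animal animal) {
                return animal instanceof Dog; // keep only Dog events
            }
        });
        dogsOnly.print();
        env.execute("animal-filter-sketch");
    }
}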


RocksDB checkpoints clean up with TaskManager restart.

2022-11-09 Thread Vidya Sagar Mula
Hi,

I am using RocksDB state backend for incremental checkpointing with Flink
1.11 version.

Question:
--
For a given job ID, intermediate RocksDB checkpoints are stored under the
path defined with ""

The files are stored with "_jobID + random UUID" prefixed to the location.

Case : 1
-
- When I cancel the job, all the RocksDB checkpoints are deleted
properly from the location corresponding to that JobId
(based on the "instanceBasePath" variable stored in the RocksDBKeyedStateBackend
object).
No issue here; working as expected.

Case : 2
-
- When my TaskManager is restarted, the existing RocksDB checkpoints are not
deleted.
A new "instanceBasePath" is constructed with a new random UUID appended to
the directory, and the old checkpoint directories are still there.

Questions:
- Is it expected behaviour not to delete the existing checkpoint
dirs under the RocksDB local directory?
- I see "StreamTaskStateInitializerImpl.java", where new StateBackend
objects are created. In this case, a new directory is created for this job ID,
appended with a new random UUID.
What happens to the old directories? Are they going to be purged later on?
If not, the disk is going to fill up with the older checkpoints. Please
clarify this.

Thanks,
Vidya Sagar.


Flink fails to authenticate with adlsgen2 azure storage account using managed identities in an Azure Kubernetes Cluster

2022-11-09 Thread DEROCCO, CHRISTOPHER
Flink fails to authenticate with an ADLS Gen2 Azure storage account using managed 
identities in an Azure Kubernetes cluster. We receive the following error from 
Flink when we try to configure managed identities to authenticate to ADLS Gen2.



Caused by: Unable to load key provider class.

at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getTokenProvider(AbfsConfiguration.java:540)

at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1136)

at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.(AzureBlobFileSystemStore.java:174)

at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)

at 
org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79)

at 
org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62)

at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)

at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)

at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)

at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)

... 12 more



We are trying to run Flink in an Azure Kubernetes cluster using the Apache Flink 
Kubernetes operator (https://github.com/apache/flink-kubernetes-operator).

The following are the settings we are using in spec.flinkConfiguration to 
update the flink-conf.yaml file to use Azure managed identities, based on this 
documentation 
(https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Authentication). 
The client ID we are using is the user-assigned identity of the AKS agent pools 
/ VM scale sets. We applied the Storage Blob Data Contributor role, scoped to the 
ADLS Gen2 storage account, to that client ID.

Using the storage account key is not recommended for ADLS Gen2. Any insights 
into this matter will be helpful, as we would prefer to use managed identities.



apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafkatest
spec:
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend: rocksdb
    state.backend.fs.checkpointdir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-checkpoints/
    state.checkpoints.dir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-externalized-checkpoints/
    state.savepoints.dir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-savepoints/
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-ha
    fs.azure.createRemoteFileSystemDuringInitialization: "true"
    fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net: OAuth
    fs.azure.account.oauth.provider.type.storageaccountname.dfs.core.windows.net: org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
    fs.azure.account.oauth2.msi.tenant.storageaccountname.dfs.core.windows.net: XXX
    fs.azure.account.oauth2.client.id.storageaccountname.dfs.core.windows.net: XXX
    fs.azure.account.oauth2.msi.endpoint.storageaccountname.dfs.core.windows.net: https://login.microsoftonline.com/<TENANT ID>/oauth2/token
  serviceAccount: workload-identity-sa
  podTemplate:
    metadata:
      name: test
    spec:
      securityContext:
        runAsUser: 
        runAsGroup: 
        fsGroup: 
        runAsNonRoot: true
      containers:
        # Do not change the main container name
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-azure-fs-hadoop-1.15.2.jar



Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
Interesting, yeah I think you'll have to implement code to recurse through
the (Row) DataType and somehow auto-generate the JSONSchema you want.

We abstracted the conversions from JSONSchema to other type systems in this
JsonSchemaConverter. There's nothing special going on here; I've seen versions
of this schema conversion code over and over again in different frameworks.
This one just allows us to plug in a SchemaConversions implementation to
provide the mappings to the output type system (like the Flink DataType
mappings I linked to before), rather than hardcoding the output types.

If I were trying to do what you are doing (in our codebase)...I'd create a
Flink DataTypeConverter that iterated through a (Row) DataType and a
SchemaConversions implementation that mapped to the JsonNode that
represented the JSONSchema.  (If not using Jackson...then you could use
another Java JSON object than JsonNode.)
You could also make a SchemaConversions (with whatever
Protobuf class to use...I'm not familiar with Protobuf) and then use the
same DataTypeConverter to convert to ProtobufSchema.   AND THEN...I'd
wonder if the input schema recursion code itself could be abstracted too so
that it would work for either JsonSchema OR DataType OR whatever, but anyway
that is probably too crazy and too much for what you are doing...but it
would be cool! :p
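A rough, non-authoritative sketch of that idea (my own illustration rather than Wikimedia's
converter): walk a (Row) DataType and emit a Jackson ObjectNode approximating a JSONSchema.
Only a few logical type roots are mapped here; everything else falls back to "string":

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class DataTypeToJsonSchema {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static ObjectNode convert(DataType dataType) {
        return convert(dataType.getLogicalType());
    }

    private static ObjectNode convert(LogicalType type) {
        ObjectNode node = MAPPER.createObjectNode();
        switch (type.getTypeRoot()) {
            case ROW: {
                // Recurse into nested row fields and collect them as JSONSchema properties.
                node.put("type", "object");
                ObjectNode props = node.putObject("properties");
                for (RowType.RowField field : ((RowType) type).getFields()) {
                    props.set(field.getName(), convert(field.getType()));
                }
                break;
            }
            case INTEGER:
            case BIGINT:
                node.put("type", "integer");
                break;
            case FLOAT:
            case DOUBLE:
                node.put("type", "number");
                break;
            case BOOLEAN:
                node.put("type", "boolean");
                break;
            default:
                node.put("type", "string"); // fallback; refine per logical type as needed
        }
        return node;
    }
}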





On Wed, Nov 9, 2022 at 9:52 AM Theodor Wübker 
wrote:

> I want to register the result-schema in a schema registry, as I am pushing
> the result-data to a Kafka topic. The result-schema is not known at
> compile-time, so I need to find a way to compute it at runtime from the
> resulting Flink Schema.
>
> -Theo
>
> (resent - again sorry, I forgot to add the others in the cc)
>
> On 9. Nov 2022, at 14:59, Andrew Otto  wrote:
>
> >  I want to convert the schema of a Flink table to both Protobuf *schema* and
> JSON *schema*
> Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
> That would indeed be something that is not usually done.  Just curious, why
> do you want to do this?
>
> On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  wrote:
>
>> Hello!
>>
>> I see you are talking about JSONSchema, not just JSON itself.
>>
>> We're trying to do a similar thing at Wikimedia and have developed some
>> tooling around this.
>>
>> JsonSchemaFlinkConverter
>> 
>> has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
>> Table DataType or Table SchemaBuilder, or Flink DataStream
>> TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
>> type are opinionated.  You can see the mappings here
>> 
>> .
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
>> wrote:
>>
>>> Thanks for your reply Yaroslav! The way I do it with Avro seems similar
>>> to what you pointed out:
>>>
>>> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
>>> DataType type = resultSchema.toSinkRowDataType();
>>> org.apache.avro.Schema converted = 
>>> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>>>
>>> I mentioned the ResolvedSchema because it is my starting point after the
>>> SQL operation. It seemed to me that I can not retrieve something that
>>> contains more schema information from the table so I got myself this. About
>>> your other answers: It seems the classes you mentioned can be used to
>>> serialize actual Data? However this is not quite what I want to do.
>>> Essentially I want to convert the schema of a Flink table to both
>>> Protobuf *schema* and JSON *schema* (for Avro as you can see I have it
>>> already). It seems odd that this is not easily possible, because converting
>>> from a JSON schema to a Schema of Flink is possible using the
>>> JsonRowSchemaConverter. However the other way is not implemented it seems.
>>> This is how I got a Table Schema (that I can use in a table descriptor)
>>> from a JSON schema:
>>>
>>> TypeInformation type = JsonRowSchemaConverter.convert(json);
>>> DataType row = 

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-09 Thread Etienne Chauchot

Hi Yun Gao,

FYI I just updated the article after your review: 
https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html


Best

Etienne

On 09/11/2022 at 10:04, Etienne Chauchot wrote:


Hi Yun Gao,

thanks for your email and your review !

My comments are inline

On 08/11/2022 at 06:51, Yun Gao wrote:

Hi Etienne,

Many thanks for the article! Flink is indeed continuing to increase its 
ability for unified batch / stream processing with the same API, and it's 
a great pleasure that more and more users are trying this functionality. 
But I also have some questions regarding some details.

First IMO, as a whole for the long run Flink will have two unified 
APIs, namely Table / SQL
API and DataStream API. Users could express the computation logic 
with these two APIs

for both bounded and unbounded data processing.



Yes that is what I understood also throughout the discussions and 
jiras. And I also think IMHO that reducing the number of APIs to 2 was 
the good move.




Underlying Flink provides two
execution modes:  the streaming mode works with both bounded and 
unbounded data,
and it executes in a way of incremental processing based on state; 
the batch mode works
only with bounded data, and it executes in a ways level-by-level 
similar to the traditional

batch processing frameworks. Users could switch the execution mode via
EnvironmentSettings.inBatchMode() for 
StreamExecutionEnvironment.setRuntimeMode().


As recommended in the Flink docs(1) I have enabled the batch mode as I 
thought it would be more efficient on my bounded pipeline, but as a 
matter of fact the streaming mode seems to be more efficient on my use 
case. I'll test with higher volumes to confirm.





Specially for DataStream, as implemented in FLIP-140, currently all 
the existing DataStream
operation supports the batch execution mode in a unified way[1]:  
data will be sorted for the
keyBy() edges according to the key, then the following operations 
like reduce() could receive
all the data belonging to the same key consecutively, then it could 
directly reducing the records
of the same key without maintaining the intermediate states. In this 
way users could write the

same code for both streaming and batch processing with the same code.



Yes I have no doubt that my resulting Query3ViaFlinkRowDatastream 
pipeline will work with no modification if I plug an unbounded source 
to it.





# Regarding the migration of Join / Reduce

First I think Reduce is always supported and users could write 
dataStream.keyBy().reduce(xx)
directly, and  if batch  execution mode is set, the reduce will not 
be executed in a incremental way,
instead is acts much  like sort-based  aggregation in the traditional 
batch processing framework.


Regarding Join, although the issue of FLINK-22587 indeed exists: 
current join has to be bound
to a window and the GlobalWindow does not work properly, but with 
some more try currently
it does not need users to  re-write the whole join from scratch: 
Users could write a dedicated
window assigner that assigns all the  records to the same window 
instance  and return
EventTimeTrigger.create() as the default event-time trigger [2]. Then 
it works


source1.join(source2)
                .where(a -> a.f0)
                .equalTo(b -> b.f0)
                .window(new EndOfStreamWindows())
                .apply();

It does not requires records have event-time attached since the 
trigger of window is only
relying on the time range of the window and the assignment does not 
need event-time either.


The behavior of the join is also similar to sort-based join if batch 
mode is enabled.


Of course it is not easy to use to let users do the workaround and 
we'll try to fix this issue in 1.17.



Yes, this is a better workaround than the manual state-based join that 
I proposed. I tried it and it works perfectly with similar 
performance. Thanks.




# Regarding support of Sort / Limit

Currently these two operators are indeed not supported in the 
DataStream API directly. One initial
though for these two operations are that users may convert the 
DataStream to Table API and use

Table API for these two operators:

DataStream xx = ... // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(dataStream);
tableXX.orderBy($("a").asc());



Yes I knew that workaround but I decided not to use it because I have 
a special SQL based implementation (for comparison reasons) so I did 
not want to mix SQL and DataStream APIs in the same pipeline.




How do you think about this option? We are also assessing if the 
combination of DataStream
API / Table API is sufficient for all the batch users. Any 
suggestions are warmly welcome.



I guess that outside of my use case of comparing the performance of 
the 3 Flink APIs (broader subject than this article), users can easily 
mix the APIs in the same pipeline. If we really want to have these 
operations in the DataStream API 

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Theodor Wübker
I want to register the result-schema in a schema registry, as I am pushing the 
result-data to a Kafka topic. The result-schema is not known at compile-time, 
so I need to find a way to compute it at runtime from the resulting Flink 
Schema.
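For the Avro case, a hedged sketch of that runtime registration step, assuming Confluent's
kafka-schema-registry-client is the registry in use (a client version whose register() accepts
a ParsedSchema) and the default TopicNameStrategy for subject names:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.catalog.ResolvedSchema;

public class ResultSchemaRegistration {
    // Derives the Avro schema from the query's ResolvedSchema at runtime, registers it
    // under the topic's value subject, and returns the id assigned by the registry.
    public static int register(ResolvedSchema resolvedSchema, String topic, String registryUrl)
            throws Exception {
        org.apache.avro.Schema avroSchema = AvroSchemaConverter.convertToSchema(
                resolvedSchema.toSinkRowDataType().getLogicalType());
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.register(topic + "-value", new AvroSchema(avroSchema));
    }
}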

-Theo

(resent - again sorry, I forgot to add the others in the cc)

> On 9. Nov 2022, at 14:59, Andrew Otto  wrote:
> 
> >  I want to convert the schema of a Flink table to both Protobuf schema and 
> > JSON schema
> Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.  That 
> would indeed be something that is not usually done.  Just curious, why do you 
> want to do this?
> 
> On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  > wrote:
> Hello! 
> 
> I see you are talking about JSONSchema, not just JSON itself.
> 
> We're trying to do a similar thing at Wikimedia and have developed some 
> tooling around this.  
> 
> JsonSchemaFlinkConverter 
> 
>  has some logic to convert from JSONSchema Jackson ObjectNodes to Flink Table 
> DataType or Table SchemaBuilder, or Flink DataStream TypeInformation[Row].  
> Some of the conversions from JSONSchema to Flink type are opinionated.  You 
> can see the mappings here 
> .
> 
> 
> 
> 
> 
> 
> 
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker  > wrote:
> Thanks for your reply Yaroslav! The way I do it with Avro seems similar to 
> what you pointed out:
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted = 
> AvroSchemaConverter.convertToSchema(type.getLogicalType());
> I mentioned the ResolvedSchema because it is my starting point after the SQL 
> operation. It seemed to me that I can not retrieve something that contains 
> more schema information from the table so I got myself this. About your other 
> answers: It seems the classes you mentioned can be used to serialize actual 
> Data? However this is not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both Protobuf 
> schema and JSON schema (for Avro as you can see I have it already). It seems 
> odd that this is not easily possible, because converting from a JSON schema 
> to a Schema of Flink is possible using the JsonRowSchemaConverter. However 
> the other way is not implemented it seems. This is how I got a Table Schema 
> (that I can use in a table descriptor) from a JSON schema:
> 
> TypeInformation type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
> Sidenote: I use deprecated methods here, so if there is a better approach 
> please let me know! But it shows that in Flink its easily possible to create 
> a Schema for a TableDescriptor from a JSON Schema - the other way is just not 
> so trivial it seems. And for Protobuf so far I don’t have any solutions, not 
> even creating a Flink Schema from a Protobuf Schema - not to mention the 
> other way around.
> 
> -Theo
> 
> (resent because I accidentally only responded to you, not the Mailing list - 
> sorry)
> 





Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Theodor Wübker
Hey,

thank you for your reply. Your converter looks very interesting. However, Flink 
comes with the JsonRowSchemaConverter that converts a JSONSchema-String to a 
TypeInformation already. From there you can convert the TypeInformation to, 
say, a DataType (Although I must admit I only got this done using deprecated 
methods in Flink). I am struggling to get the reverse way done - converting 
from a Flink ResolvedSchema (or LogicalType, or DataType) to a JSONSchema. Is 
that something you want to implement in your converter as well?

Your project is encouraging me though, maybe I will try to implement DataType 
to JSONSchema and ProtobufSchema to DataType (and the reverse) myself, given I 
do not find anything that does the trick.

-Theo

> On 9. Nov 2022, at 14:46, Andrew Otto  wrote:
> 
> Hello! 
> 
> I see you are talking about JSONSchema, not just JSON itself.
> 
> We're trying to do a similar thing at Wikimedia and have developed some 
> tooling around this.  
> 
> JsonSchemaFlinkConverter 
> 
>  has some logic to convert from JSONSchema Jackson ObjectNodes to Flink Table 
> DataType or Table SchemaBuilder, or Flink DataStream TypeInformation[Row].  
> Some of the conversions from JSONSchema to Flink type are opinionated.  You 
> can see the mappings here 
> .
> 
> 
> 
> 
> 
> 
> 
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker  > wrote:
> Thanks for your reply Yaroslav! The way I do it with Avro seems similar to 
> what you pointed out:
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted = 
> AvroSchemaConverter.convertToSchema(type.getLogicalType());
> I mentioned the ResolvedSchema because it is my starting point after the SQL 
> operation. It seemed to me that I can not retrieve something that contains 
> more schema information from the table so I got myself this. About your other 
> answers: It seems the classes you mentioned can be used to serialize actual 
> Data? However this is not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both Protobuf 
> schema and JSON schema (for Avro as you can see I have it already). It seems 
> odd that this is not easily possible, because converting from a JSON schema 
> to a Schema of Flink is possible using the JsonRowSchemaConverter. However 
> the other way is not implemented it seems. This is how I got a Table Schema 
> (that I can use in a table descriptor) from a JSON schema:
> 
> TypeInformation type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
> Sidenote: I use deprecated methods here, so if there is a better approach 
> please let me know! But it shows that in Flink its easily possible to create 
> a Schema for a TableDescriptor from a JSON Schema - the other way is just not 
> so trivial it seems. And for Protobuf so far I don’t have any solutions, not 
> even creating a Flink Schema from a Protobuf Schema - not to mention the 
> other way around.
> 
> -Theo
> 
> (resent because I accidentally only responded to you, not the Mailing list - 
> sorry)
> 





Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
>  I want to convert the schema of a Flink table to both Protobuf *schema* and
JSON *schema*
Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
That would indeed be something that is not usually done.  Just curious, why
do you want to do this?

On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  wrote:

> Hello!
>
> I see you are talking about JSONSchema, not just JSON itself.
>
> We're trying to do a similar thing at Wikimedia and have developed some
> tooling around this.
>
> JsonSchemaFlinkConverter
> 
> has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
> Table DataType or Table SchemaBuilder, or Flink DataStream
> TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
> type are opinionated.  You can see the mappings here
> 
> .
>
>
>
>
>
>
>
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
> wrote:
>
>> Thanks for your reply Yaroslav! The way I do it with Avro seems similar
>> to what you pointed out:
>>
>> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
>> DataType type = resultSchema.toSinkRowDataType();
>> org.apache.avro.Schema converted = 
>> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>>
>> I mentioned the ResolvedSchema because it is my starting point after the
>> SQL operation. It seemed to me that I can not retrieve something that
>> contains more schema information from the table so I got myself this. About
>> your other answers: It seems the classes you mentioned can be used to
>> serialize actual Data? However this is not quite what I want to do.
>> Essentially I want to convert the schema of a Flink table to both
>> Protobuf *schema* and JSON *schema* (for Avro as you can see I have it
>> already). It seems odd that this is not easily possible, because converting
>> from a JSON schema to a Schema of Flink is possible using the
>> JsonRowSchemaConverter. However the other way is not implemented it seems.
>> This is how I got a Table Schema (that I can use in a table descriptor)
>> from a JSON schema:
>>
>> TypeInformation type = JsonRowSchemaConverter.convert(json);
>> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
>> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
>>
>> Sidenote: I use deprecated methods here, so if there is a better approach
>> please let me know! But it shows that in Flink its easily possible to
>> create a Schema for a TableDescriptor from a JSON Schema - the other way is
>> just not so trivial it seems. And for Protobuf so far I don’t have any
>> solutions, not even creating a Flink Schema from a Protobuf Schema - not to
>> mention the other way around.
>>
>> -Theo
>>
>> (resent because I accidentally only responded to you, not the Mailing
>> list - sorry)
>>
>>


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
Hello!

I see you are talking about JSONSchema, not just JSON itself.

We're trying to do a similar thing at Wikimedia and have developed some
tooling around this.

JsonSchemaFlinkConverter
has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
Table DataType or Table SchemaBuilder, or Flink DataStream
TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
type are opinionated.  You can see the mappings here.







On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
wrote:

> Thanks for your reply Yaroslav! The way I do it with Avro seems similar to
> what you pointed out:
>
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted = 
> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>
> I mentioned the ResolvedSchema because it is my starting point after the
> SQL operation. It seemed to me that I can not retrieve something that
> contains more schema information from the table so I got myself this. About
> your other answers: It seems the classes you mentioned can be used to
> serialize actual Data? However this is not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both Protobuf
> *schema* and JSON *schema* (for Avro as you can see I have it already).
> It seems odd that this is not easily possible, because converting from a
> JSON schema to a Schema of Flink is possible using the
> JsonRowSchemaConverter. However the other way is not implemented it seems.
> This is how I got a Table Schema (that I can use in a table descriptor)
> from a JSON schema:
>
> TypeInformation type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
>
> Sidenote: I use deprecated methods here, so if there is a better approach
> please let me know! But it shows that in Flink its easily possible to
> create a Schema for a TableDescriptor from a JSON Schema - the other way is
> just not so trivial it seems. And for Protobuf so far I don’t have any
> solutions, not even creating a Flink Schema from a Protobuf Schema - not to
> mention the other way around.
>
> -Theo
>
> (resent because I accidentally only responded to you, not the Mailing list
> - sorry)
>
>


Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-09 Thread Etienne Chauchot

Hi,

And by the way, I was planning on writing another article to compare the 
performance of the DataSet, DataStream and SQL APIs over TPC-DS query3. I 
thought that I could run the pipelines on an Amazon EMR cluster with 
different data sizes: 1GB, 100GB, 1TB.


Would it be worth it? What do you think?

Best

Etienne

On 09/11/2022 at 10:04, Etienne Chauchot wrote:


Hi Yun Gao,

thanks for your email and your review !

My comments are inline

On 08/11/2022 at 06:51, Yun Gao wrote:

Hi Etienne,

Many thanks for the article! Flink is indeed continuing to increase its 
ability for unified batch / stream processing with the same API, and it's 
a great pleasure that more and more users are trying this functionality. 
But I also have some questions regarding some details.

First IMO, as a whole for the long run Flink will have two unified 
APIs, namely Table / SQL
API and DataStream API. Users could express the computation logic 
with these two APIs

for both bounded and unbounded data processing.



Yes that is what I understood also throughout the discussions and 
jiras. And I also think IMHO that reducing the number of APIs to 2 was 
the good move.




Underlying Flink provides two
execution modes:  the streaming mode works with both bounded and 
unbounded data,
and it executes in a way of incremental processing based on state; 
the batch mode works
only with bounded data, and it executes in a ways level-by-level 
similar to the traditional

batch processing frameworks. Users could switch the execution mode via
EnvironmentSettings.inBatchMode() for 
StreamExecutionEnvironment.setRuntimeMode().


As recommended in the Flink docs(1) I have enabled the batch mode as I 
thought it would be more efficient on my bounded pipeline, but as a 
matter of fact the streaming mode seems to be more efficient on my use 
case. I'll test with higher volumes to confirm.





Specially for DataStream, as implemented in FLIP-140, currently all 
the existing DataStream
operation supports the batch execution mode in a unified way[1]:  
data will be sorted for the
keyBy() edges according to the key, then the following operations 
like reduce() could receive
all the data belonging to the same key consecutively, then it could 
directly reducing the records
of the same key without maintaining the intermediate states. In this 
way users could write the

same code for both streaming and batch processing with the same code.



Yes I have no doubt that my resulting Query3ViaFlinkRowDatastream 
pipeline will work with no modification if I plug an unbounded source 
to it.





# Regarding the migration of Join / Reduce

First I think Reduce is always supported and users could write 
dataStream.keyBy().reduce(xx)
directly, and  if batch  execution mode is set, the reduce will not 
be executed in a incremental way,
instead is acts much  like sort-based  aggregation in the traditional 
batch processing framework.


Regarding Join, although the issue of FLINK-22587 indeed exists: 
current join has to be bound
to a window and the GlobalWindow does not work properly, but with 
some more try currently
it does not need users to  re-write the whole join from scratch: 
Users could write a dedicated
window assigner that assigns all the  records to the same window 
instance  and return
EventTimeTrigger.create() as the default event-time trigger [2]. Then 
it works


source1.join(source2)
                .where(a -> a.f0)
                .equalTo(b -> b.f0)
                .window(new EndOfStreamWindows())
                .apply();

It does not requires records have event-time attached since the 
trigger of window is only
relying on the time range of the window and the assignment does not 
need event-time either.


The behavior of the join is also similar to sort-based join if batch 
mode is enabled.


Of course it is not easy to use to let users do the workaround and 
we'll try to fix this issue in 1.17.



Yes, this is a better workaround than the manual state-based join that 
I proposed. I tried it and it works perfectly with similar 
performance. Thanks.




# Regarding support of Sort / Limit

Currently these two operators are indeed not supported in the 
DataStream API directly. One initial
though for these two operations are that users may convert the 
DataStream to Table API and use

Table API for these two operators:

DataStream xx = ... // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(dataStream);
tableXX.orderBy($("a").asc());



Yes I knew that workaround but I decided not to use it because I have 
a special SQL based implementation (for comparison reasons) so I did 
not want to mix SQL and DataStream APIs in the same pipeline.




How do you think about this option? We are also assessing if the 
combination of DataStream
API / Table API is sufficient for all the batch users. Any 
suggestions are warmly welcome.



I guess that outside of my use case of comparing the performance of 
the 3 Flink APIs 

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-09 Thread Etienne Chauchot

Hi Yun Gao,

thanks for your email and your review !

My comments are inline

On 08/11/2022 at 06:51, Yun Gao wrote:

Hi Etienne,

Many thanks for the article! Flink is indeed continuing to increase its 
ability for unified batch / stream processing with the same API, and it's 
a great pleasure that more and more users are trying this functionality. 
But I also have some questions regarding some details.

First IMO, as a whole for the long run Flink will have two unified 
APIs, namely Table / SQL
API and DataStream API. Users could express the computation logic with 
these two APIs

for both bounded and unbounded data processing.



Yes that is what I understood also throughout the discussions and jiras. 
And I also think IMHO that reducing the number of APIs to 2 was the good 
move.




Under the hood, Flink provides two execution modes: the streaming mode 
works with both bounded and unbounded data, and it executes in a way of 
incremental processing based on state; the batch mode works only with 
bounded data, and it executes level-by-level, similar to traditional 
batch processing frameworks. Users could switch the execution mode via 
EnvironmentSettings.inBatchMode() or 
StreamExecutionEnvironment.setRuntimeMode().


As recommended in the Flink docs(1) I have enabled the batch mode as I 
thought it would be more efficient on my bounded pipeline, but as a matter 
of fact the streaming mode seems to be more efficient on my use case. 
I'll test with higher volumes to confirm.
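For reference, a minimal sketch of switching the execution mode programmatically on a
DataStream job (it can equally be set via execution.runtime-mode at submission time):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH executes bounded pipelines stage-by-stage; STREAMING processes them
        // incrementally with state; AUTOMATIC picks based on the boundedness of the sources.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }
}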





Specifically for DataStream, as implemented in FLIP-140, currently all 
the existing DataStream operations support the batch execution mode in a 
unified way[1]: data will be sorted for the keyBy() edges according to 
the key, so the following operations like reduce() receive all the data 
belonging to the same key consecutively and can reduce the records of 
the same key directly, without maintaining intermediate state. In this 
way users can write the same code for both streaming and batch 
processing.



Yes I have no doubt that my resulting Query3ViaFlinkRowDatastream 
pipeline will work with no modification if I plug an unbounded source to it.





# Regarding the migration of Join / Reduce

First, I think Reduce is always supported and users could write 
dataStream.keyBy().reduce(xx) directly; if the batch execution mode is 
set, the reduce will not be executed in an incremental way, instead it 
acts much like sort-based aggregation in a traditional batch processing 
framework.


Regarding Join, although the issue of FLINK-22587 indeed exists (the 
current join has to be bound to a window and the GlobalWindow does not 
work properly), with some more effort users currently do not need to 
re-write the whole join from scratch: users could write a dedicated 
window assigner that assigns all the records to the same window instance 
and returns EventTimeTrigger.create() as the default event-time trigger 
[2]. Then it works:


source1.join(source2)
                .where(a -> a.f0)
                .equalTo(b -> b.f0)
                .window(new EndOfStreamWindows())
                .apply();

It does not require records to have event-time attached, since the 
trigger of the window relies only on the time range of the window, and 
the assignment does not need event-time either.


The behavior of the join is also similar to sort-based join if batch 
mode is enabled.


Of course it is not easy to ask users to do this workaround, and we'll 
try to fix this issue in 1.17.



Yes, this is a better workaround than the manual state-based join that I 
proposed. I tried it and it works perfectly with similar performance. 
Thanks.
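For reference, a rough sketch of the dedicated window assigner described above (assumptions:
a single TimeWindow spanning all of time and the built-in EventTimeTrigger; not necessarily
the exact class used in the article):

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    // One window covering the whole time range; its trigger only fires at end of input,
    // when the watermark advances to Long.MAX_VALUE.
    private static final TimeWindow ALL_OF_TIME = new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(ALL_OF_TIME);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}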




# Regarding support of Sort / Limit

Currently these two operators are indeed not supported in the 
DataStream API directly. One initial thought for these two operations is 
that users may convert the DataStream to the Table API and use the 
Table API for these two operators:

DataStream xx = ... // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(dataStream);
tableXX.orderBy($("a").asc());



Yes I knew that workaround but I decided not to use it because I have a 
special SQL based implementation (for comparison reasons) so I did not 
want to mix SQL and DataStream APIs in the same pipeline.




How do you think about this option? We are also assessing if the 
combination of DataStream
API / Table API is sufficient for all the batch users. Any suggestions 
are warmly welcome.



I guess that outside of my use case of comparing the performance of the 
3 Flink APIs (broader subject than this article), users can easily mix 
the APIs in the same pipeline. If we really want to have these 
operations in the DataStream API maybe wrapping state-based 
implementations could be good if their performance meets our expectations.




Best,
Yun Gao


I'll update the article and the code with your suggestions. Thanks again.

[1] 

Re: How to write custom serializer for dynamodb connector

2022-11-09 Thread Danny Cranmer
Hey Matt,

Thanks for the feedback. I have updated the SinkIntoDynamoDb [1] sample to
avoid this in the future. We have recently added support for @DynamoDbBean
annotated POJOs, which you might find interesting. This removes the need to
create a custom ElementConverter altogether;
see SinkDynamoDbBeanIntoDynamoDb [2].
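As a small sketch of such an annotated POJO (a hypothetical Order bean using the AWS SDK v2
enhanced-client annotations; see the SinkDynamoDbBeanIntoDynamoDb sample linked below as [2]
for how a bean like this is wired into the sink):

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;

@DynamoDbBean
public class Order {
    private String orderId;
    private String payload;

    @DynamoDbPartitionKey
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}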

Thanks Hong for looking in to this!

[1]
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java
[2]
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java

Thanks,

On Tue, Nov 8, 2022 at 9:06 PM Matt Fysh  wrote:

> Thanks Hong, I moved the AttributeValue creation into the ElementConverter
> and it started working without any custom serde work!
>
> The reason for creating AttributeValue instances in a previous operator is
> that I was closely following the example code:
> https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java
>
> Thank you again for your help and sharing those resources.
>
> Cheers,
> Matt.
>
>
> On Wed, 9 Nov 2022 at 03:51, Teoh, Hong  wrote:
>
>> Hi Matt,
>>
>>
>>
>> First of all, awesome that you are using the DynamoDB sink!
>>
>>
>>
>> To resolve your issue with serialization in the DDB sink, you are right,
>> the issue only happens when you create the AttributeValue object in a
>> previous operator and send it to the sink.
>>
>> The issue here is with serializing of ImmutableMap. Kryo tries to call
>> the put(), which is unsupported since its immutable, so you can register a
>> specific serializer for it. Like below:
>>
>>
>>
>> env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class,
>> ImmutableMapSerializer.class);
>>
>>
>>
>> You can get ImmutableMapSerializer.class from a pre-package library like
>> this: https://github.com/magro/kryo-serializers
>>
>> Just add the following to your pom.xml
>>
>>
>>
>> <dependency>
>>   <groupId>de.javakaffee</groupId>
>>   <artifactId>kryo-serializers</artifactId>
>>   <version>0.45</version>
>> </dependency>
>>
>>
>>
>> Regarding resources, I find the following helpful:
>>
>>- Article on serialization
>>- The FlinkForward youtube channel has a couple of useful deep dives
>>on Flink in general :
>>https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists
>>
>>
>>
>> Hope the above helps.
>>
>>
>>
>>
>>
>> A more general question on your use case, what is the reason you want to
>> generate the AttributeValue in a previous operator rather than in the sink
>> directly? Is it for some dynamic generation of objects to write into DDB?
>>
>>
>>
>> Regards,
>>
>> Hong
>>
>>
>>
>>
>>
>> *From: *Matt Fysh 
>> *Date: *Tuesday, 8 November 2022 at 14:04
>> *To: *User 
>> *Subject: *[EXTERNAL] How to write custom serializer for dynamodb
>> connector
>>
>>
>>
>> *CAUTION*: This email originated from outside of the organization. Do
>> not click links or open attachments unless you can confirm the sender and
>> know the content is safe.
>>
>>
>>
>> I'm attempting to use the dynamodb sink located at
>> https://github.com/apache/flink-connector-aws
>>
>>
>>
>> The example
>> 
>> in the repo is working as expected, however when I try to create a nested
>> data structure, I receive a Kryo serialization error message:
>>
>>
>>
>> Caused by: com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>>
>> Serialization trace:
>>
>> m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
>>
>> at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>
>>
>>
>> The value that cannot be serialized is produced by this code:
>>
>> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>>
>>
>>
>> AttributeValue.builder().m(
>>
>>   ImmutableMap.of(
>>
>> "innerkey", AttributeValue.builder().s("innervalue").build()
>>
>>   )
>>
>> ).build();
>>
>>
>>
>> There are tests in the connector repo
>> 
>> for nested map structures, but they do not test that the structure can be
>> ser/de by Flink, which I believe occurs when the operator that produces the
>> value is separate to the sink operator.
>>
>>
>>
>> Given that this is a fairly simple data type, I should be able to
>> register a custom serializer with Flink, but since I'm new to java I'm
>> having trouble making sense of the docs
>>