Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Martijn Visser
I also noticed that there are two replies in a separate thread on the User mailing
list, which can be found at
https://lists.apache.org/thread/m5ntl3cj81wg7frbfqg9v75c7hqnxtls.

I've included Clayton and David in this email, to at least centralize the
conversation once more :)



On Wed, Oct 5, 2022 at 11:36 AM Martijn Visser 
wrote:

> @Maciek
>
> I saw that I missed replying to your question:
>
> > Could you please remind what was the conclusion of discussion on
> upgrading Scala to 2.12.15/16?
> > https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t - I
> couldn't find any follow-up vote?
>
> There is a vote thread, but that never got enough votes. See
> https://lists.apache.org/thread/l93l5qqr5n2oty3r2bjsz3ks3tjf1655
>
> > If it's acceptable to break binary compatibility by such an upgrade,
> then upgrading to JDK17 before 2.0 will be doable?
>
> I'm not sure, because I don't think a discussion and vote have taken place
> yet on whether upgrading to JDK 17 can/will be done in a Flink 1.x release or if it
> requires a 2.0 release. It was mentioned in the original discussion thread
> on dropping Java 8 support within 2/3 releases, but if I recall correctly
> there was no discussion yet on when Java 17 support would be added [1].
>
> Best regards,
>
> Martijn
>
> [1] https://lists.apache.org/thread/0fwo7nwzy51gck4vxhyfnbnttd4jycpx
>
> On Wed, Oct 5, 2022 at 6:58 AM Gaël Renoux 
> wrote:
>
>> > I'm curious what target Scala versions people are currently interested
>> in.
>> > I would've expected that everyone wants to migrate to Scala 3, for
>> which several wrapper projects around Flink already exist
>>
>> The Scala 3 tooling is still subpar (we're using IntelliJ), so I'm not
>> sure I would move my team to Scala 3 right now (I'm currently toying with
>> it on a personal project). In addition, moving to Scala 3 is not completely
>> free - you have to do some rewrites, and developers will need some
>> adaptation time. Scala 2.13 is another thing entirely, we've wanted to
>> migrate for a long while.
>>
>> On Wed, Oct 5, 2022 at 12:53 PM Chesnay Schepler 
>> wrote:
>>
>>> > It's possible that for the sake of the Scala API, we would
>>> occasionally require some changes in the Java API. As long as those changes
>>> are not detrimental to Java users, they should be considered.
>>>
>>> That's exactly the model we're trying to get to. Don't fix
>>> scala-specific issues with scala code, but rather on the Java side as much
>>> as possible which could also benefit other JVM languages (e.g., Kotlin).
>>>
>>> > A question regarding the Flink wrapper: would it be possible to keep
>>> it under the Flink project's umbrella? Or does it need to be a completely
>>> separate structure? I'm not aware of the full organizational implications
>>> of this, I'm afraid.
>>>
>>> Technically it can be under the Flink umbrella, but then Flink would
>>> still be (at least for a while) the bottleneck because we'd have to
>>> review any changes coming in.
>>> That would only improve once several new committers were added to take
>>> care of this project.
>>> (In the end you'd just split Flink and the Scala API _codebases_, but
>>> achieve little else)
>>>
>>> > And if that is what it takes to move beyond Scala 2.12.7… This has
>>> been a big pain point for us.
>>>
>>> I'm curious what target Scala versions people are currently interested
>>> in.
>>> I would've expected that everyone wants to migrate to Scala 3, for which
>>> several wrapper projects around Flink already exist.
>>>
>>> On 05/10/2022 12:35, Gaël Renoux wrote:
>>>
>>> Hello everyone,
>>>
>>> I've already answered a bit on Twitter, I'll develop my thoughts a bit
>>> here. For context, my company (DataDome) has a significant codebase on
>>> Scala Flink (around 110K LOC), having been using it since 2017. I myself am
>>> an enthusiastic Scala developer (I don't think I'd like moving back to
>>> Java)
>>>
>>> Given that, I think separating the Scala support from Flink is actually
>>> a good move long term. We would then have a full-Java Flink, and a separate
>>> Scala wrapper. It would help a lot in solving the skills issue: Flink
>>> maintainers would no longer need to be fluent in Scala, and maintainers of
>>> the Scala wrapper would not need a deep knowledge of Flink's inner
>>> workings, just the API would be sufficient. And if that is what it takes to
>>> move beyond Scala 2.12.7… This has been a big pain point for us.
>>>
>>> I'm not too worried about finding contributors for the Scala wrapper.
>>> Within my company, we have developed additional wrappers and extension
>>> methods (for parts where we felt the Flink Scala API was insufficient), and
>>> we've been looking at ways we could contribute back. What held us back was
>>> our lack of knowledge of the full Flink environment (we're only using the
>>> Scala Datastream API). I don't think we're the only ones in that situation.
>>> One major point, though, is that Flink developers would not be completely
>>> rid of us ;-) 

Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread David Anderson
I want to clarify one point here, which is that modifying jobs written in
Scala to use Flink's Java API does not require porting them to Java. I can
readily understand why folks using Scala might rather use Java 17 than Java
11, but sticking to Scala will remain an option even if Flink's Scala API
goes away.

For more on this, see [1] and some of the examples it points to, such as
those in [2].

[1] https://flink.apache.org/2022/02/22/scala-free.html
[2] https://github.com/sjwiesman/flink-scala-3
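To make that concrete, here is a minimal, hypothetical sketch of a Scala job written purely against Flink's Java DataStream API (no flink-streaming-scala dependency); the object name and data are invented for illustration:

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object ScalaOnJavaApi {
  def main(args: Array[String]): Unit = {
    // The plain Java environment; nothing from the Scala API is used.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements("stay", "on", "scala")
      // An explicit MapFunction lets Flink's TypeExtractor derive the output type.
      // With a Scala lambda you would typically supply the TypeInformation
      // yourself, e.g. via .returns(...).
      .map(new MapFunction[String, String] {
        override def map(value: String): String = value.toUpperCase
      })
      .print()

    env.execute("scala-job-on-the-java-api")
  }
}
```

The wrapper projects mentioned elsewhere in this thread essentially layer Scala-friendly syntax and implicit TypeInformation on top of exactly this.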

On Tue, Oct 4, 2022 at 6:16 PM Clayton Wohl  wrote:

> +1
>
> At my employer, we maintain several Flink jobs in Scala. We've been
> writing newer jobs in Java, and we'd be fine with porting our Scala jobs
> over to the Java API.
>
> I'd like to request Java 17 support. Specifically, Java records is a
> feature our Flink code would use a lot of and make the Java syntax much
> nicer.
>


Re: ClassNotFoundException when loading protobuf message class in Flink SQL

2022-10-05 Thread James McGuire via user
Thanks for the tip, I will use the flink-sql-connector-kafka jar instead.

I was able to get it to work by moving the SimpleTest.jar inside the lib
folder.  I am not sure why this worked but passing in the jar with the
--jar flag did not work.

Thanks,
James

On Tue, Oct 4, 2022 at 7:32 PM Benchao Li  wrote:

> Hi James,
>
> Your steps seem right. Could you check your jar file
> '~/repos/simple_protobuf/SimpleTest/SimpleTest.jar'
> that it does contain 'com.example.SimpleTest.class'?
>
> Besides that, to use Kafka connector in sql-client, you should use
> 'flink-sql-connector-kafka' instead of
> 'flink-connector-kafka'.
>
>
> James McGuire via user  wrote on Wed, Oct 5, 2022 at 07:21:
>
>> Hi Flink Community,
>> I am trying to prove out the new protobuf functionality added to 1.16
>> ([1]).  I have built master locally and have attempted following the
>> Protobuf Format doc ([2]) to create a table with the kafka connector using
>> the protobuf format.
>>
>> I compiled the sample .proto file using protoc version 3.2.0, compiled
>> the .java output files using javac, linking to protobuf-java-3.5.1.jar
>> (using earlier versions gives me compiler errors
>> about UnusedPrivateParameter) and packaged the resulting class files into
>> SimpleTest.jar.
>>
>> However, when I try to select the table, I get the following error:
>> % ./sql-client.sh --jar ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar
>> --jar
>> ~/repos/flink/flink-connectors/flink-connector-kafka/target/flink-connector-kafka-1.17-SNAPSHOT.jar
>> --jar
>> ~/repos/flink/flink-formats/flink-sql-protobuf/target/flink-sql-protobuf-1.17-SNAPSHOT.jar
>> Flink SQL> CREATE TABLE simple_test (
>> >   uid BIGINT,
>> >   name STRING,
>> >   category_type INT,
>> >   content BINARY,
>> >   price DOUBLE,
>> >   value_map map>,
>> >   value_arr array>,
>> >   corpus_int INT,
>> >   corpus_str STRING
>> > ) WITH (
>> >  'connector' = 'kafka',
>> >  'topic' = 'user_behavior',
>> >  'properties.bootstrap.servers' = 'localhost:9092',
>> >  'properties.group.id' = 'testGroup',
>> >  'format' = 'protobuf',
>> >  'protobuf.message-class-name' = 'com.example.SimpleTest',
>> >  'protobuf.ignore-parse-errors' = 'true'
>> > )
>> > ;
>> [INFO] Execute statement succeed.
>>
>> Flink SQL> select * from simple_test;
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.ClassNotFoundException: com.example.SimpleTest
>>
>> Flink SQL>
>>
>> Any advice greatly appreciated, thank you.
>>
>> [1]
>> https://github.com/apache/flink/commit/5c87b69b5300e8678629aa8b769d60ec2fdbf3d1
>> 
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/
>> 
>>
>
>
> --
>
> Best,
> Benchao Li
>




Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Martijn Visser
@Maciek

I saw that I missed replying to your question:

> Could you please remind what was the conclusion of discussion on
upgrading Scala to 2.12.15/16?
> https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t - I
couldn't find any follow-up vote?

There is a vote thread, but that never got enough votes. See
https://lists.apache.org/thread/l93l5qqr5n2oty3r2bjsz3ks3tjf1655

> If it's acceptable to break binary compatibility by such an upgrade, then
upgrading to JDK17 before 2.0 will be doable?

I'm not sure, because I don't think a discussion and vote have taken place yet
on whether upgrading to JDK 17 can/will be done in a Flink 1.x release or if it
requires a 2.0 release. It was mentioned in the original discussion thread
on dropping Java 8 support within 2/3 releases, but if I recall correctly
there was no discussion yet on when Java 17 support would be added [1].

Best regards,

Martijn

[1] https://lists.apache.org/thread/0fwo7nwzy51gck4vxhyfnbnttd4jycpx

On Wed, Oct 5, 2022 at 6:58 AM Gaël Renoux  wrote:

> > I'm curious what target Scala versions people are currently interested
> in.
> > I would've expected that everyone wants to migrate to Scala 3, for which
> several wrapper projects around Flink already exist
>
> The Scala 3 tooling is still subpar (we're using IntelliJ), so I'm not
> sure I would move my team to Scala 3 right now (I'm currently toying with
> it on a personal project). In addition, moving to Scala 3 is not completely
> free - you have to do some rewrites, and developers will need some
> adaptation time. Scala 2.13 is another thing entirely, we've wanted to
> migrate for a long while.
>
> On Wed, Oct 5, 2022 at 12:53 PM Chesnay Schepler 
> wrote:
>
>> > It's possible that for the sake of the Scala API, we would occasionally
>> require some changes in the Java API. As long as those changes are not
>> detrimental to Java users, they should be considered.
>>
>> That's exactly the model we're trying to get to. Don't fix scala-specific
>> issues with scala code, but rather on the Java side as much as possible
>> which could also benefit other JVM languages (e.g., Kotlin).
>>
>> > A question regarding the Flink wrapper: would it be possible to keep it
>> under the Flink project's umbrella? Or does it need to be a completely
>> separate structure? I'm not aware of the full organizational implications
>> of this, I'm afraid.
>>
>> Technically it can be under the Flink umbrella, but then Flink would
>> still be (at least for a while) the bottleneck because we'd have to
>> review any changes coming in.
>> That would only improve once several new committers were added to take
>> care of this project.
>> (In the end you'd just split Flink and the Scala API _codebases_, but
>> achieve little else)
>>
>> > And if that is what it takes to move beyond Scala 2.12.7… This has been
>> a big pain point for us.
>>
>> I'm curious what target Scala versions people are currently interested in.
>> I would've expected that everyone wants to migrate to Scala 3, for which
>> several wrapper projects around Flink already exist.
>>
>> On 05/10/2022 12:35, Gaël Renoux wrote:
>>
>> Hello everyone,
>>
>> I've already answered a bit on Twitter, I'll develop my thoughts a bit
>> here. For context, my company (DataDome) has a significant codebase on
>> Scala Flink (around 110K LOC), having been using it since 2017. I myself am
>> an enthusiastic Scala developer (I don't think I'd like moving back to
>> Java)
>>
>> Given that, I think separating the Scala support from Flink is actually a
>> good move long term. We would then have a full-Java Flink, and a separate
>> Scala wrapper. It would help a lot in solving the skills issue: Flink
>> maintainers would no longer need to be fluent in Scala, and maintainers of
>> the Scala wrapper would not need a deep knowledge of Flink's inner
>> workings, just the API would be sufficient. And if that is what it takes to
>> move beyond Scala 2.12.7… This has been a big pain point for us.
>>
>> I'm not too worried about finding contributors for the Scala wrapper.
>> Within my company, we have developed additional wrappers and extension
>> methods (for parts where we felt the Flink Scala API was insufficient), and
>> we've been looking at ways we could contribute back. What held us back was
>> our lack of knowledge of the full Flink environment (we're only using the
>> Scala Datastream API). I don't think we're the only ones in that situation.
>> One major point, though, is that Flink developers would not be completely
>> rid of us ;-) It's possible that for the sake of the Scala API, we would
>> occasionally require some changes in the Java API. As long as those changes
>> are not detrimental to Java users, they should be considered.
>>
>> A question regarding the Flink wrapper: would it be possible to keep it
>> under the Flink project's umbrella? Or does it need to be a completely
>> separate structure? I'm not aware of the full organizational implications
>> of this, I'm afraid.

Re: KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-05 Thread Martijn Visser
Hi Andrew,

While definitely no expert on this topic, my first thought was if this idea
could be solved with the idea that was proposed in FLIP-246
https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source

I'm also looping in Mason Chen who was the initiator of that FLIP :)

Best regards,

Martijn

On Wed, Oct 5, 2022 at 10:00 AM Andrew Otto  wrote:

> (Ah, note that I am considering very simple streaming apps here, e.g.
> event enrichment apps.  No windowing or complex state.  The only state is
> the Kafka offsets, which I suppose would also have to be managed from
> Kafka, not from Flink state.)
>
>
>
> On Wed, Oct 5, 2022 at 9:54 AM Andrew Otto  wrote:
>
>> Hi all,
>>
>> *tl;dr: Would it be possible to make a KafkaSource that uses Kafka's
>> built in consumer assignment for Flink tasks?*
>>
>> At the Wikimedia Foundation we are evaluating
>>  whether we can use a Kafka
>> 'stretch' cluster to simplify the multi-datacenter deployment architecture
>> of streaming applications.
>>
>> A Kafka stretch cluster is one in which the brokers span multiple
>> datacenters, relying on the usual Kafka broker replication for multi DC
>> replication (rather than something like Kafka MirrorMaker).  This is
>> feasible with Kafka today mostly because of follower fetching
>> 
>> support, allowing consumers to be assigned to consume from partitions that
>> are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
>>
>> Having a single Kafka cluster makes datacenter failover for streaming
>> applications a little bit simpler, as there is only one set of offsets to
>> use when saving state.  We can run a streaming app in active/passive mode.
>> This would allow us to stop the app in one datacenter, and then start it up
>> in another, using the same state snapshot and same Kafka cluster.
>>
>> But, this got me wondering...would it be possible to run a streaming app
>> in an active/active mode, where in normal operation, half of the work was
>> being done in each DC, and in failover, all of the work would automatically
>> failover to the online DC.
>>
>> I don't think running a single Flink application cross DC would work
>> well; there's too much inter-node traffic happening, and the Flink tasks
>> don't have any DC awareness.
>>
>> But would it be possible to run two separate streaming applications in
>> each DC, but in the *same Kafka consumer group*? I believe that, if the
>> streaming app was using Kafka's usual consumer assignment and rebalancing
>> protocol, it would.  Kafka would just see clients connecting from either DC
>> in the same consumer group, and assign each consumer an equal number of
>> partitions to consume, resulting in equal partition balancing in DCs.  If
>> we shut down one of the streaming apps, Kafka would automatically rebalance
>> the consumers in the consumer group, assigning all of the work to the
>> remaining streaming app in the other DC.
>>
>> I got excited about this possibility, only to learn that Flink's
>> KafkaSource does not use Kafka for consumer assignment.  I think I
>> understand why it does this: the Source API can do a lot more than Kafka,
>> so having some kind of state management (offsets) and task assignment
>> (Kafka consumer balance protocol) outside of the usual Flink Source would
>> be pretty weird.  Implementing offset and task assignment inside of the
>> KafkaSource allows it to work like any other Source implementation.
>>
>> However, this active/active multi DC streaming app idea seems pretty
>> compelling, as it would greatly reduce operator/SRE overhead.  It seems to
>> me that any Kafka streaming app that did use Kafka's built in consumer
>> assignment protocol (like Kafka Streams) would be deployable in this way.
>> But in Flink this is not possible because of the way it assigns tasks.
>>
>> I'm writing this email to see what others think about this, and wonder if
>> it might be possible to implement a KafkaSource that assigned tasks using
>> Kafka's usual consumer assignment protocol.  Hopefully someone more
>> knowledgeable about Sources and TaskSplits, etc. could advise here.
>>
>> Thank you!
>>
>> - Andrew Otto
>>   Wikimedia Foundation
>>
>>
>>


Re: KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-05 Thread Andrew Otto
(Ah, note that I am considering very simple streaming apps here, e.g. event
enrichment apps.  No windowing or complex state.  The only state is the
Kafka offsets, which I suppose would also have to be managed from Kafka,
not from Flink state.)



On Wed, Oct 5, 2022 at 9:54 AM Andrew Otto  wrote:

> Hi all,
>
> *tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
> in consumer assignment for Flink tasks?*
>
> At the Wikimedia Foundation we are evaluating
>  whether we can use a Kafka
> 'stretch' cluster to simplify the multi-datacenter deployment architecture
> of streaming applications.
>
> A Kafka stretch cluster is one in which the brokers span multiple
> datacenters, relying on the usual Kafka broker replication for multi DC
> replication (rather than something like Kafka MirrorMaker).  This is
> feasible with Kafka today mostly because of follower fetching
> 
> support, allowing consumers to be assigned to consume from partitions that
> are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
>
> Having a single Kafka cluster makes datacenter failover for streaming
> applications a little bit simpler, as there is only one set of offsets to
> use when saving state.  We can run a streaming app in active/passive mode.
> This would allow us to stop the app in one datacenter, and then start it up
> in another, using the same state snapshot and same Kafka cluster.
>
> But, this got me wondering...would it be possible to run a streaming app
> in an active/active mode, where in normal operation, half of the work was
> being done in each DC, and in failover, all of the work would automatically
> failover to the online DC.
>
> I don't think running a single Flink application cross DC would work well;
> there's too much inter-node traffic happening, and the Flink tasks don't
> have any DC awareness.
>
> But would it be possible to run two separate streaming applications in
> each DC, but in the *same Kafka consumer group*? I believe that, if the
> streaming app was using Kafka's usual consumer assignment and rebalancing
> protocol, it would.  Kafka would just see clients connecting from either DC
> in the same consumer group, and assign each consumer an equal number of
> partitions to consume, resulting in equal partition balancing in DCs.  If
> we shut down one of the streaming apps, Kafka would automatically rebalance
> the consumers in the consumer group, assigning all of the work to the
> remaining streaming app in the other DC.
>
> I got excited about this possibility, only to learn that Flink's
> KafkaSource does not use Kafka for consumer assignment.  I think I
> understand why it does this: the Source API can do a lot more than Kafka,
> so having some kind of state management (offsets) and task assignment
> (Kafka consumer balance protocol) outside of the usual Flink Source would
> be pretty weird.  Implementing offset and task assignment inside of the
> KafkaSource allows it to work like any other Source implementation.
>
> However, this active/active multi DC streaming app idea seems pretty
> compelling, as it would greatly reduce operator/SRE overhead.  It seems to
> me that any Kafka streaming app that did use Kafka's built in consumer
> assignment protocol (like Kafka Streams) would be deployable in this way.
> But in Flink this is not possible because of the way it assigns tasks.
>
> I'm writing this email to see what others think about this, and wonder if
> it might be possible to implement a KafkaSource that assigned tasks using
> Kafka's usual consumer assignment protocol.  Hopefully someone more
> knowledgeable about Sources and TaskSplits, etc. could advise here.
>
> Thank you!
>
> - Andrew Otto
>   Wikimedia Foundation
>
>
>


KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-05 Thread Andrew Otto
Hi all,

*tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
in consumer assignment for Flink tasks?*

At the Wikimedia Foundation we are evaluating
 whether we can use a Kafka
'stretch' cluster to simplify the multi-datacenter deployment architecture
of streaming applications.

A Kafka stretch cluster is one in which the brokers span multiple
datacenters, relying on the usual Kafka broker replication for multi DC
replication (rather than something like Kafka MirrorMaker).  This is
feasible with Kafka today mostly because of follower fetching

support, allowing consumers to be assigned to consume from partitions that
are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
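(Side note: follower fetching is enabled through the standard Kafka consumer setting client.rack, together with a rack-aware replica selector on the brokers, and with Flink's KafkaSource it can be passed through as a plain consumer property. A sketch with made-up broker addresses, topic, group and DC names; this only covers read locality, not the consumer-group assignment question discussed below.)

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

// Assumes brokers run with broker.rack set to their DC and
// replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector.
val source = KafkaSource.builder[String]()
  .setBootstrapServers("kafka-dc1:9092,kafka-dc2:9092")
  .setTopics("events")
  .setGroupId("enrichment-app")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  // Standard Kafka consumer property: prefer replicas in the same "rack" (here: DC).
  .setProperty("client.rack", "dc1")
  .build()
```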

Having a single Kafka cluster makes datacenter failover for streaming
applications a little bit simpler, as there is only one set of offsets to
use when saving state.  We can run a streaming app in active/passive mode.
This would allow us to stop the app in one datacenter, and then start it up
in another, using the same state snapshot and same Kafka cluster.

But, this got me wondering...would it be possible to run a streaming app in
an active/active mode, where in normal operation, half of the work was
being done in each DC, and in failover, all of the work would automatically
failover to the online DC.

I don't think running a single Flink application cross DC would work well;
there's too much inter-node traffic happening, and the Flink tasks don't
have any DC awareness.

But would it be possible to run two separate streaming applications in each
DC, but in the *same Kafka consumer group*? I believe that, if the
streaming app was using Kafka's usual consumer assignment and rebalancing
protocol, it would.  Kafka would just see clients connecting from either DC
in the same consumer group, and assign each consumer an equal number of
partitions to consume, resulting in equal partition balancing in DCs.  If
we shut down one of the streaming apps, Kafka would automatically rebalance
the consumers in the consumer group, assigning all of the work to the
remaining streaming app in the other DC.

I got excited about this possibility, only to learn that Flink's
KafkaSource does not use Kafka for consumer assignment.  I think I
understand why it does this: the Source API can do a lot more than Kafka,
so having some kind of state management (offsets) and task assignment
(Kafka consumer balance protocol) outside of the usual Flink Source would
be pretty weird.  Implementing offset and task assignment inside of the
KafkaSource allows it to work like any other Source implementation.

However, this active/active multi DC streaming app idea seems pretty
compelling, as it would greatly reduce operator/SRE overhead.  It seems to
me that any Kafka streaming app that did use Kafka's built in consumer
assignment protocol (like Kafka Streams) would be deployable in this way.
But in Flink this is not possible because of the way it assigns tasks.

I'm writing this email to see what others think about this, and wonder if
it might be possible to implement a KafkaSource that assigned tasks using
Kafka's usual consumer assignment protocol.  Hopefully someone more
knowledgeable about Sources and TaskSplits, etc. could advise here.

Thank you!

- Andrew Otto
  Wikimedia Foundation


Re: How to rebalance a Flink streaming table?

2022-10-05 Thread Yaroslav Tkachenko
Hey Pavel,

I was looking for something similar a while back and the best thing I came
up with was using the DataStream API to do all the shuffling and THEN
converting the stream to a table using fromDataStream/fromChangelogStream.
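Roughly like this, as a sketch against the Java Table/DataStream bridge (the table name is a placeholder; for updating tables you would use toChangelogStream/fromChangelogStream instead):

```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// The Kafka-backed table: its source parallelism is bounded by the partition count.
val kafkaTable = tEnv.from("kafka_source_table")

// Drop to the DataStream API, redistribute, then turn it back into a table.
val rebalanced      = tEnv.toDataStream(kafkaTable).rebalance() // round-robin to all subtasks
val rebalancedTable = tEnv.fromDataStream(rebalanced)
```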

On Wed, Oct 5, 2022 at 4:54 AM Pavel Penkov  wrote:

> I have a table that reads a Kafka topic and effective parallelism is equal
> to the number of Kafka partitions. Is there a way to reshuffle the data
> like with DataStream API to increase effective parallelism?
>


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Gaël Renoux
> I'm curious what target Scala versions people are currently interested
in.
> I would've expected that everyone wants to migrate to Scala 3, for which
several wrapper projects around Flink already exist

The Scala 3 tooling is still subpar (we're using IntelliJ), so I'm not sure
I would move my team to Scala 3 right now (I'm currently toying with it on
a personal project). In addition, moving to Scala 3 is not completely free
- you have to do some rewrites, and developers will need some adaptation
time. Scala 2.13 is another thing entirely, we've wanted to migrate for a
long while.

On Wed, Oct 5, 2022 at 12:53 PM Chesnay Schepler  wrote:

> > It's possible that for the sake of the Scala API, we would occasionally
> require some changes in the Java API. As long as those changes are not
> detrimental to Java users, they should be considered.
>
> That's exactly the model we're trying to get to. Don't fix scala-specific
> issues with scala code, but rather on the Java side as much as possible
> which could also benefit other JVM languages (e.g., Kotlin).
>
> > A question regarding the Flink wrapper: would it be possible to keep it
> under the Flink project's umbrella? Or does it need to be a completely
> separate structure? I'm not aware of the full organizational implications
> of this, I'm afraid.
>
> Technically it can be under the Flink umbrella, but then Flink would still
> be (at least for a while) the bottleneck because we'd have to review any
> changes coming in.
> That would only improve once several new committers were added to take
> care of this project.
> (In the end you'd just split Flink and the Scala API _codebases_, but
> achieve little else)
>
> > And if that is what it takes to move beyond Scala 2.12.7… This has been
> a big pain point for us.
>
> I'm curious what target Scala versions people are currently interested in.
> I would've expected that everyone wants to migrate to Scala 3, for which
> several wrapper projects around Flink already exist.
>
> On 05/10/2022 12:35, Gaël Renoux wrote:
>
> Hello everyone,
>
> I've already answered a bit on Twitter, I'll develop my thoughts a bit
> here. For context, my company (DataDome) has a significant codebase on
> Scala Flink (around 110K LOC), having been using it since 2017. I myself am
> an enthusiastic Scala developer (I don't think I'd like moving back to
> Java)
>
> Given that, I think separating the Scala support from Flink is actually a
> good move long term. We would then have a full-Java Flink, and a separate
> Scala wrapper. It would help a lot in solving the skills issue: Flink
> maintainers would no longer need to be fluent in Scala, and maintainers of
> the Scala wrapper would not need a deep knowledge of Flink's inner
> workings, just the API would be sufficient. And if that is what it takes to
> move beyond Scala 2.12.7… This has been a big pain point for us.
>
> I'm not too worried about finding contributors for the Scala wrapper.
> Within my company, we have developed additional wrappers and extension
> methods (for parts where we felt the Flink Scala API was insufficient), and
> we've been looking at ways we could contribute back. What held us back was
> our lack of knowledge of the full Flink environment (we're only using the
> Scala Datastream API). I don't think we're the only ones in that situation.
> One major point, though, is that Flink developers would not be completely
> rid of us ;-) It's possible that for the sake of the Scala API, we would
> occasionally require some changes in the Java API. As long as those changes
> are not detrimental to Java users, they should be considered.
>
> A question regarding the Flink wrapper: would it be possible to keep it
> under the Flink project's umbrella? Or does it need to be a completely
> separate structure? I'm not aware of the full organizational implications
> of this, I'm afraid.
>
> Finally, the hard part would be the migration to the new version. My dream
> solution would be to have the existing Scala API be entirely converted into
> a Scala wrapper over the Java API. That way, migration would be pretty
> minimal: add a dependency, change the imports for the Scala API, and we're
> done. However, even starting from the existing flink4s project, that's
> still quite a lot of work. So, more realistically, I'd settle for at least
> a partial implementation. We would have some broken code that we could fix,
> but at the very least I'd like the basic DataStream functions (process,
> uid, name…) to be available.
>
> Thanks for all the work that went into making Flink what it is!
>
>
> Gaël Renoux - Lead R Engineer
> E - gael.ren...@datadome.co
> W - www.datadome.co
>
>
>
> On Wed, Oct 5, 2022 at 9:30 AM Maciek Próchniak  wrote:
>
>> Hi Martin,
>>
>> Could you please remind what was the conclusion of discussion on
>> upgrading Scala to 2.12.15/16?
>> https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t - I
>> couldn't find any follow-up vote?
>>
>> If it's 

Support Checkpointing in MongoDB Changestream Source(s)

2022-10-05 Thread Armin Schnabel
Dear Flink community,

my Flink pipeline listens for inserts into MongoDB GridFS and processes the
inserted document, which is then written to a MongoDB sink again; this is
working so far.

Now I want to enable checkpointing, but after reading the documentation,
training slides, Stack Overflow, archives, etc. for a day I still don't know
how to make our source support checkpointing.

I.e. my main question is:
1. How can I make our source rewindable so that the change streams are
resumed from where the pipeline crashed? (see pipeline code below)

Side questions/notes:
2. Is 1. possible with a reasonable amount of work, or would you recommend
adding Kafka between Flink and MongoDB?
*(We're planning to replace MongoDB GridFS with Google Object Storage
soon, as our documents, currently a few MB each, will soon grow to GBs.)*
3. I noticed that there are existing community MongoDB connectors, but I
don't think they're made for our change-stream use case:
   a. [FLINK-6573] PR#20848
   b. github/mongo-flink/mongo-flink


CODE:

Watch GridFS inserts on both collections, fs.files and fs.chunks with:
```
class MongoDBSource extends RichSourceFunction<ChangeStreamDocument<Document>> {
  void run(SourceContext<ChangeStreamDocument<Document>> ctx) {
    collection.watch(/* operationType: insert */)
        .iterator()
        .forEachRemaining(ctx::collect);
  }
}
```

Join the two changestreams and process the events:
```
DataStream<XYZ> stream(...) {
  files  = env.addSource(new MongoDBSource(fs.files))
  chunks = env.addSource(new MongoDBSource(fs.chunks))

  return files.join(chunks)
      .where(filesKeySelector).equalTo(chunksFilesKeySelector)   // join on files_id
      .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
      .apply((filesChange, chunkChange) -> new Tuple2<>(filesChange, chunkChange))
      .process(new DocumentsMerger())
}
```

Load the files from GridFS to process it:
```
class DocumentsMerger extends
    ProcessFunction<Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>>, XYZ> {

  void processElement(Tuple2<ChangeStreamDocument<Document>, ChangeStreamDocument<Document>> changeTuple,
                      Context ctx, Collector<XYZ> out) {
    // Wait for the GridFS `fs.files` entry and all of its `fs.chunks` to be available
    gridFSBucket.find(files_id)
    if (measurementChunkIndex != numOfMeasurementChunks - 1) return

    downloadStream = gridFSBucket.openDownloadStream(files_id)
    XYZ result = process(downloadStream)   // application-specific processing
    out.collect(result)
  }
}
```
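For question 1, the usual pattern with a SourceFunction-based source is to implement CheckpointedFunction and put the change stream's resume token into Flink state, so a restart resumes the stream instead of starting from scratch. A rough sketch (String output, no error handling, simplified driver usage; an outline rather than a tested implementation):

```scala
import com.mongodb.client.MongoClients
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.bson.BsonDocument

class ResumableChangeStreamSource(uri: String, db: String, coll: String)
    extends RichSourceFunction[String] with CheckpointedFunction {

  @volatile private var running = true
  private var resumeTokenJson: String = _        // token of the last emitted change
  private var tokenState: ListState[String] = _  // operator state holding it

  override def initializeState(ctx: FunctionInitializationContext): Unit = {
    tokenState = ctx.getOperatorStateStore.getListState(
      new ListStateDescriptor[String]("resume-token", classOf[String]))
    val restored = tokenState.get().iterator()
    if (restored.hasNext) resumeTokenJson = restored.next() // restore after a crash
  }

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    tokenState.clear()
    if (resumeTokenJson != null) tokenState.add(resumeTokenJson)
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val collection = MongoClients.create(uri).getDatabase(db).getCollection(coll)
    var watch = collection.watch()
    if (resumeTokenJson != null) {
      watch = watch.resumeAfter(BsonDocument.parse(resumeTokenJson)) // continue where we left off
    }
    val cursor = watch.iterator()
    while (running && cursor.hasNext) {
      val change = cursor.next()
      // Emit the change and remember its token atomically with respect to checkpoints.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(change.getFullDocument.toJson)
        resumeTokenJson = change.getResumeToken.toJson
      }
    }
  }

  override def cancel(): Unit = running = false
}
```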


I hope this is the right place for this question, but I can also move it to
Stack Overflow and reference the mailing list there if you want, so it can be
found more easily by others.

Regards
Armin


Re: Flink FaultTolerant at operator level

2022-10-05 Thread Krzysztof Chmielewski
I had a similar use case.

What we did is decide that the enrichment data must be versioned; for example,
our enrichment data was "refreshed" once a day and we kept the old data.
During the enrichment process we look up the data for a given version based on
the record's metadata.
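In code, the idea is roughly this hypothetical sketch (the event type, Jedis client, and key layout are all invented to illustrate the versioned lookup):

```scala
import redis.clients.jedis.Jedis

// Hypothetical event carrying the version of the reference data that was
// current when it entered the pipeline.
case class Event(id: String, referenceVersion: String, payload: String)

// Reference data is written once per version (e.g. once per day) and never
// overwritten, so replaying the stream after a restart sees the same status.
def lookupStatus(jedis: Jedis, event: Event): String =
  jedis.get(s"status:${event.referenceVersion}:${event.id}")
```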

Regards.
Krzysztof Chmielewski

On Wed, 5 Oct 2022 at 10:25, Great Info  wrote:

> I have a Flink job and the current flow looks like below
>
> Source_Kafka->*operator-1*(key by partition)->*operator-2*(enrich the
> record)-*Sink1-Operator* & *Sink2-Operator *
>
> With this flow the current problem is at operator-2: the core logic that runs
> here is to look up some reference status data from a Redis cache and enrich
> the stream. This works fine when the job runs well, but recently I saw that if
> the job fails at this operator or the sink operators, the entire job gets
> restarted and the stream gets reprocessed from the source. That causes a
> different reference status (if the reference status in the cache changed
> during the restart) in the enrichment; per the business requirement I need to
> enrich with the reference status as of when the stream was received by my job.
>
> 1. Is there any way to reprocess just the sink1/sink2 operators?
> 2. How can I resume just Sink2 in cases where Sink1 was successful but
> Sink2 failed?
>
>
>


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Chesnay Schepler
> It's possible that for the sake of the Scala API, we would 
occasionally require some changes in the Java API. As long as those 
changes are not detrimental to Java users, they should be considered.


That's exactly the model we're trying to get to. Don't fix 
scala-specific issues with scala code, but rather on the Java side as 
much as possible which could also benefit other JVM languages (e.g., 
Kotlin).


> A question regarding the Flink wrapper: would it be possible to keep 
it under the Flink project's umbrella? Or does it need to be a 
completely separate structure? I'm not aware of the full organizational 
implications of this, I'm afraid.


Technically it can be under the Flink umbrella, but then Flink would
still be (at least for a while) the bottleneck because we'd have to
review any changes coming in.
That would only improve once several new committers were added to take 
care of this project.
(In the end you'd just split Flink and the Scala API _codebases_, but 
achieve little else)


> And if that is what it takes to move beyond Scala 2.12.7… This has 
been a big pain point for us.


I'm curious what target Scala versions people are currently interested in.
I would've expected that everyone wants to migrate to Scala 3, for which 
several wrapper projects around Flink already exist.


On 05/10/2022 12:35, Gaël Renoux wrote:

Hello everyone,

I've already answered a bit on Twitter, I'll develop my thoughts a bit 
here. For context, my company (DataDome) has a significant codebase on 
Scala Flink (around 110K LOC), having been using it since 2017. I 
myself am an enthusiastic Scala developer (I don't think I'd like 
moving back to Java)


Given that, I think separating the Scala support from Flink is 
actually a good move long term. We would then have a full-Java Flink, 
and a separate Scala wrapper. It would help a lot in solving the 
skills issue: Flink maintainers would no longer need to be fluent in 
Scala, and maintainers of the Scala wrapper would not need a deep 
knowledge of Flink's inner workings, just the API would be sufficient. 
And if that is what it takes to move beyond Scala 2.12.7… This has 
been a big pain point for us.


I'm not too worried about finding contributors for the Scala wrapper. 
Within my company, we have developed additional wrappers and extension 
methods (for parts where we felt the Flink Scala API was 
insufficient), and we've been looking at ways we could contribute 
back. What held us back was our lack of knowledge of the full Flink 
environment (we're only using the Scala Datastream API). I don't think 
we're the only ones in that situation. One major point, though, is 
that Flink developers would not be completely rid of us ;-) It's 
possible that for the sake of the Scala API, we would occasionally 
require some changes in the Java API. As long as those changes are not 
detrimental to Java users, they should be considered.


A question regarding the Flink wrapper: would it be possible to keep 
it under the Flink project's umbrella? Or does it need to be a 
completely separate structure? I'm not aware of the full 
organizational implications of this, I'm afraid.


Finally, the hard part would be the migration to the new version. My 
dream solution would be to have the existing Scala API be entirely 
converted into a Scala wrapper over the Java API. That way, migration 
would be pretty minimal: add a dependency, change the imports for the 
Scala API, and we're done. However, even starting from the existing 
flink4s project, that's still quite a lot of work. So, more 
realistically, I'd settle for at least a partial implementation. We 
would have some broken code that we could fix, but at the very least 
I'd like the basic DataStream functions (process, uid, name…) to be 
available.


Thanks for all the work that went into making Flink what it is!


Gaël Renoux - Lead R Engineer
E - gael.ren...@datadome.co
W - www.datadome.co 



On Wed, Oct 5, 2022 at 9:30 AM Maciek Próchniak  wrote:

Hi Martin,

Could you please remind what was the conclusion of discussion on
upgrading Scala to 2.12.15/16?
https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t -
I couldn't find any follow-up vote?

If it's acceptable to break binary compatibility by such an
upgrade, then upgrading to JDK17 before 2.0 will be doable?


thanks,

maciek


On 04.10.2022 18:21, Martijn Visser wrote:

Hi Yaroslav,

Thanks for the feedback, that's much appreciated! Regarding Java
17 as a prerequisite, we would have to break compatibility
already since Scala 2.12.7 doesn't compile on Java 17 [1].

Given that we can only remove Scala APIs with the next major
Flink (2.0) version, would that still impact you a lot? I do
imagine that if we get to a Flink 2.0 version there would be more
breaking involved anyway. The biggest consequence of deprecating
support for Scala in Flink 1.x would 

Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Gaël Renoux
Hello everyone,

I've already answered a bit on Twitter, I'll develop my thoughts a bit
here. For context, my company (DataDome) has a significant codebase on
Scala Flink (around 110K LOC), having been using it since 2017. I myself am
an enthusiastic Scala developer (I don't think I'd like moving back to
Java)

Given that, I think separating the Scala support from Flink is actually a
good move long term. We would then have a full-Java Flink, and a separate
Scala wrapper. It would help a lot in solving the skills issue: Flink
maintainers would no longer need to be fluent in Scala, and maintainers of
the Scala wrapper would not need a deep knowledge of Flink's inner
workings, just the API would be sufficient. And if that is what it takes to
move beyond Scala 2.12.7… This has been a big pain point for us.

I'm not too worried about finding contributors for the Scala wrapper.
Within my company, we have developed additional wrappers and extension
methods (for parts where we felt the Flink Scala API was insufficient), and
we've been looking at ways we could contribute back. What held us back was
our lack of knowledge of the full Flink environment (we're only using the
Scala Datastream API). I don't think we're the only ones in that situation.
One major point, though, is that Flink developers would not be completely
rid of us ;-) It's possible that for the sake of the Scala API, we would
occasionally require some changes in the Java API. As long as those changes
are not detrimental to Java users, they should be considered.

A question regarding the Flink wrapper: would it be possible to keep it
under the Flink project's umbrella? Or does it need to be a completely
separate structure? I'm not aware of the full organizational implications
of this, I'm afraid.

Finally, the hard part would be the migration to the new version. My dream
solution would be to have the existing Scala API be entirely converted into
a Scala wrapper over the Java API. That way, migration would be pretty
minimal: add a dependency, change the imports for the Scala API, and we're
done. However, even starting from the existing flink4s project, that's
still quite a lot of work. So, more realistically, I'd settle for at least
a partial implementation. We would have some broken code that we could fix,
but at the very least I'd like the basic DataStream functions (process,
uid, name…) to be available.
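For a sense of what that wrapper could look like, here is a hypothetical sketch of extension methods layered over the Java DataStream (method names invented, not part of any existing project):

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}

object syntax {
  implicit class ScalaFriendlyStream[T](private val ds: DataStream[T]) {
    // Scala lambda plus implicit TypeInformation instead of Java's MapFunction boilerplate.
    def mapWith[R: TypeInformation](f: T => R): SingleOutputStreamOperator[R] =
      ds.map(new MapFunction[T, R] { override def map(value: T): R = f(value) })
        .returns(implicitly[TypeInformation[R]])
  }

  implicit class NamedOperator[T](private val op: SingleOutputStreamOperator[T]) {
    // The uid/name combination mentioned above, as one fluent call.
    def named(opName: String, opUid: String): SingleOutputStreamOperator[T] =
      op.name(opName).uid(opUid)
  }
}
```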

Thanks for all the work that went into making Flink what it is!


Gaël Renoux - Lead R Engineer
E - gael.ren...@datadome.co
W - www.datadome.co



On Wed, Oct 5, 2022 at 9:30 AM Maciek Próchniak  wrote:

> Hi Martin,
>
> Could you please remind what was the conclusion of discussion on upgrading
> Scala to 2.12.15/16?
> https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t - I
> couldn't find any follow-up vote?
>
> If it's acceptable to break binary compatibility by such an upgrade, then
> upgrading to JDK17 before 2.0 will be doable?
>
>
> thanks,
>
> maciek
>
>
> On 04.10.2022 18:21, Martijn Visser wrote:
>
> Hi Yaroslav,
>
> Thanks for the feedback, that's much appreciated! Regarding Java 17 as a
> prerequisite, we would have to break compatibility already since Scala
> 2.12.7 doesn't compile on Java 17 [1].
>
> Given that we can only remove Scala APIs with the next major Flink (2.0)
> version, would that still impact you a lot? I do imagine that if we get to
> a Flink 2.0 version there would be more breaking involved anyway. The
> biggest consequence of deprecating support for Scala in Flink 1.x would be
> that new APIs would only be available in Java, but since these don't exist
> yet there would be no refactoring involved. I can imagine that we might
> change something in an existing API, but that would have certain
> compatibility guarantees already (depending if it's
> Public/PublicEvolving/Experimental). If a change would happen there, I
> think it would be smaller refactoring.
>
> Best regards,
>
> Martijn
>
> [1] https://issues.apache.org/jira/browse/FLINK-25000
>
> On Tue, Oct 4, 2022 at 10:58 AM Yaroslav Tkachenko 
> wrote:
>
>> Hi Martijn,
>>
>> As a Scala user, this change would affect me a lot and I'm not looking
>> forward to rewriting my codebase, and it's not even a very large one :)
>>
>> I'd like to suggest supporting Java 17 as a prerequisite (
>> https://issues.apache.org/jira/browse/FLINK-15736). Things like switch
>> expressions and records could simplify the migration quite a bit. Would you
>> consider adding it to the FLIP?
>>
>> On Tue, Oct 4, 2022 at 10:50 AM Jing Ge  wrote:
>>
>>> Hi Martijn,
>>>
>>> Thanks for bringing this up. It is generally a great idea, so +1.
>>>
>>> Since both scala extension projects mentioned in the FLIP are still very
>>> young and I don't think they will attract more scala developers as Flink
>>> could just because they are external projects. It will be a big issue for
>>> users who have to rewrite their large codebases. Those users should be
>>> aware of the effort from now on and 

How to rebalance a Flink streaming table?

2022-10-05 Thread Pavel Penkov
I have a table that reads a Kafka topic and effective parallelism is equal
to the number of Kafka partitions. Is there a way to reshuffle the data
like with DataStream API to increase effective parallelism?


Old s3 files referenced in sink's state after migration from 1.14 to 1.15

2022-10-05 Thread Vararu, Vadim
Hi all,

We have some jobs that write parquet files in s3, bucketing by processing time 
in a structure like /year/month/day/hour.

On the 13th of September we migrated our Flink runtime from 1.14.5 to 1.15.2, and
now we have some jobs crashing at checkpointing because they are unable to find
some s3 files from the 13th of September (those having been removed by the
retention policy).

Being unable to explain why it would try to access 2-3 week old files, I
looked into a couple of checkpoint/savepoint files and found old files
being referenced along with the current ones. The only association I could make
is with the migration from 1.14 to 1.15 performed on the 13th of September. I have
no idea how those files got stuck and passed from one checkpoint to another.

bucket-states�s3p://flink-state/prod/imp/landslide-eu-west-1/checkpoints/eb0ca029a6a45006216b7df464a9c44a/chk-255210/ae1d9611-4118-4068-92de-315b90ad733f�writer_raw_states
 
OPERATOR_STATE_DISTRIBUTION_MODESPLIT_DISTRIBUTEVALUE_SERIALIZERrorg.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer$BytePrimitiveArraySerializerSnapshotstreaming_committer_raw_states
 
OPERATOR_STATE_DISTRIBUTION_MODESPLIT_DISTRIBUTEVALUE_SERIALIZERrorg.apache.flink.api.common.typ2022/10/05/09�s3a://
 
raw-data-prod/default/imp/v3/ds-meru-prod-kinesis-eu-west-1-20211101-v3-ad-impression/2022/10/05/09���&�%,v\�,�<��2v��default/imp/v3/ds-meru-prod-kinesis-eu-west-1-20211101-v3-ad-impression/2022/09/13/12/part-bdc414ff-a05f-4d0d-8c7e-0e0a62c00588-c13ff983-d266-4c7b-b13e-cb22f6681e5d-19.gz.parquet�3m.FuN.xuBx4BpyfPXavcgdXn.PtySRyjFI7rkfGJ60EfD2Pn3eOzpXtWppSicdEre1SzGh2brRGPGdtNrMVr85jDKoTM98qTDaU7Y9gm0AavlRN152MxtJABGzR.alZ_YH9WlEUGOM1xUv96j4CCla25fIfguHx83QVpapN2iQ-

In the checkpoint snippet above, observe a normal file, processed on 5th of 
October and a stuck one from 13th of September.

Any idea why the sink would keep old written files in the state and pass them
from checkpoint to checkpoint? Is that a bug or a migration issue between 1.14 
and 1.15?




Flink FaultTolerant at operator level

2022-10-05 Thread Great Info
I have a Flink job and the current flow looks like below

Source_Kafka->*operator-1*(key by partition)->*operator-2*(enrich the
record)-*Sink1-Operator* & *Sink2-Operator *

With this flow the current problem is at operator-2: the core logic that runs
here is to look up some reference status data from a Redis cache and enrich
the stream. This works fine when the job runs well, but recently I saw that if
the job fails at this operator or the sink operators, the entire job gets
restarted and the stream gets reprocessed from the source. That causes a
different reference status (if the reference status in the cache changed
during the restart) in the enrichment; per the business requirement I need to
enrich with the reference status as of when the stream was received by my job.

1. Is there any way to reprocess just the sink1/sink2 operators?
2. How can I resume just Sink2 in cases where Sink1 was successful but
Sink2 failed?


RE: Re:Question about Flink Broadcast State event ordering

2022-10-05 Thread Qing Lim
Oh, thank you for your explanation!

From: 仙路尽头谁为峰 
Sent: 05 October 2022 09:13
To: Qing Lim 
Cc: User 
Subject: Reply: Re:Question about Flink Broadcast State event ordering

Hi Qing:
  The key point is that the broadcast side may have different partitions that
interleave. If you can make sure the messages you want ordered go into
the same partition, then I think the order can be preserved.

Best regards!
Sent from Mail for Windows

From: Qing Lim
Sent: 5 October 2022 15:16
To: xljtswf2022
Cc: User
Subject: RE: Re:Question about Flink Broadcast State event ordering

Hi, thanks for answering my question.

Is there any way to make the order reflect the upstream? I wish to broadcast
messages that have deletion semantics, so ordering matters here.
I guess in the worst case I can use some logical timestamp to reason about order
downstream.

From: xljtswf2022 mailto:xljtswf2...@163.com>>
Sent: 05 October 2022 03:02
To: Qing Lim mailto:q@mwam.com>>
Cc: User mailto:user@flink.apache.org>>
Subject: Re:Question about Flink Broadcast State event ordering

Hi Qing:
> I think this is referring to the order between broadcasted elements and non
> broadcasted elements, right?
  No. As the broadcast and non-broadcast streams are different streams, they will
usually be transferred over different TCP connections, and we cannot control the
order of elements across different connections.
> The broadcasted element should arrive in the same order across all tasks,
> right?
No. Imagine the broadcast stream has 2 partitions, say p1 and p2, and each
partition has elements with index 1, 2, 3.
Then one downstream task may see the broadcast stream as p1-1, p1-2, ..., p2-1,
p2-2, ...
and another will see p1-1, p2-1, p1-2, p2-2.
PS: since elements usually come in bulk, the index is just for explanation.
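One way to make the broadcast-state updates independent of arrival order, in line with the logical-timestamp idea Qing Lim mentions above, is to put a version on each broadcast message and keep deletions as tombstones. A hypothetical sketch (message type, key layout, and output are invented):

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical broadcast message: a monotonically increasing version plus a deletion
// flag, so a late "upsert" cannot resurrect an entry that a newer "delete" removed.
case class ControlMsg(key: String, version: Long, deleted: Boolean, payload: String)

class VersionedEnricher(desc: MapStateDescriptor[String, ControlMsg])
    extends BroadcastProcessFunction[String, ControlMsg, String] {

  override def processBroadcastElement(
      msg: ControlMsg,
      ctx: BroadcastProcessFunction[String, ControlMsg, String]#Context,
      out: Collector[String]): Unit = {
    val state   = ctx.getBroadcastState(desc)
    val current = state.get(msg.key)
    // Apply only strictly newer versions; deletions stay in state as tombstones, so
    // every task converges to the same result regardless of arrival order.
    if (current == null || msg.version > current.version) state.put(msg.key, msg)
  }

  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, ControlMsg, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    val entry = ctx.getBroadcastState(desc).get(value) // assumption: the record itself is the key
    if (entry != null && !entry.deleted) out.collect(s"$value -> ${entry.payload}")
  }
}
```

The same MapStateDescriptor would be used both in `stream.broadcast(desc)` and when constructing the function.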

Best regards!



At 2022-10-04 21:54:23, "Qing Lim" mailto:q@mwam.com>> 
wrote:
Hi Flink user group,

I have a question around broadcast.

Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:

> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.

I think this is referring to the order between broadcasted elements and
non-broadcasted elements, right?
The broadcasted element should arrive in the same order across all tasks, right?

For example, given a broadcasted stream A, and a non-broadcasted stream B

When joining A and B, elements from A should always reach all tasks in the same
order, right? It's just the interleaving of A and B that might differ across
tasks; did I understand it correctly? I wasn't sure because it's not clear to me
from just reading the doc; happy to update the doc once it's clarified here.

Kind regards.




Reply: Re:Question about Flink Broadcast State event ordering

2022-10-05 Thread 仙路尽头谁为峰
Hi Qing:
  The key point is that the broadcast side may have different partitions that
interleave. If you can make sure the messages you want ordered go into
the same partition, then I think the order can be preserved.

Best regards!
Sent from Mail for Windows

From: Qing Lim
Sent: 5 October 2022 15:16
To: xljtswf2022
Cc: User
Subject: RE: Re:Question about Flink Broadcast State event ordering

Hi, thanks for answering my question.

Is there any way to make the order reflect the upstream? I wish to broadcast
messages that have deletion semantics, so ordering matters here.
I guess in the worst case I can use some logical timestamp to reason about order
downstream.

From: xljtswf2022  
Sent: 05 October 2022 03:02
To: Qing Lim 
Cc: User 
Subject: Re:Question about Flink Broadcast State event ordering

Hi Qing:
> I think this is referring to the order between broadcasted elements and non
> broadcasted elements, right?
  No. As the broadcast and non-broadcast streams are different streams, they will
usually be transferred over different TCP connections, and we cannot control the
order of elements across different connections.
> The broadcasted element should arrive in the same order across all tasks,
> right?
No. Imagine the broadcast stream has 2 partitions, say p1 and p2, and each
partition has elements with index 1, 2, 3.
Then one downstream task may see the broadcast stream as p1-1, p1-2, ..., p2-1,
p2-2, ...
and another will see p1-1, p2-1, p1-2, p2-2.
PS: since elements usually come in bulk, the index is just for explanation.

Best regards!

At 2022-10-04 21:54:23, "Qing Lim"  wrote:
Hi Flink user group,
 
I have a question around broadcast.
 
Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:
 
> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.
 
I think this is referring to the order between broadcasted elements and
non-broadcasted elements, right?
The broadcasted element should arrive in the same order across all tasks, right?
 
For example, given a broadcasted stream A, and a non-broadcasted stream B
 
When joining A and B, elements from A should always reach all tasks in the same
order, right? It's just the interleaving of A and B that might differ across
tasks; did I understand it correctly? I wasn't sure because it's not clear to me
from just reading the doc; happy to update the doc once it's clarified here.
 
Kind regards. 
 


Re: How to handle erroneous data from the Flink KafkaSource

2022-10-05 Thread RS
Hi,


This is not currently supported in SQL. If you need it, you can implement your 
own connector or UDF and route the erroneous data somewhere else.
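
If dropping down to the DataStream API is an option, a common pattern is to 
read the raw payload as a string and split good and bad records with a side 
output, so the bad records can be sent to a dead-letter topic or table. This is 
only a sketch (topic names, group id and the JSON handling are invented for 
illustration), not the only way to do it:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DeadLetterJob {
    // Side output for records that fail JSON parsing.
    static final OutputTag<String> BAD_RECORDS = new OutputTag<String>("bad-records") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("dead-letter-demo")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        SingleOutputStreamOperator<JsonNode> parsed = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
                .process(new ProcessFunction<String, JsonNode>() {
                    transient ObjectMapper mapper;

                    @Override
                    public void processElement(String raw, Context ctx, Collector<JsonNode> out) {
                        if (mapper == null) {
                            mapper = new ObjectMapper();
                        }
                        try {
                            out.collect(mapper.readTree(raw));
                        } catch (Exception e) {
                            // Route the broken payload to the side output instead of failing the job.
                            ctx.output(BAD_RECORDS, raw);
                        }
                    }
                });

        parsed.print();                               // well-formed records
        DataStream<String> bad = parsed.getSideOutput(BAD_RECORDS);
        bad.print();                                  // sink these to a dead-letter topic instead

        env.execute("dead-letter-demo");
    }
}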


Thanks




On 2022-09-29 10:02:34, "Summer"  wrote:
>
>Hello, I'd like to ask: if a record coming from Kafka is malformed, it causes 
>the job to fail, and the log throws that erroneous record.
>
>To keep the job running, I need to add 'value.json.ignore-parse-errors' = 'true', 
>'value.json.fail-on-missing-field' = 'false' inside the *** WITH clause.
>
>After that, if erroneous data shows up again, how am I supposed to notice it?
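
For reference, the two options quoted above sit in the connector's WITH clause 
roughly as below; the table name, topic and columns are invented for 
illustration. With these options the broken rows are silently skipped, which is 
exactly why there is no built-in signal when something was dropped.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LenientJsonTable {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Broken JSON values are skipped instead of failing the job.
        tableEnv.executeSql(
                "CREATE TABLE events (\n" +
                "  id STRING,\n" +
                "  amount DOUBLE\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'input-topic',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'value.format' = 'json',\n" +
                "  'value.json.ignore-parse-errors' = 'true',\n" +
                "  'value.json.fail-on-missing-field' = 'false'\n" +
                ")");
    }
}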


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Maciek Próchniak

Hi Martijn,

Could you please remind us what the conclusion was of the discussion on 
upgrading Scala to 2.12.15/16? 
https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t - I 
couldn't find any follow-up vote.


If it's acceptable to break binary compatibility with such an upgrade, 
would upgrading to JDK 17 before 2.0 be doable?



thanks,

maciek


On 04.10.2022 18:21, Martijn Visser wrote:

Hi Yaroslav,

Thanks for the feedback, that's much appreciated! Regarding Java 17 as 
a prerequisite, we would have to break compatibility already since 
Scala 2.12.7 doesn't compile on Java 17 [1].


Given that we can only remove Scala APIs with the next major Flink 
(2.0) version, would that still impact you a lot? I do imagine that if 
we get to a Flink 2.0 version there would be more breaking changes involved 
anyway. The biggest consequence of deprecating support for Scala in 
Flink 1.x would be that new APIs would only be available in Java, but 
since these don't exist yet there would be no refactoring involved. I 
can imagine that we might change something in an existing API, but 
that would already have certain compatibility guarantees (depending on whether 
it's Public/PublicEvolving/Experimental). If a change happened 
there, I think it would be a smaller refactoring. 


Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-25000

On Tue, Oct 4, 2022 at 10:58 AM Yaroslav Tkachenko 
 wrote:


Hi Martijn,

As a Scala user, this change would affect me a lot and I'm not
looking forward to rewriting my codebase, and it's not even a very
large one :)

I'd like to suggest supporting Java 17 as a prerequisite
(https://issues.apache.org/jira/browse/FLINK-15736). Things like
switch expressions and records could simplify the migration
quite a bit. Would you consider adding it to the FLIP?
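
To illustrate the point (purely as a sketch, not something taken from the 
FLIP): a Scala case-class hierarchy plus pattern match has a fairly close 
Java 17 counterpart using sealed interfaces and records. The Event types below 
are invented for the example.

// Roughly what `sealed trait Event` with two case classes could become in Java 17.
sealed interface Event permits Click, Purchase {}

record Click(String userId, String page) implements Event {}

record Purchase(String userId, double amount) implements Event {}

class EventDescriber {
    static String describe(Event event) {
        // Type patterns in `switch` were still a preview feature in Java 17,
        // so this sketch uses instanceof patterns (final since Java 16) instead.
        if (event instanceof Click c) {
            return c.userId() + " clicked " + c.page();
        } else if (event instanceof Purchase p) {
            return p.userId() + " spent " + p.amount();
        }
        throw new IllegalStateException("unreachable for a sealed hierarchy");
    }
}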

On Tue, Oct 4, 2022 at 10:50 AM Jing Ge  wrote:

Hi Martijn,

Thanks for bringing this up. It is generally a great idea, so +1.

Both Scala extension projects mentioned in the FLIP are
still very young, and I don't think they will attract as many
Scala developers as Flink itself could, simply because they are
external projects. It will be a big issue for users who have to
rewrite their large codebases. Those users should be aware of
the effort from now on; they had better not count on those
Scala extension projects and should prepare their migration
plan before Flink 2.0.

Best regards,
Jing


On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser
 wrote:

Hi Marton,

You're making a good point. I originally wanted to include
the User mailing list to get their feedback but forgot to do
so. I'll do some more outreach via other channels as well.

@Users of Flink, I've made a proposal to deprecate and
remove Scala API support in a future version of Flink.
Your feedback on this topic is very much appreciated.

Regarding the large Scala codebase for Flink, a potential
alternative could be to have a wrapper for all Java APIs
that makes them available as Scala APIs. However, this
still requires Scala maintainers and I don't think that we
currently have those in our community. The easiest
solution for them would be to use the Java APIs directly.
Yes, it would involve work, but we won't actually be able
to remove the Scala APIs until Flink 2.0, so there's still
time for that :)

Best regards,

Martijn

On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi
 wrote:

Hi Martijn,

Thanks for compiling the FLIP. I agree with the sentiment that
Scala poses considerable maintenance overhead and that key
improvements (like 2.13 or 2.12.8 support) are hanging stale.
With that said, before we make this move we should attempt to
understand the affected user base.
A quick Slack and user mailing list search does return quite a
few results for Scala (admittedly, a cursory look suggests that
many of them have to do with features missing in the Scala API
that exist in Java, or with Scala versions). I would love to
see some polls on this topic; we could also use the Flink
Twitter handle to ask the community about this.

I am aware of users having large existing Scala codebases for
Flink. This move would require a very large effort from them,
as they would need to rewrite much of their existing code.
What are the alternatives in your 

RE: Re:Question about Flink Broadcast State event ordering

2022-10-05 Thread Qing Lim
Hi, thanks for answering my question.

Is there any way to make the order reflect the upstream? I wish to broadcast 
messages that have deletion semantics, so ordering matters here.
I guess in the worst case I can use some logical timestamp to reason about 
order downstream.

From: xljtswf2022 
Sent: 05 October 2022 03:02
To: Qing Lim 
Cc: User 
Subject: Re:Question about Flink Broadcast State event ordering

Hi Qing:
> I think this is referring to the order between broadcasted elements and non 
> broadcasted elements, right?
  No. The broadcast and non-broadcast streams are different streams; they will 
usually travel over different TCP connections, and we cannot control the order 
of elements across different connections.
> The broadcasted elements should arrive in the same order across all tasks, 
> right?
No. Imagine the broadcast stream has 2 partitions, say p1 and p2, and each 
partition has elements with index 1, 2, 3.
Then one downstream task may see the broadcast stream as p1-1, p1-2, ..., 
p2-1, p2-2, ...
while another will see p1-1, p2-1, p1-2, p2-2, ...
PS: since elements usually come in batches, the index is just for explanation.

Best regards!



At 2022-10-04 21:54:23, "Qing Lim"  wrote:
Hi Flink user group,

I have a question around broadcast.

Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:

> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.

I think this is referring to the order between broadcasted elements and non 
broadcasted elements, right?
The broadcasted element should arrive in the same order across all tasks, right?

For example, given a broadcasted stream A, and a non-broadcasted stream B

When joining A and B, elements from A should always reach all tasks in the same 
order, right? It's just the interleaving of A and B that might differ across 
tasks. Did I understand it correctly? I wasn't sure because it's not clear to me 
just from reading the doc; happy to update the doc once it's clarified here.

Kind regards.


