Re: Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
I've found more examples here:

https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance

where a fact table is enriched using several dimension tables, but again
the temporal table functions are registered using Table API like so:

```java
tEnv.registerFunction(
    "dimension_table1",
    tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
```

It's not exactly the same application, since this example covers a lookup
join, but the SQL query also relies on LATERAL TABLE plus temporal
table functions:

```
SELECT
  D1.col1 AS A,
  D1.col2 AS B
FROM
  fact_table,
  LATERAL TABLE (dimension_table1(f_proctime)) AS D1
WHERE
  fact_table.dim1 = D1.id
```

In particular, this produces a job which is equivalent to

```
  private abstract static class AbstractFactDimTableJoin<IN1, OUT>
      extends CoProcessFunction<IN1, Dimension, OUT> {
    private static final long serialVersionUID = 1L;

    protected transient ValueState<Dimension> dimState;

    @Override
    public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
        throws Exception {
      Dimension dim = dimState.value();
      if (dim == null) {
        return;
      }
      out.collect(join(value, dim));
    }

    abstract OUT join(IN1 value, Dimension dim);

    @Override
    public void processElement2(Dimension value, Context ctx,
        Collector<OUT> out) throws Exception {
      dimState.update(value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ValueStateDescriptor<Dimension> dimStateDesc =
          new ValueStateDescriptor<>("dimstate", Dimension.class);
      this.dimState = getRuntimeContext().getState(dimStateDesc);
    }
  }
```

I'm basically interested in rewriting these types of DIY joins (based on
CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
possible; otherwise, I would like to know what the limitations are.

Regards,

Salva

On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara 
wrote:

> By looking at the docs for older versions of Flink, e.g.,
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>
> it seems that it's possible to rewrite this query
>
> ```
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency
> ```
>
> as
>
> ```
> SELECT
>   SUM(o.amount * r.rate) AS amount
> FROM Orders AS o,
>   RatesHistory AS r
> WHERE r.currency = o.currency
> AND r.rowtime = (
>   SELECT MAX(rowtime)
>   FROM RatesHistory AS r2
>   WHERE r2.currency = o.currency
>   AND r2.rowtime <= o.rowtime);
> ```
>
> This would be a way to accomplish this task in SQL without using a
> temporal table function.
>
> Would this rewrite be equivalent in terms of the final generated job?
> Obviously I very much prefer the LATERAL TABLE query but this requires
> using a temporal table function which can only be registered using the
> Table API (apparently).
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara 
> wrote:
>
>> It doesn't seem to be the case with processing time, unless I'm mistaken:
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>
>> This case seems to require a different syntax based on LATERAL TABLE and
>> a temporal table function (FOR SYSTEM_TIME is not supported). From the docs
>> too, it seems that temporal table functions can only be registered via the
>> table API. Am I missing/misunderstanding something?
>>
>> Salva
>>
>> On Tue, Oct 4, 2022, 19:26 Martijn Visser 
>> wrote:
>>
>>> Hi Salva,
>>>
>>> The examples for temporal table joins can be found at
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>> Your example is definitely possible with just using SQL.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara 
>>> wrote:
>>>
 Based on this:


 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/

 It seems that the only way of registering temporal table functions is
 via the Table API.

 If that is the case, is there a way to make this example work

 ```
 SELECT
   SUM(amount * rate) AS amount
 FROM
   orders,
   LATERAL TABLE (rates(order_time))
 WHERE
   rates.currency = orders.currency
 ```

 without the Table API, just using SQL? E.g., is it possible to deploy
 the temporal table function to the cluster (by packaging it in a jar file)
 and then run the above query from the Flink SQL CLI?

 Thanks in advance,

 Salva




Re: ClassNotFoundException when loading protobuf message class in Flink SQL

2022-10-04 Thread Benchao Li
Hi James,

Your steps seem right. Could you check that your jar file
'~/repos/simple_protobuf/SimpleTest/SimpleTest.jar'
actually contains 'com.example.SimpleTest.class'?

Besides that, to use the Kafka connector in the SQL client, you should use
'flink-sql-connector-kafka' instead of
'flink-connector-kafka'.
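
For reference, `jar tf SimpleTest.jar` lists the jar's entries from the
command line; the minimal Java sketch below does the same check
programmatically (the jar path is an assumption, adjust as needed):

```java
import java.io.IOException;
import java.util.jar.JarFile;

// List the .class entries in the jar to confirm that
// com/example/SimpleTest.class was actually packaged.
public class CheckJar {
    public static void main(String[] args) throws IOException {
        try (JarFile jar = new JarFile("SimpleTest.jar")) { // adjust path
            jar.stream()
                    .map(entry -> entry.getName())
                    .filter(name -> name.endsWith(".class"))
                    .forEach(System.out::println);
        }
    }
}
```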


James McGuire via user wrote on Wed, Oct 5, 2022 at 07:21:

> Hi Flink Community,
> I am trying to prove out the new protobuf functionality added to 1.16
> ([1]).  I have built master locally and have attempted following the
> Protobuf Format doc ([2]) to create a table with the kafka connector using
> the protobuf format.
>
> I compiled the sample .proto file using protoc version 3.2.0, compiled the
> .java output files using javac, linking to protobuf-java-3.5.1.jar (using
> earlier versions gives me compiler errors about UnusedPrivateParameter) and
> packaged the resulting class files into SimpleTest.jar.
>
> However, when I try to select the table, I get the following error:
> % ./sql-client.sh --jar ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar
> --jar
> ~/repos/flink/flink-connectors/flink-connector-kafka/target/flink-connector-kafka-1.17-SNAPSHOT.jar
> --jar
> ~/repos/flink/flink-formats/flink-sql-protobuf/target/flink-sql-protobuf-1.17-SNAPSHOT.jar
> Flink SQL> CREATE TABLE simple_test (
> >   uid BIGINT,
> >   name STRING,
> >   category_type INT,
> >   content BINARY,
> >   price DOUBLE,
> >   value_map map>,
> >   value_arr array>,
> >   corpus_int INT,
> >   corpus_str STRING
> > ) WITH (
> >  'connector' = 'kafka',
> >  'topic' = 'user_behavior',
> >  'properties.bootstrap.servers' = 'localhost:9092',
> >  'properties.group.id' = 'testGroup',
> >  'format' = 'protobuf',
> >  'protobuf.message-class-name' = 'com.example.SimpleTest',
> >  'protobuf.ignore-parse-errors' = 'true'
> > )
> > ;
> [INFO] Execute statement succeed.
>
> Flink SQL> select * from simple_test;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: com.example.SimpleTest
>
> Flink SQL>
>
> Any advice greatly appreciated, thank you.
>
> [1]
> https://github.com/apache/flink/commit/5c87b69b5300e8678629aa8b769d60ec2fdbf3d1
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/
>


-- 

Best,
Benchao Li


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Yaroslav Tkachenko
Hey Martijn,

Well, as a user I think the Scala API still adds tremendous value, with
all its issues. But I'm not a committer and I don't know what effort it
takes to keep maintaining it... so I prepare for the worst :)

Regarding the proposed timeline, I don't know all the specifics around
breaking compatibility changes, so it's hard for me to say, but I'd love to
see Java 17 before Scala is fully removed from the project.




On Tue, Oct 4, 2022 at 3:07 PM Martijn Visser 
wrote:

> Hi Yaroslav,
>
> If I could summarize your suggestion, would it mean that you would only be
> in favour of dropping Scala API support if we introduce Java 17 support
> exactly at the same time (say Flink 2.0). I was first thinking that an
> alternative would be to have a Flink 2.0 which supports Java 17 while
> keeping the Scala APIs (and postpone dropping those in Flink 3.0), but that
> wouldn't make much sense since we would have to break savepoint
> compatibility for Scala users then anyway.
>
> However, since we're already talking about when/how the APIs could be
> dropped, does that mean that you agree with the idea to deprecate and
> remove the Scala APIs?
>
> Best regards,
>
> Martijn
>
>
> On Tue, Oct 4, 2022 at 2:41 PM Yaroslav Tkachenko 
> wrote:
>
>> Hi Martijn,
>>
>> The 2.0 argument makes sense (I agree it's easier to introduce more
>> breaking changes in one major release), but I think my comment about Java
>> 17 also makes sense in this case: 1) easier to introduce because breaking
>> changes are possible 2) you'd need to give some syntax sugar as an
>> alternative after removing Scala.
>>
>> On Tue, Oct 4, 2022 at 11:21 AM Martijn Visser 
>> wrote:
>>
>>> Hi Yaroslav,
>>>
>>> Thanks for the feedback, that's much appreciated! Regarding Java 17 as a
>>> prerequisite, we would have to break compatibility already since Scala
>>> 2.12.7 doesn't compile on Java 17 [1].
>>>
>>> Given that we can only remove Scala APIs with the next major Flink (2.0)
>>> version, would that still impact you a lot? I do imagine that if we get to
>>> a Flink 2.0 version there would be more breaking involved anyway. The
>>> biggest consequence of deprecating support for Scala in Flink 1.x would be
>>> that new APIs would only be available in Java, but since these don't exist
>>> yet there would be no refactoring involved. I can imagine that we might
>>> change something in an existing API, but that would have certain
>>> compatibility guarantees already (depending if it's
>>> Public/PublicEvolving/Experimental). If a change would happen there, I
>>> think it would be smaller refactoring.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-25000
>>>
>>> On Tue, Oct 4, 2022 at 10:58 AM Yaroslav Tkachenko 
>>> wrote:
>>>
 Hi Martijn,

 As a Scala user, this change would affect me a lot and I'm not looking
 forward to rewriting my codebase, and it's not even a very large one :)

 I'd like to suggest supporting Java 17 as a prerequisite (
 https://issues.apache.org/jira/browse/FLINK-15736). Things like switch
 expressions and records could simplify the migration quite a bit. Would you
 consider adding it to the FLIP?

 On Tue, Oct 4, 2022 at 10:50 AM Jing Ge  wrote:

> Hi Martijn,
>
> Thanks for bringing this up. It is generally a great idea, so +1.
>
> Both Scala extension projects mentioned in the FLIP are still very
> young, and I don't think they will attract more Scala developers than
> Flink itself does, precisely because they are external projects. It will
> be a big issue for users who have to rewrite their large codebases. Those
> users should be aware of the effort from now on, should not count on
> those Scala extension projects, and should prepare their migration plan
> before Flink 2.0.
>
> Best regards,
> Jing
>
>
> On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser <
> martijnvis...@apache.org> wrote:
>
>> Hi Marton,
>>
>> You're making a good point, I originally wanted to include already
>> the User mailing list to get their feedback but forgot to do so. I'll do
>> some more outreach via other channels as well.
>>
>> @Users of Flink, I've made a proposal to deprecate and remove Scala
>> API support in a future version of Flink. Your feedback on this topic is
>> very much appreciated.
>>
>> Regarding the large Scala codebase for Flink, a potential alternative
>> could be to have a wrapper for all Java APIs that makes them available as
>> Scala APIs. However, this still requires Scala maintainers and I don't
>> think that we currently have those in our community. The easiest solution
>> for them would be to use the Java APIs directly. Yes it would involve 
>> work,
>> but we won't actually be able to remove the Scala APIs until Flink 2.0 so
>> there's still time for that :)
>>
>> Best regards,

Re: Question about Flink Broadcast State event ordering

2022-10-04 Thread xljtswf2022
Hi Qing:

> I think this is referring to the order between broadcasted elements and
> non-broadcasted elements, right?

No. The broadcast and non-broadcast streams are different streams, so they
usually travel over different TCP connections, and we cannot control the
relative order of elements across different connections.

> The broadcasted elements should arrive in the same order across all
> tasks, right?

No. Imagine the broadcast stream has 2 partitions, say p1 and p2, and each
partition has elements with indices 1, 2, 3. Then one downstream task may
see the broadcast stream as p1-1, p1-2, ..., p2-1, p2-2, ..., while another
will see p1-1, p2-1, p1-2, p2-2, ...

PS: elements usually come in bulk; the indices are just for explanation.
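
To make the documented advice concrete, here is a minimal sketch (not from
this thread; `Event` and `Rule` are hypothetical types) of a broadcast
state update that does not depend on arrival order: keying the state by an
id makes updates for different ids commute, so the p1/p2 interleaving above
is harmless, as long as updates to the same id are idempotent or come from
a single source partition.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical types, for illustration only.
class Event { String ruleId; String payload; }
class Rule { String id; String apply(Event e) { return id + ":" + e.payload; } }

class RuleJoin extends BroadcastProcessFunction<Event, Rule, String> {

    static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", String.class, Rule.class);

    @Override
    public void processElement(Event event, ReadOnlyContext ctx,
            Collector<String> out) throws Exception {
        Rule rule = ctx.getBroadcastState(RULES).get(event.ruleId);
        if (rule != null) {
            out.collect(rule.apply(event));
        }
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx,
            Collector<String> out) throws Exception {
        // Keyed put: updates for different rule ids commute, so the order
        // in which broadcast partitions interleave does not matter.
        ctx.getBroadcastState(RULES).put(rule.id, rule);
    }
}
```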


Best regards!




At 2022-10-04 21:54:23, "Qing Lim"  wrote:

Hi Flink user group,

 

I have a question around broadcast.

 

Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:

 

> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.

 

I think this is referring to the order between broadcasted elements and
non-broadcasted elements, right?

The broadcasted elements should arrive in the same order across all tasks, right?

 

For example, given a broadcasted stream A, and a non-broadcasted stream B

 

When joining A and B, elements from A should always reach all tasks in the same
order, right? It's just the interleaving of A and B that might differ across
tasks. Did I understand it correctly? I wasn't sure because it's not clear to me
from just reading the doc; happy to update the doc once it's clarified here.

 

Kind regards.

 


ClassNotFoundException when loading protobuf message class in Flink SQL

2022-10-04 Thread James McGuire via user
Hi Flink Community,
I am trying to prove out the new protobuf functionality added to 1.16
([1]).  I have built master locally and have attempted following the
Protobuf Format doc ([2]) to create a table with the kafka connector using
the protobuf format.

I compiled the sample .proto file using protoc version 3.2.0, compiled the
.java output files using javac, linking to protobuf-java-3.5.1.jar (using
earlier versions gives me compiler errors about UnusedPrivateParameter) and
packaged the resulting class files into SimpleTest.jar.

However, when I try to select the table, I get the following error:
% ./sql-client.sh --jar ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar
--jar
~/repos/flink/flink-connectors/flink-connector-kafka/target/flink-connector-kafka-1.17-SNAPSHOT.jar
--jar
~/repos/flink/flink-formats/flink-sql-protobuf/target/flink-sql-protobuf-1.17-SNAPSHOT.jar
Flink SQL> CREATE TABLE simple_test (
>   uid BIGINT,
>   name STRING,
>   category_type INT,
>   content BINARY,
>   price DOUBLE,
>   value_map map>,
>   value_arr array>,
>   corpus_int INT,
>   corpus_str STRING
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'protobuf',
>  'protobuf.message-class-name' = 'com.example.SimpleTest',
>  'protobuf.ignore-parse-errors' = 'true'
> )
> ;
[INFO] Execute statement succeed.

Flink SQL> select * from simple_test;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.example.SimpleTest

Flink SQL>

Any advice greatly appreciated, thank you.

[1]
https://github.com/apache/flink/commit/5c87b69b5300e8678629aa8b769d60ec2fdbf3d1
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/


Re: Flink KafkaSource still referencing deleted topic

2022-10-04 Thread Martijn Visser
Hi Mason,

Definitely! Feel free to open a PR and ping me for a review.

Cheers, Martijn

On Tue, Oct 4, 2022 at 3:51 PM Mason Chen  wrote:

> Hi Martijn,
>
> I notice that this question comes up quite often. Would this be a good
> addition to the KafkaSource documentation? I'd be happy to contribute to
> the documentation.
>
> Best,
> Mason
>
> On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser 
> wrote:
>
>> Hi Robert,
>>
>> Based on
>> https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
>> I think you'll need to change the UID for your KafkaSource and restart your
>> job with allowNonRestoredState enabled.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen 
>> wrote:
>>
>>> We've changed the KafkaSource to ingest from a new topic but the old
>>> name is still being referenced:
>>>
>>> 2022-10-04 07:03:41org.apache.flink.util.FlinkException: Global failure 
>>> triggered by OperatorCoordinator for 'Source: Grokfailures' (operator 
>>> feca28aff5a3958840bee985ee7de4d3).at 
>>> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>>>   at 
>>> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>>>at 
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>>>   at 
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>>>  at 
>>> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>>>at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>   at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>   at java.lang.Thread.run(Thread.java:748)Caused by: 
>>> org.apache.flink.util.FlinkRuntimeException: Failed to handle partition 
>>> splits change due to at 
>>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>>>  at 
>>> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>>>  at 
>>> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>>>... 3 moreCaused by: java.lang.RuntimeException: Failed to get topic 
>>> metadata.  at 
>>> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>>>at 
>>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>>>  at 
>>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>>>   at 
>>> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>>>  at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
>>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at 
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>... 3 moreCaused by: java.util.concurrent.ExecutionException: 
>>> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
>>> server does not host this topic-partition.  at 
>>> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>>   at 
>>> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>> at 
>>> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>> at 
>>> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>>   at 
>>> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>>>... 10 moreCaused by: 
>>> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
>>> server does not host this topic-partition.
>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>


Re: Flink KafkaSource still referencing deleted topic

2022-10-04 Thread Mason Chen
Hi Martijn,

I notice that this question comes up quite often. Would this be a good
addition to the KafkaSource documentation? I'd be happy to contribute to
the documentation.

Best,
Mason

On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser 
wrote:

> Hi Robert,
>
> Based on
> https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
> I think you'll need to change the UID for your KafkaSource and restart your
> job with allowNonRestoredState enabled.
>
> Best regards,
>
> Martijn
>
> On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen 
> wrote:
>
>> We've changed the KafkaSource to ingest from a new topic but the old name
>> is still being referenced:
>>
>> 2022-10-04 07:03:41org.apache.flink.util.FlinkException: Global failure 
>> triggered by OperatorCoordinator for 'Source: Grokfailures' (operator 
>> feca28aff5a3958840bee985ee7de4d3). at 
>> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>>   at 
>> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>>at 
>> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>>   at 
>> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>>  at 
>> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>>at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)Caused by: 
>> org.apache.flink.util.FlinkRuntimeException: Failed to handle partition 
>> splits change due to at 
>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>>  at 
>> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>>  at 
>> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>>... 3 moreCaused by: java.lang.RuntimeException: Failed to get topic 
>> metadata.  at 
>> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>>at 
>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>>  at 
>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>>   at 
>> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>>  at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>... 3 moreCaused by: java.util.concurrent.ExecutionException: 
>> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
>> does not host this topic-partition.  at 
>> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>   at 
>> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>> at 
>> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>> at 
>> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>   at 
>> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>>... 10 moreCaused by: 
>> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
>> does not host this topic-partition.
>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>


Support avro-confluent format for FileSystem SQL connector

2022-10-04 Thread liuxiangcao
Hi Flink community,

According to the Flink docs, avro-confluent ([1]) is only supported for the
Kafka SQL connector and the Upsert Kafka SQL connector.

I'm wondering if there is any reason this format is not supported for the
Filesystem SQL connector ([2])?

We are looking to use the FileSystem sink to write to S3 in Avro format and
would like to keep the sink in sync with the Avro schema registry. Are there
any gotchas we should be aware of in implementing the avro-confluent format
for the filesystem sink?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/avro-confluent/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/
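
For contrast, the plain `avro` format, without the schema-registry
integration, is supported by the filesystem connector; a minimal sketch
with a made-up path and schema:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FilesystemAvroSink {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Filesystem sink writing Avro files to S3; per the docs linked
        // above ([1], [2]), 'avro-confluent' is not an accepted 'format'
        // here, but plain 'avro' is.
        tEnv.executeSql(
                "CREATE TABLE avro_sink ("
                        + "  user_id BIGINT,"
                        + "  behavior STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 's3://my-bucket/output',"
                        + "  'format' = 'avro'"
                        + ")");
    }
}
```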

-- 
Best Wishes & Regards
Shawn Xiangcao Liu


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Martijn Visser
Hi Yaroslav,

If I could summarize your suggestion, would it mean that you would only be
in favour of dropping Scala API support if we introduce Java 17 support
exactly at the same time (say Flink 2.0). I was first thinking that an
alternative would be to have a Flink 2.0 which supports Java 17 while
keeping the Scala APIs (and postpone dropping those in Flink 3.0), but that
wouldn't make much sense since we would have to break savepoint
compatibility for Scala users then anyway.

However, since we're already talking about when/how the APIs could be
dropped, does that mean that you agree with the idea to deprecate and
remove the Scala APIs?

Best regards,

Martijn


On Tue, Oct 4, 2022 at 2:41 PM Yaroslav Tkachenko 
wrote:

> Hi Martijn,
>
> The 2.0 argument makes sense (I agree it's easier to introduce more
> breaking changes in one major release), but I think my comment about Java
> 17 also makes sense in this case: 1) easier to introduce because breaking
> changes are possible 2) you'd need to give some syntax sugar as an
> alternative after removing Scala.
>
> On Tue, Oct 4, 2022 at 11:21 AM Martijn Visser 
> wrote:
>
>> Hi Yaroslav,
>>
>> Thanks for the feedback, that's much appreciated! Regarding Java 17 as a
>> prerequisite, we would have to break compatibility already since Scala
>> 2.12.7 doesn't compile on Java 17 [1].
>>
>> Given that we can only remove Scala APIs with the next major Flink (2.0)
>> version, would that still impact you a lot? I do imagine that if we get to
>> a Flink 2.0 version there would be more breaking involved anyway. The
>> biggest consequence of deprecating support for Scala in Flink 1.x would be
>> that new APIs would only be available in Java, but since these don't exist
>> yet there would be no refactoring involved. I can imagine that we might
>> change something in an existing API, but that would have certain
>> compatibility guarantees already (depending if it's
>> Public/PublicEvolving/Experimental). If a change would happen there, I
>> think it would be smaller refactoring.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-25000
>>
>> On Tue, Oct 4, 2022 at 10:58 AM Yaroslav Tkachenko 
>> wrote:
>>
>>> Hi Martijn,
>>>
>>> As a Scala user, this change would affect me a lot and I'm not looking
>>> forward to rewriting my codebase, and it's not even a very large one :)
>>>
>>> I'd like to suggest supporting Java 17 as a prerequisite (
>>> https://issues.apache.org/jira/browse/FLINK-15736). Things like switch
>>> expressions and records could simplify the migration quite a bit. Would you
>>> consider adding it to the FLIP?
>>>
>>> On Tue, Oct 4, 2022 at 10:50 AM Jing Ge  wrote:
>>>
 Hi Martijn,

 Thanks for bringing this up. It is generally a great idea, so +1.

 Both Scala extension projects mentioned in the FLIP are still very
 young, and I don't think they will attract more Scala developers than
 Flink itself does, precisely because they are external projects. It will
 be a big issue for users who have to rewrite their large codebases. Those
 users should be aware of the effort from now on, should not count on
 those Scala extension projects, and should prepare their migration plan
 before Flink 2.0.

 Best regards,
 Jing


 On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser 
 wrote:

> Hi Marton,
>
> You're making a good point, I originally wanted to include already the
> User mailing list to get their feedback but forgot to do so. I'll do some
> more outreach via other channels as well.
>
> @Users of Flink, I've made a proposal to deprecate and remove Scala
> API support in a future version of Flink. Your feedback on this topic is
> very much appreciated.
>
> Regarding the large Scala codebase for Flink, a potential alternative
> could be to have a wrapper for all Java APIs that makes them available as
> Scala APIs. However, this still requires Scala maintainers and I don't
> think that we currently have those in our community. The easiest solution
> for them would be to use the Java APIs directly. Yes it would involve 
> work,
> but we won't actually be able to remove the Scala APIs until Flink 2.0 so
> there's still time for that :)
>
> Best regards,
>
> Martijn
>
> On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi <
> balassi.mar...@gmail.com> wrote:
>
>> Hi Martijn,
>>
>> Thanks for compiling the FLIP. I agree with the sentiment that Scala
>> poses
>> considerable maintenance overhead and key improvements (like 2.13 or
>> 2.12.8
>> supports) are hanging stale. With that said before we make this move
>> we
>> should attempt to understand the userbase affected.
>> A quick Slack and user mailing list search does return quite a bit of
>> results for Scala (admittedly a cursory look at them suggests that
>> many of
>> 

Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Yaroslav Tkachenko
Hi Martijn,

The 2.0 argument makes sense (I agree it's easier to introduce more
breaking changes in one major release), but I think my comment about Java
17 also makes sense in this case: 1) easier to introduce because breaking
changes are possible 2) you'd need to give some syntax sugar as an
alternative after removing Scala.

On Tue, Oct 4, 2022 at 11:21 AM Martijn Visser 
wrote:

> Hi Yaroslav,
>
> Thanks for the feedback, that's much appreciated! Regarding Java 17 as a
> prerequisite, we would have to break compatibility already since Scala
> 2.12.7 doesn't compile on Java 17 [1].
>
> Given that we can only remove Scala APIs with the next major Flink (2.0)
> version, would that still impact you a lot? I do imagine that if we get to
> a Flink 2.0 version there would be more breaking involved anyway. The
> biggest consequence of deprecating support for Scala in Flink 1.x would be
> that new APIs would only be available in Java, but since these don't exist
> yet there would be no refactoring involved. I can imagine that we might
> change something in an existing API, but that would have certain
> compatibility guarantees already (depending if it's
> Public/PublicEvolving/Experimental). If a change would happen there, I
> think it would be smaller refactoring.
>
> Best regards,
>
> Martijn
>
> [1] https://issues.apache.org/jira/browse/FLINK-25000
>
> On Tue, Oct 4, 2022 at 10:58 AM Yaroslav Tkachenko 
> wrote:
>
>> Hi Martijn,
>>
>> As a Scala user, this change would affect me a lot and I'm not looking
>> forward to rewriting my codebase, and it's not even a very large one :)
>>
>> I'd like to suggest supporting Java 17 as a prerequisite (
>> https://issues.apache.org/jira/browse/FLINK-15736). Things like switch
>> expressions and records could simplify the migration quite a bit. Would you
>> consider adding it to the FLIP?
>>
>> On Tue, Oct 4, 2022 at 10:50 AM Jing Ge  wrote:
>>
>>> Hi Martijn,
>>>
>>> Thanks for bringing this up. It is generally a great idea, so +1.
>>>
>>> Both Scala extension projects mentioned in the FLIP are still very
>>> young, and I don't think they will attract more Scala developers than
>>> Flink itself does, precisely because they are external projects. It will
>>> be a big issue for users who have to rewrite their large codebases. Those
>>> users should be aware of the effort from now on, should not count on
>>> those Scala extension projects, and should prepare their migration plan
>>> before Flink 2.0.
>>>
>>> Best regards,
>>> Jing
>>>
>>>
>>> On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser 
>>> wrote:
>>>
 Hi Marton,

 You're making a good point, I originally wanted to include already the
 User mailing list to get their feedback but forgot to do so. I'll do some
 more outreach via other channels as well.

 @Users of Flink, I've made a proposal to deprecate and remove Scala API
 support in a future version of Flink. Your feedback on this topic is very
 much appreciated.

 Regarding the large Scala codebase for Flink, a potential alternative
 could be to have a wrapper for all Java APIs that makes them available as
 Scala APIs. However, this still requires Scala maintainers and I don't
 think that we currently have those in our community. The easiest solution
 for them would be to use the Java APIs directly. Yes it would involve work,
 but we won't actually be able to remove the Scala APIs until Flink 2.0 so
 there's still time for that :)

 Best regards,

 Martijn

 On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi 
 wrote:

> Hi Martijn,
>
> Thanks for compiling the FLIP. I agree with the sentiment that Scala
> poses
> considerable maintenance overhead and key improvements (like 2.13 or
> 2.12.8
> supports) are hanging stale. With that said before we make this move we
> should attempt to understand the userbase affected.
> A quick Slack and user mailing list search does return quite a bit of
> results for Scala (admittedly a cursory look at them suggests that many
> of
> them have to do with missing features in Scala that exist in Java or
> Scala
> versions). I would love to see some polls on this topic, we could also
> use
> the Flink twitter handle to ask the community about this.
>
> I am aware of users having large existing Scala codebases for Flink.
> This
> move would pose a very large effort on them, as they would need to
> rewrite
> much of their existing code. What are the alternatives in your opinion,
> Martijn?
>
> On Tue, Oct 4, 2022 at 6:22 AM Martijn Visser <
> martijnvis...@apache.org>
> wrote:
>
> > Hi everyone,
> >
> > I would like to open a discussion thread on FLIP-265 Deprecate and
> remove
> > Scala API support. Please take a look at
> >
> >
> 

Re: Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
By looking at the docs for older versions of Flink, e.g.,

https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

it seems that it's possible to rewrite this query

```
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
```

as

```
SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);
```

This would be a way to accomplish this task in SQL without using a temporal
table function.

Would this rewrite be equivalent in terms of the final generated job?
Obviously I very much prefer the LATERAL TABLE query but this requires
using a temporal table function which can only be registered using the
Table API (apparently).

Regards,

Salva

On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara 
wrote:

> It doesn't seem to be the case with processing time, unless I'm mistaken:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>
> This case seems to require a different syntax based on LATERAL TABLE and a
> temporal table function (FOR SYSTEM_TIME is not supported). From the docs
> too, it seems that temporal table functions can only be registered via the
> table API. Am I missing/misunderstanding something?
>
> Salva
>
> On Tue, Oct 4, 2022, 19:26 Martijn Visser 
> wrote:
>
>> Hi Salva,
>>
>> The examples for temporal table joins can be found at
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>> Your example is definitely possible with just using SQL.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara 
>> wrote:
>>
>>> Based on this:
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>
>>> It seems that the only way of registering temporal table functions is
>>> via the Table API.
>>>
>>> If that is the case, is there a way to make this example work
>>>
>>> ```
>>> SELECT
>>>   SUM(amount * rate) AS amount
>>> FROM
>>>   orders,
>>>   LATERAL TABLE (rates(order_time))
>>> WHERE
>>>   rates.currency = orders.currency
>>> ```
>>>
>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>> the temporal table function to the cluster (by packaging it in a jar file)
>>> and then run the above query from the Flink SQL CLI?
>>>
>>> Thanks in advance,
>>>
>>> Salva
>>>
>>>


Re: Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
It doesn't seem to be the case with processing time, unless I'm mistaken:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

This case seems to require a different syntax based on LATERAL TABLE and a
temporal table function (FOR SYSTEM_TIME is not supported). From the docs
too, it seems that temporal table functions can only be registered via the
table API. Am I missing/misunderstanding something?

Salva

On Tue, Oct 4, 2022, 19:26 Martijn Visser  wrote:

> Hi Salva,
>
> The examples for temporal table joins can be found at
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
> Your example is definitely possible with just using SQL.
>
> Best regards,
>
> Martijn
>
> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara 
> wrote:
>
>> Based on this:
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>
>> It seems that the only way of registering temporal table functions is via
>> the Table API.
>>
>> If that is the case, is there a way to make this example work
>>
>> ```
>> SELECT
>>   SUM(amount * rate) AS amount
>> FROM
>>   orders,
>>   LATERAL TABLE (rates(order_time))
>> WHERE
>>   rates.currency = orders.currency
>> ```
>>
>> without the Table API, just using SQL? E.g., is it possible to deploy the
>> temporal table function to the cluster (by packaging it in a jar file) and
>> then run the above query from the Flink SQL CLI?
>>
>> Thanks in advance,
>>
>> Salva
>>
>>


Re: Flink KafkaSource still referencing deleted topic

2022-10-04 Thread Martijn Visser
Hi Robert,

Based on
https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
I think you'll need to change the UID for your KafkaSource and restart your
job with allowNonRestoredState enabled.
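
For what it's worth, a minimal sketch of that fix (topic, group id, and uid
values below are made up); after redeploying, resume with
`flink run -s <savepointPath> --allowNonRestoredState`:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("grok-failures-v2") // the new topic
                .setGroupId("grok-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Grokfailures")
                // A new uid makes Flink treat this as a new operator, so the
                // deleted topic's offsets in the savepoint are skipped when
                // restoring with --allowNonRestoredState.
                .uid("kafka-source-v2")
                .print();

        env.execute();
    }
}
```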

Best regards,

Martijn

On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen  wrote:

> We've changed the KafkaSource to ingest from a new topic but the old name
> is still being referenced:
>
> 2022-10-04 07:03:41org.apache.flink.util.FlinkException: Global failure 
> triggered by OperatorCoordinator for 'Source: Grokfailures' (operator 
> feca28aff5a3958840bee985ee7de4d3).  at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)Caused by: 
> org.apache.flink.util.FlinkRuntimeException: Failed to handle partition 
> splits change due to at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>  at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>... 3 moreCaused by: java.lang.RuntimeException: Failed to get topic 
> metadata.  at 
> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>  at 
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>   at 
> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>... 3 moreCaused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>... 10 moreCaused by: 
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
>
>
> --
> Robert Cullen
> 240-475-4490
>


Flink KafkaSource still referencing deleted topic

2022-10-04 Thread Robert Cullen
We've changed the KafkaSource to ingest from a new topic but the old name
is still being referenced:

2022-10-04 07:03:41
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Grokfailures' (operator feca28aff5a3958840bee985ee7de4d3).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 3 more
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
    ... 10 more
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.


-- 
Robert Cullen
240-475-4490


Re: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout

2022-10-04 Thread Martijn Visser
Hi Ori,

Thanks for reaching out! I do fear that there's not much that we can help
out with. As you mentioned, it looks like there's a network issue which
would be on the Google side of issues. I'm assuming that the mentioned
Flink version corresponds with Flink 1.12 [1], which isn't supported in the
Flink community anymore. Are you restarting the job from a savepoint or
starting fresh without state at all?

Best regards,

Martijn

[1]
https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0

On Sun, Oct 2, 2022 at 3:38 AM Ori Popowski  wrote:

> Hi,
>
> We're using Flink 2.10.2 on Google Dataproc.
>
> Lately we experience a very unusual problem: the job fails and when it's
> trying to recover we get this error:
>
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
>
> I investigated what happened and I saw that the failure is caused by a
> heartbeat timeout to one of the containers. I looked at the container's
> logs and I saw something unusual:
>
>1. Eight minutes before the heartbeat timeout the logs show connection
>problems to the Confluent Kafka topic and also to Datadog, which means
>there's a network issue with the whole node or just the specific container.
>2. The container logs disappear at this point, but the node logs show
>multiple Garbage Collection pauses, ranging from 10 seconds to 215 (!)
>seconds.
>
> It looks like right after the network issue the node itself gets into an
> endless GC phase, and my theory is that the slots are not fulfillable
> because the node is unavailable while stuck in that endless GC.
>
> I want to note that we've been running this job for months without any
> issues. The issues started one month ago arbitrarily, not following a Flink
> version upgrade, job code upgrade, change in amount or type of data being
> processed, and neither a Dataproc image version change.
>
> Attached are job manager jogs, container logs, and node logs.
>
> How can we recover from this issue?
>
> Thanks!
>
>


Can temporal table functions only be registered using the table API?

2022-10-04 Thread Salva Alcántara
Based on this:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/

It seems that the only way of registering temporal table functions is via
the Table API.
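
For reference, that Table API registration looks roughly like the sketch
below, assuming hypothetical tables `rates_history` and `orders` whose
column names match the query that follows; once registered, the function is
callable from plain SQL in the same session:

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class RegisterRates {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Create a temporal table function over the rates history, keyed by
        // currency and versioned by the update_time time attribute.
        TemporalTableFunction rates = tEnv
                .from("rates_history")
                .createTemporalTableFunction($("update_time"), $("currency"));
        tEnv.createTemporarySystemFunction("rates", rates);

        // After registration, the function can be used from plain SQL:
        tEnv.sqlQuery(
                "SELECT SUM(amount * rate) AS amount "
                        + "FROM orders, LATERAL TABLE (rates(order_time)) "
                        + "WHERE rates.currency = orders.currency");
    }
}
```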

If that is the case, is there a way to make this example work

```
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
```

without the Table API, just using SQL? E.g., is it possible to deploy the
temporal table function to the cluster (by packaging it in a jar file) and
then run the above query from the Flink SQL CLI?

Thanks in advance,

Salva


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Martijn Visser
Hi Yaroslav,

Thanks for the feedback, that's much appreciated! Regarding Java 17 as a
prerequisite, we would have to break compatibility already since Scala
2.12.7 doesn't compile on Java 17 [1].

Given that we can only remove Scala APIs with the next major Flink (2.0)
version, would that still impact you a lot? I do imagine that if we get to
a Flink 2.0 version there would be more breaking involved anyway. The
biggest consequence of deprecating support for Scala in Flink 1.x would be
that new APIs would only be available in Java, but since these don't exist
yet there would be no refactoring involved. I can imagine that we might
change something in an existing API, but that would have certain
compatibility guarantees already (depending if it's
Public/PublicEvolving/Experimental). If a change would happen there, I
think it would be smaller refactoring.

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-25000

On Tue, Oct 4, 2022 at 10:58 AM Yaroslav Tkachenko 
wrote:

> Hi Martijn,
>
> As a Scala user, this change would affect me a lot and I'm not looking
> forward to rewriting my codebase, and it's not even a very large one :)
>
> I'd like to suggest supporting Java 17 as a prerequisite (
> https://issues.apache.org/jira/browse/FLINK-15736). Things like switch
> expressions and records could simplify the migration quite a bit. Would you
> consider adding it to the FLIP?
>
> On Tue, Oct 4, 2022 at 10:50 AM Jing Ge  wrote:
>
>> Hi Martijn,
>>
>> Thanks for bringing this up. It is generally a great idea, so +1.
>>
>> Both Scala extension projects mentioned in the FLIP are still very
>> young, and I don't think they will attract more Scala developers than
>> Flink itself does, precisely because they are external projects. It will
>> be a big issue for users who have to rewrite their large codebases. Those
>> users should be aware of the effort from now on, should not count on
>> those Scala extension projects, and should prepare their migration plan
>> before Flink 2.0.
>>
>> Best regards,
>> Jing
>>
>>
>> On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser 
>> wrote:
>>
>>> Hi Marton,
>>>
>>> You're making a good point, I originally wanted to include already the
>>> User mailing list to get their feedback but forgot to do so. I'll do some
>>> more outreach via other channels as well.
>>>
>>> @Users of Flink, I've made a proposal to deprecate and remove Scala API
>>> support in a future version of Flink. Your feedback on this topic is very
>>> much appreciated.
>>>
>>> Regarding the large Scala codebase for Flink, a potential alternative
>>> could be to have a wrapper for all Java APIs that makes them available as
>>> Scala APIs. However, this still requires Scala maintainers and I don't
>>> think that we currently have those in our community. The easiest solution
>>> for them would be to use the Java APIs directly. Yes it would involve work,
>>> but we won't actually be able to remove the Scala APIs until Flink 2.0 so
>>> there's still time for that :)
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi 
>>> wrote:
>>>
 Hi Martijn,

 Thanks for compiling the FLIP. I agree with the sentiment that Scala
 poses
 considerable maintenance overhead and key improvements (like 2.13 or
 2.12.8
 supports) are hanging stale. With that said before we make this move we
 should attempt to understand the userbase affected.
 A quick Slack and user mailing list search does return quite a bit of
 results for Scala (admittedly a cursory look at them suggests that many
 of
 them have to do with missing features in Scala that exist in Java or
 Scala
 versions). I would love to see some polls on this topic, we could also
 use
 the Flink twitter handle to ask the community about this.

 I am aware of users having large existing Scala codebases for Flink.
 This
 move would pose a very large effort on them, as they would need to
 rewrite
 much of their existing code. What are the alternatives in your opinion,
 Martijn?

 On Tue, Oct 4, 2022 at 6:22 AM Martijn Visser >>> >
 wrote:

 > Hi everyone,
 >
 > I would like to open a discussion thread on FLIP-265 Deprecate and
 remove
 > Scala API support. Please take a look at
 >
 >
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
 > and provide your feedback.
 >
 > Best regards,
 >
 > Martijn
 > https://twitter.com/MartijnVisser82
 > https://github.com/MartijnVisser
 >

>>>


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Clayton Wohl
+1

At my employer, we maintain several Flink jobs in Scala. We've been writing
newer jobs in Java, and we'd be fine with porting our Scala jobs over to
the Java API.

I'd like to request Java 17 support. Specifically, Java records are a
feature our Flink code would use a lot, and they would make the Java syntax
much nicer.
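
For example, an event class that currently has to be a full mutable POJO
(fields, no-arg constructor, getters/setters) could shrink to a single line.
A sketch, assuming Flink's serializers gain record support as part of the
Java 17 work (the type below is hypothetical):

```java
// Hypothetical event type; under the current POJO rules this would need to be
// a ~30-line mutable class for Flink's PojoSerializer to pick it up.
public record OrderEvent(String orderId, String currency, double amount, long eventTime) {}
```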


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Yaroslav Tkachenko
Hi Martijn,

As a Scala user, this change would affect me a lot and I'm not looking
forward to rewriting my codebase, and it's not even a very large one :)

I'd like to suggest supporting Java 17 as a prerequisite (
https://issues.apache.org/jira/browse/FLINK-15736). Things like switch
expressions and records could simplify the migration quite a bit. Would you
consider adding it to the FLIP?
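
As an example of the kind of simplification I mean: dispatch logic that maps
naturally to a Scala pattern match could become an exhaustive Java switch
expression instead of an if/else chain. A small sketch (all names invented):

```java
public final class EventRouter {

    enum Kind { ORDER, PAYMENT, REFUND }

    // Java 14+ switch expression: exhaustive over the enum, no fall-through,
    // usable directly as a value, much like a Scala match.
    static String topicFor(Kind kind) {
        return switch (kind) {
            case ORDER -> "orders";
            case PAYMENT, REFUND -> "payments";
        };
    }
}
```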

On Tue, Oct 4, 2022 at 10:50 AM Jing Ge  wrote:

> Hi Martijn,
>
> Thanks for bringing this up. It is generally a great idea, so +1.
>
> Both Scala extension projects mentioned in the FLIP are still very young,
> and I don't think they will attract as many Scala developers as Flink itself
> could, precisely because they are external projects. It will be a big issue
> for users who have to rewrite their large codebases. Those users should be
> aware of the effort from now on; they had better not count on those Scala
> extension projects and should prepare their migration plan before Flink 2.0.
>
> Best regards,
> Jing
>
>
> On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser 
> wrote:
>
>> Hi Marton,
>>
>> You're making a good point; I originally wanted to include the User
>> mailing list to get their feedback but forgot to do so. I'll do some
>> more outreach via other channels as well.
>>
>> @Users of Flink, I've made a proposal to deprecate and remove Scala API
>> support in a future version of Flink. Your feedback on this topic is very
>> much appreciated.
>>
>> Regarding the large Scala codebase for Flink, a potential alternative
>> could be to have a wrapper for all Java APIs that makes them available as
>> Scala APIs. However, this still requires Scala maintainers and I don't
>> think that we currently have those in our community. The easiest solution
>> for them would be to use the Java APIs directly. Yes it would involve work,
>> but we won't actually be able to remove the Scala APIs until Flink 2.0 so
>> there's still time for that :)
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi 
>> wrote:
>>
>>> Hi Martijn,
>>>
>>> Thanks for compiling the FLIP. I agree with the sentiment that Scala poses
>>> considerable maintenance overhead and that key improvements (like Scala
>>> 2.13 or 2.12.8 support) have stalled. That said, before we make this move
>>> we should attempt to understand the affected userbase.
>>> A quick Slack and user mailing list search does return quite a few results
>>> for Scala (admittedly, a cursory look suggests that many of them concern
>>> features that exist in Java but are missing in Scala, or Scala version
>>> issues). I would love to see some polls on this topic; we could also use
>>> the Flink Twitter handle to ask the community about it.
>>>
>>> I am aware of users with large existing Scala codebases for Flink. This
>>> move would pose a very large effort on them, as they would need to
>>> rewrite much of their existing code. What are the alternatives in your
>>> opinion, Martijn?
>>>
>>> On Tue, Oct 4, 2022 at 6:22 AM Martijn Visser 
>>> wrote:
>>>
>>> > Hi everyone,
>>> >
>>> > I would like to open a discussion thread on FLIP-265 Deprecate and
>>> remove
>>> > Scala API support. Please take a look at
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
>>> > and provide your feedback.
>>> >
>>> > Best regards,
>>> >
>>> > Martijn
>>> > https://twitter.com/MartijnVisser82
>>> > https://github.com/MartijnVisser
>>> >
>>>
>>


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Jing Ge
Hi Martijn,

Thanks for bringing this up. It is generally a great idea, so +1.

Both Scala extension projects mentioned in the FLIP are still very young,
and I don't think they will attract as many Scala developers as Flink itself
could, precisely because they are external projects. It will be a big issue
for users who have to rewrite their large codebases. Those users should be
aware of the effort from now on; they had better not count on those Scala
extension projects and should prepare their migration plan before Flink 2.0.

Best regards,
Jing


On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser 
wrote:

> Hi Marton,
>
> You're making a good point; I originally wanted to include the User
> mailing list to get their feedback but forgot to do so. I'll do some
> more outreach via other channels as well.
>
> @Users of Flink, I've made a proposal to deprecate and remove Scala API
> support in a future version of Flink. Your feedback on this topic is very
> much appreciated.
>
> Regarding the large Scala codebase for Flink, a potential alternative
> could be to have a wrapper for all Java APIs that makes them available as
> Scala APIs. However, this still requires Scala maintainers and I don't
> think that we currently have those in our community. The easiest solution
> for them would be to use the Java APIs directly. Yes it would involve work,
> but we won't actually be able to remove the Scala APIs until Flink 2.0 so
> there's still time for that :)
>
> Best regards,
>
> Martijn
>
> On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi 
> wrote:
>
>> Hi Martijn,
>>
>> Thanks for compiling the FLIP. I agree with the sentiment that Scala poses
>> considerable maintenance overhead and that key improvements (like Scala
>> 2.13 or 2.12.8 support) have stalled. That said, before we make this move
>> we should attempt to understand the affected userbase.
>> A quick Slack and user mailing list search does return quite a few results
>> for Scala (admittedly, a cursory look suggests that many of them concern
>> features that exist in Java but are missing in Scala, or Scala version
>> issues). I would love to see some polls on this topic; we could also use
>> the Flink Twitter handle to ask the community about it.
>>
>> I am aware of users with large existing Scala codebases for Flink. This
>> move would pose a very large effort on them, as they would need to rewrite
>> much of their existing code. What are the alternatives in your opinion,
>> Martijn?
>>
>> On Tue, Oct 4, 2022 at 6:22 AM Martijn Visser 
>> wrote:
>>
>> > Hi everyone,
>> >
>> > I would like to open a discussion thread on FLIP-265 Deprecate and
>> remove
>> > Scala API support. Please take a look at
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
>> > and provide your feedback.
>> >
>> > Best regards,
>> >
>> > Martijn
>> > https://twitter.com/MartijnVisser82
>> > https://github.com/MartijnVisser
>> >
>>
>


Question about Flink Broadcast State event ordering

2022-10-04 Thread Qing Lim
Hi Flink user group,

I have a question around broadcast.

Reading the docs 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations,
 it says the following:

> Order of events in Broadcast State may differ across tasks: Although 
> broadcasting the elements of a stream guarantees that all elements will 
> (eventually) go to all downstream tasks, elements may arrive in a different 
> order to each task. So the state updates for each incoming element MUST NOT 
> depend on the ordering of the incoming events.

I think this is referring to the order between broadcast elements and
non-broadcast elements, right? The broadcast elements themselves should
arrive in the same order across all tasks, right?

For example, given a broadcast stream A and a non-broadcast stream B:

When joining A and B, elements from A should always reach all tasks in the
same order, right? It's just the interleaving of A and B that might differ
across tasks. Did I understand it correctly? I wasn't sure because it's not
clear to me from reading the doc alone; I'm happy to update the doc once
it's clarified here.
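
In other words, my understanding is that an update like the one below would
be safe, because each broadcast element only overwrites its own slot, so
every task converges to the same state. A minimal sketch (class and field
names are made up):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// A = broadcast stream of Rule, B = non-broadcast stream of Event.
public class JoinAB extends BroadcastProcessFunction<JoinAB.Event, JoinAB.Rule, String> {

    public static class Rule { public String id; }
    public static class Event { public String ruleId; }

    private final MapStateDescriptor<String, Rule> rulesDesc =
            new MapStateDescriptor<>("rules", String.class, Rule.class);

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out)
            throws Exception {
        // Each rule overwrites only its own entry, so the resulting state does
        // not depend on the order in which rules or Events arrive at this task.
        ctx.getBroadcastState(rulesDesc).put(rule.id, rule);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        Rule rule = ctx.getBroadcastState(rulesDesc).get(event.ruleId);
        out.collect(rule == null ? "no match yet" : "matched rule " + rule.id);
    }
}
```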

Kind regards.



Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-04 Thread Martijn Visser
Hi Marton,

You're making a good point; I originally wanted to include the User mailing
list to get their feedback but forgot to do so. I'll do some more outreach
via other channels as well.

@Users of Flink, I've made a proposal to deprecate and remove Scala API
support in a future version of Flink. Your feedback on this topic is very
much appreciated.

Regarding the large Scala codebase for Flink, a potential alternative could
be to have a wrapper for all Java APIs that makes them available as Scala
APIs. However, this still requires Scala maintainers and I don't think that
we currently have those in our community. The easiest solution for them
would be to use the Java APIs directly. Yes it would involve work, but we
won't actually be able to remove the Scala APIs until Flink 2.0 so there's
still time for that :)

Best regards,

Martijn

On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi 
wrote:

> Hi Martijn,
>
> Thanks for compiling the FLIP. I agree with the sentiment that Scala poses
> considerable maintenance overhead and that key improvements (like Scala
> 2.13 or 2.12.8 support) have stalled. That said, before we make this move
> we should attempt to understand the affected userbase.
> A quick Slack and user mailing list search does return quite a few results
> for Scala (admittedly, a cursory look suggests that many of them concern
> features that exist in Java but are missing in Scala, or Scala version
> issues). I would love to see some polls on this topic; we could also use
> the Flink Twitter handle to ask the community about it.
>
> I am aware of users with large existing Scala codebases for Flink. This
> move would pose a very large effort on them, as they would need to rewrite
> much of their existing code. What are the alternatives in your opinion,
> Martijn?
>
> On Tue, Oct 4, 2022 at 6:22 AM Martijn Visser 
> wrote:
>
> > Hi everyone,
> >
> > I would like to open a discussion thread on FLIP-265 Deprecate and remove
> > Scala API support. Please take a look at
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
> > and provide your feedback.
> >
> > Best regards,
> >
> > Martijn
> > https://twitter.com/MartijnVisser82
> > https://github.com/MartijnVisser
> >
>


Re: Does kubernetes operator support manually triggering savepoint with canceling the job?

2022-10-04 Thread Őrhidi Mátyás
This should do the trick:

```yaml
job:
  upgradeMode: savepoint
  state: suspended
```

After applying the above change, you should see something similar to this in
the CR:

```yaml
lastSavepoint:
  formatType: CANONICAL
  location: file:/flink-data/savepoints/savepoint-fc61e1-b9cf089c260e
  timeStamp: 1664872178110
  triggerNonce: 0
  triggerType: UPGRADE
```
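
If you instead want a manual savepoint without stopping the job, bumping the
savepoint trigger nonce on the job spec should work (a sketch; please verify
the exact field name against the CRD version you are running):

```yaml
job:
  upgradeMode: savepoint
  state: running
  # Change this value to trigger a new manual savepoint while the job keeps running.
  savepointTriggerNonce: 1
```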


On Tue, Oct 4, 2022 at 7:57 AM Geng Biao  wrote:

> Hi Liting,
>
> Maybe you can check the code of deleteClusterDeployment. When the savepoint
> is finished, the operator will delete the job. Is the job not deleted as
> expected?
>
> Best,
> Biao Geng
>
> Get Outlook for iOS
> --
> *From:* Liting Liu (litiliu) 
> *Sent:* Tuesday, October 4, 2022 12:53:45 PM
> *To:* user 
> *Subject:* Does kubernetes operator support manually triggering savepoint
> with canceling the job?
>
> Hello Flink community:
> I want to manually trigger a savepoint with the help of the Kubernetes
> operator, but it seems the operator doesn't provide an option for whether
> to cancel the job when triggering a savepoint: the `cancelJob` parameter
> is hard-coded to false in the latest code (AbstractFlinkService.java#L299).
> Do I have to watch for the savepoint to finish myself and then cancel the
> job ASAP? And do we have a plan to support this option?
>


Re: What is the best way to submit new Flink SQL?

2022-10-04 Thread Geng Biao
Hi,
May I ask which framework or system you used before that supports modifying SQL dynamically?

Best,
Biao Geng

Get Outlook for iOS

From: Zeguang ZHANG 
Sent: Friday, September 30, 2022 3:26:22 PM
To: user-zh@flink.apache.org 
Subject: What is the best way to submit new Flink SQL?

Hello,
We are using Flink 1.13. With Flink SQL, every time we change the SQL we have
to cancel the job first and then submit the new SQL. How can we deploy a new
SQL statement without cancelling the running job first?


Zeguang Zhang

