Re: Any lessons-learned with boundless (infinite) joins, especially at scale?

2022-05-25 Thread Yaroslav Tkachenko
Hi Devin,

I gave a talk called "Storing State Forever: Why It Can Be Good For Your
Analytics", which may be very relevant:
- https://www.youtube.com/watch?v=tiGxEGPyqCg
-
https://www.slideshare.net/sap1ens/storing-state-forever-why-it-can-be-good-for-your-analytics

On Wed, May 25, 2022 at 8:04 PM Devin Bost  wrote:

> Hi,
>
> I'm wondering if anyone could share if they've tried using infinite joins
> to join large amounts of state in real-time. If so, how did it go? What was
> the scale? Were there any "gotchas" or things that needed to be tuned?
> We're considering trying this at scale, and I'd love to hear some words of
> wisdom from someone who has tried this.
>
> Much thanks!
>
> Devin G. Bost
>


Any lessons-learned with boundless (infinite) joins, especially at scale?

2022-05-25 Thread Devin Bost
Hi,

I'm wondering if anyone could share if they've tried using infinite joins
to join large amounts of state in real-time. If so, how did it go? What was
the scale? Were there any "gotchas" or things that needed to be tuned?
We're considering trying this at scale, and I'd love to hear some words of
wisdom from someone who has tried this.

Much thanks!

Devin G. Bost


Re: LinkedMap ClassCastException issue

2022-05-25 Thread Shengkai Fang
Hi.

Could you tell us which version of Flink you are using? What's the
version of commons-collections:commons-collections:jar when you compile the
SQL, and what's the version in the cluster? It's possible that you compiled
the job with one version and submitted it against a different one.

I'm also not sure how you submit your Flink SQL job. Do you submit it
with the SQL client, or do you execute it from a packaged jar?
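
As a side note, a quick way to confirm whether two different copies of
commons-collections end up on the classpath is to print where the conflicting
class is actually loaded from. This is only a hypothetical debugging sketch
(the class name and placement are made up for illustration):

import org.apache.commons.collections.map.LinkedMap;

public class ClassOriginCheck {
    public static void main(String[] args) {
        // Prints the jar that provided LinkedMap and the classloader that loaded it.
        System.out.println(LinkedMap.class.getProtectionDomain().getCodeSource().getLocation());
        System.out.println(LinkedMap.class.getClassLoader());
    }
}

Running the same two lines inside an operator (for example in a RichFunction's
open() method) and comparing the output with the client side usually shows
whether the class comes from the user jar or from the distribution's lib folder.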

Best,
Shengkai

wang <24248...@163.com> wrote on Wednesday, May 25, 2022 at 15:04:

> Hi dear engineers,
>
> Recently I encountered another issue. After I submitted a Flink SQL job, it
> threw an exception:
>
>
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> org.apache.commons.collections.map.LinkedMap to field 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
>  of type org.apache.commons.collections.map.LinkedMap in instance of 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>   at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
>  ~[?:1.8.0_162]
>   at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) 
> ~[?:1.8.0_162]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) 
> ~[?:1.8.0_162]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) 
> ~[?:1.8.0_162]
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) 
> ~[?:1.8.0_162]
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) 
> ~[?:1.8.0_162]
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) 
> ~[?:1.8.0_162]
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:159)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_162]
>
>
>
>
> Then I searched the web and found one answer that solves this issue:
> setting "classloader.resolve-order: parent-first" in flink-conf.yaml.
> Indeed, it works for this issue.
>
> But unfortunately, I'm not allowed to change classloader.resolve-order
> to parent-first; it must stay child-first, because parent-first brings me
> other classloading-related issues.
>
> Then I tried the configuration below in flink-conf.yaml:
> classloader.parent-first-patterns.additional:
> org.apache.commons.collections
>
> It also solves that exception, but I'm worried it could cause other
> issues.
>
> So my question is: are there other ways to solve the exception above?
> Thanks so much for your help!
>
>
> Thanks && Regards,
> Hunk
>
>
>
>
>
>
>


Re: length value for some classes extending LogicalType.

2022-05-25 Thread Shengkai Fang
Hi.

It also influences how Flink serializes/deserializes the RowData. For
example, Flink builds the TimestampDataSerializer with the precision
specified in the type. You can see that it only extracts the expected part
when serializing [1]. For the char/varchar types, however, the serializer
will not truncate the string if it exceeds the specified length when
serializing/deserializing the RowData.
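
As a small illustration (a hypothetical snippet using the standard logical
type classes; the column names are made up), the lengths can be passed
explicitly instead of relying on the default of 1:

import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// col1 is declared as CHAR(10) and col2 as VARCHAR(255), so nothing falls back to length 1.
RowType rowType = RowType.of(
        new LogicalType[] {new CharType(10), new VarCharType(255)},
        new String[] {"col1", "col2"});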

Best,
Shengkai

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TimestampDataSerializer.java#L108

yuxia wrote on Wednesday, May 25, 2022 at 20:58:

> IMO, the behavior depends on how you convert your string data from the
> external system to Flink's internal data, or the other way around.
>
> I think it's more like a hint that tells how to convert the string data
> between Flink and external systems, including sources and sinks.
>
>
> Best regards,
> Yuxia
>
> --
> *From: *"Krzysztof Chmielewski" 
> *To: *"User" 
> *Sent: *Wednesday, May 25, 2022, 5:29:10 PM
> *Subject: *length value for some classes extending LogicalType.
>
> Hi,
> some classes extending LogicalType.java such as VarCharType, BinaryType,
> CharType and few others have an optional argument "length". If not
> specified, length is set to default value which is 1.
>
> I would like to ask, what are the implications of that? What can happen if
> I use the default length value 1 but the actual length of the data will be
> bigger than 1?
>
> For example:
> RowType.of("col1", new CharType()) <- this will use default length value 1.
>
> Regards,
> Krzysztof Chmielewski
>
>


Re: 64 KB SQL size limit bug in version 1.13.5

2022-05-25 Thread Yun Tang
Hi

Please use English when sending emails to the dev community. Also, for usage questions, it is recommended to send them to the user-zh list; I have forwarded this to the relevant mailing list for you.


Best,
Yun Tang

From: Lose control ./ <286296...@qq.com.INVALID>
Sent: Tuesday, May 24, 2022 9:15
To: dev 
Subject: 64 KB SQL size limit bug in version 1.13.5

Could anyone tell me how to change the 64 KB SQL size limit in version 1.13.5? Thanks.


Flink metrics flattened after Job restart

2022-05-25 Thread Sahil Aulakh
Hi Flink Community

We are using Flink version 1.13.5 for our application, and every time the
job restarts, the Flink job metrics flatten following the restart.
For example, we are using lastCheckpointDuration; on 05/05 our job restarted
and at the same time the checkpoint duration metric flattened. Is this a
known issue? If there is any workaround, please let me know.

Thanks
Sahil Aulakh


how does a slow FaaS affect the Flink StateFun cluster?

2022-05-25 Thread Marco Villalobos
If the performance of a stateful function (FaaS) is very slow, how does this 
impact performance on the Flink StateFun Cluster?

I am trying to figure out what counts as too slow for a FaaS. I expect the Flink 
StateFun cluster to receive about 2000 events per minute, but some (not all) 
functions might take many seconds, or even a few minutes, to complete.

I'd appreciate any advice.

Thank you.

Flink 1.14.4 -> 1.15.0 Upgrade Problem

2022-05-25 Thread Clayton Wohl
I have a Flink job that has been running with Flink 1.14.4 perfectly for a
few months.

I tried upgrading to Flink 1.15.0. There are no error messages or
exceptions, it runs perfectly fine for several hours, but after a few hours
the Flink app starts to lag in processing an input Kafka topic. I can see
the lag grow linearly in my Grafana dashboards that track Kafka lag. The
lag continues to grow indefinitely until I manually restart the Flink job,
then the Flink job will catch up with old data, the lag will drop to zero,
the application will run fine for several hours, and then the lag issue
will happen again and lag will steadily grow until I manually restart the
Flink job.

When I revert the application back to Flink 1.14.4, this lag issue
completely goes away. I see no runtime errors or exceptions.

A few quick environment details:
- The Kafka brokers are running Kafka 2.8.1
- The Flink app is running on Kubernetes with the Spotify Flink Operator
- The Flink code is Java using the newer KafkaSource/KafkaSink API, not the
older KafkaConsumer/KafkaProducer API.

The Flink app consumes from seven input Kafka topics, and for each distinct
input topic, writes output values to a distinct output topic. Most of the
processing happens within a RichAsyncFunction which does some processing
against an external database. The lag issue mentioned here happens on
different topics. And if I let the app run long enough, it will happen on
multiple topics. Also, when the lag issue is happening, the app is still
processing records on the affected topics. For some reason it's processing
records more slowly than the incoming message rate, which is the definition
of lag. But clearly, the lag isn't caused by a lack of resources, but by a
software bug within Flink.

I intend to keep this job running Flink 1.14.4 until a Flink 1.15.1 patch
comes out that supposedly addresses this issue. This job is not using or
requiring any new Flink 1.15.0 functionality. However, we prefer to use the
newest versions when we can. Switching Flink versions is just changing
Maven dependencies, changing the base Flink Docker image version, and the
Flink version tag specified to the Kubernetes Spotify Operator.

I was hoping this report would help the flink developers with a heads up
that there is a new bug introduced in 1.15.0. If there is anything I should
try, let me know. Thanks :)


Re: Flink DataStream and remote Stateful Functions interoperability

2022-05-25 Thread Tymur Yarosh
Hi Himanshu,

The short answer is you should configure Stateful Functions in your job. Here 
is an example 
https://github.com/f1xmAn/era-locator/blob/34dc4f77539195876124fe604cf64c61ced4e5da/src/main/java/com/github/f1xman/era/StreamingJob.java#L68.

Check out this article on Flink DataStream and Stateful Functions 
interoperability 
https://medium.com/devoops-and-universe/realtime-detection-of-russian-crypto-phone-era-with-flink-datastream-and-stateful-functions-e77794fedc2a.
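
For completeness, the general shape looks roughly like the sketch below, based on 
the statefun-flink-datastream SDK. The function type, egress id, endpoint URI and 
sample elements are placeholders, and the exact builder options may differ between 
StateFun versions:

import java.net.URI;
import org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder;
import org.apache.flink.statefun.flink.datastream.RoutableMessage;
import org.apache.flink.statefun.flink.datastream.RoutableMessageBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

FunctionType remoteFn = new FunctionType("example", "greeter");
EgressIdentifier<String> egress = new EgressIdentifier<>("example", "out", String.class);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Route plain events to the remote function by wrapping them as RoutableMessages.
DataStream<RoutableMessage> ingress =
        env.fromElements("alice", "bob")
                .map(name -> RoutableMessageBuilder.builder()
                        .withTargetAddress(remoteFn, name)
                        .withMessageBody(name)
                        .build());

// Bind the remote (request/reply) function and its egress directly in the job,
// instead of relying on a module.yaml picked up by the cluster.
StatefulFunctionEgressStreams statefun =
        StatefulFunctionDataStreamBuilder.builder("interop-example")
                .withDataStreamAsIngress(ingress)
                .withRequestReplyRemoteFunction(
                        RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                                remoteFn, URI.create("http://statefun-endpoint:8000/statefun")))
                .withEgressId(egress)
                .build(env);

DataStream<String> results = statefun.getDataStreamForEgressId(egress);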

Best,
Tymur Yarosh
On 24 May 2022, 21:16 +0300, Himanshu Sareen , 
wrote:
> Team,
>
> I'm working on a POC where our existing Stateful Functions ( remote ) can 
> interact with Datastream API.
> https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/
>
> I started Flink cluster - ./bin/start-cluster.sh
> Then I submitted the .jar to Flink.
>
> However, on submitting, only the embedded function is called by the DataStream code.
>
> I'm unable to invoke stateful functions as module.yaml is not loaded.
>
> Can someone help me understand how we can deploy the Stateful Functions code 
> (module.yaml) and the DataStream API code in parallel on a Flink cluster?
>
>
> Regards
> Himanshu
>


Unsubscribe

2022-05-25 Thread Beni Bilme
Unsubscribe



Re: Flink Kubernetes Operator: Specifying env variables from ConfigMaps

2022-05-25 Thread Mads Ellersgaard Kalør
Hi,

Thanks a lot for your clarifications. It makes perfect sense.

Mads

From: Gyula Fóra 
Sent: 23 May 2022 15:39
To: Mads Ellersgaard Kalør 
Cc: user@flink.apache.org 
Subject: Re: Flink Kubernetes Operator: Specifying env variables from ConfigMaps

Hi Mads!

I think you need to use the podTemplate for this. You can either do it in the 
top level spec or customize it for tm/jm respectively.

Keep in mind that pod templates are merged with the base flink template so it's 
enough to specify the fields relevant to you (in this case the env variables 
for the main container).
So it should be fairly simple :)

Let us know if you hit any issues.

Cheers
Gyula

On Mon, May 23, 2022 at 6:20 AM Mads Ellersgaard Kalør <m...@kaunt.com> wrote:
Hi,

We use a number of environment variables to configure our Flink pipelines, such 
as Kafka connection info, hostnames for external services etc. This works well 
when running a standalone Kubernetes deployment or on a local environment in 
Docker, but I cannot find any documentation about how to specify environment 
variables (from ConfigMaps or Secrets) in the Flink Kubernetes Operator (I 
would expect it to be in the JobSpec part of the FlinkDeploymentSpec).

Do I have to create a PodTemplate, or is there a simpler way?


Thanks,

Mads


Re: length value for some classes extending LogicalType.

2022-05-25 Thread yuxia
IMO, the behavior depends on how you convert your string data from the external 
system to Flink's internal data, or the other way around. 

I think it's more like a hint that tells how to convert the string data between 
Flink and external systems, including sources and sinks. 


Best regards, 
Yuxia 


发件人: "Krzysztof Chmielewski"  
收件人: "User"  
发送时间: 星期三, 2022年 5 月 25日 下午 5:29:10 
主题: length value for some classes extending LogicalType. 

Hi, 
some classes extending LogicalType.java, such as VarCharType, BinaryType, 
CharType, and a few others, have an optional argument "length". If not specified, 
the length is set to a default value, which is 1. 

I would like to ask, what are the implications of that? What can happen if I 
use the default length value of 1 but the actual length of the data is bigger 
than 1? 

For example: 
RowType.of("col1", new CharType()) <- this will use default length value 1. 

Regards, 
Krzysztof Chmielewski 



Re:Re: flink sql api, exception when setting "table.exec.state.ttl"

2022-05-25 Thread 李诗君



I have figured this out.
It was because I put a flink-connector-tidb-cdc.jar in my Flink lib folder 
earlier; it is shipped with Scala 2.11, while my Flink distribution is shipped 
with Scala 2.12.
Somehow, when I submit a job with a GroupAggregate operator, it needs to load 
keyed RocksDB state, and that is where the conflict occurs.
I will look into it and provide a solution.










At 2022-05-23 20:55:39, "Chesnay Schepler"  wrote:

You're probably mixing Flink versions.


From the stack trace we can see that Flink classes are being loaded from 2 
different jars (rocketmq-flink-1.0.0-SNAPSHOT.jar/flink-dist_2.12-1.13.5.jar); 
I'd suggest to resolve that first and see if the error persists.



On 23/05/2022 14:32, 李诗君 wrote:

flink version: 1.13.5


java code:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
env.enableCheckpointing(6);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");

env.setStateBackend(new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true));

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
Configuration configuration = tableEnv.getConfig().getConfiguration();
// configuration.setString("table.exec.resource.default-parallelism", "16");
configuration.setString("table.exec.state.ttl", "720");


And when I submit this job, I got this:


Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], 
fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, 
average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched 
fromINITIALIZING to FAILEDon container_1647420330066_0473_01_02 @ 
test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
at 
org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(NativeMethod)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) 
~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$ConfigHolder.(FlinkCompactionFilter.java:107)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.(FlinkCompactionFilter.java:133)
 ~[flink-dist_2.12-1.13.5.jar:2.2.0]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
 ~[flink-dist_2.12-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
 ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)

Re: Source vs SourceFunction and testing

2022-05-25 Thread Qingsheng Ren
Glad to see you have resolved the issue! 

If you want to learn more about the Source API, the Flink documentation [1] has a 
detailed description of it. The original proposal, FLIP-27 [2], is also a good 
reference. 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
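
For readers landing on this thread later, wiring up a FLIP-27 style source looks 
roughly like the sketch below (the bootstrap servers, topic and group id are 
placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// New-style Source (FLIP-27): built once, then handed to env.fromSource(...).
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");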

Cheers, 

Qingsheng

> On May 25, 2022, at 17:54, Piotr Domagalski  wrote:
> 
> Thank you Qingsheng, this context helps a lot!
> 
> And once again thank you all for being such a helpful community!
> 
> P.S. I actually struggled for a bit trying to understand why my refactored 
> solution which accepts DataStream<> wouldn't work ("no operators defined in 
> the streaming topology"). Turns out, my assumption that I can call 
> StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get 
> the same environment, was wrong. I had env.addSource and env.fromSource calls 
> using one instance of the environment, but then called env.execute() on 
> another instance :facepalm:
> 
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren  wrote:
> Hi Piotr,
> 
> I’d like to share my understanding about this. Source and SourceFunction are 
> both interfaces to data sources. SourceFunction was designed and introduced 
> earlier and as the project evolved, many shortcomings emerged. Therefore, the 
> community re-designed the source interface and introduced the new Source API 
> in FLIP-27 [1]. 
> 
> Finally we will deprecate the SourceFunction and use Source as the only 
> interface for all data sources, but considering the huge cost of migration 
> you’ll see SourceFunction and Source co-exist for some time, like the 
> ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource 
> as a pioneer has already migrated to the new Source API.
> 
> I think the API to end users didn't change a lot: both 
> env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
> and you could apply downstream transformations onto it. 
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>  
> 
> Cheers,
> 
> Qingsheng
> 
> > On May 25, 2022, at 03:19, Piotr Domagalski  wrote:
> > 
> > Hi Ken,
> > 
> > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> > navigating the type system and being still confused about differences 
> > between Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> > 
> > I think the DataStream<> type is what I'm looking for? That is, then I can 
> > use:
> > 
> > DataStream source = env.fromSource(getKafkaSource(params), 
> > watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> > 
> > and
> > DataStream s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> > 
> > Does that sound right?
> > 
> > [1] 
> > https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> > 
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler  
> > wrote:
> > Hi Piotr,
> > 
> > The way I handle this is via a workflow class that uses a builder approach 
> > to specifying inputs, outputs, and any other configuration settings.
> > 
> > The inputs are typically DataStream.
> > 
> > This way I can separate out the Kafka inputs, and use testing sources that 
> > give me very precise control over the inputs (e.g. I can hold up on right 
> > side data to ensure my stateful left join junction is handling deferred 
> > joins properly). I can also use Kafka unit test support (either kafka-junit 
> > or Spring embedded Kafka) if needed.
> > 
> > Then in the actual tool class (with a main method) I’ll wire up the real 
> > Kafka sources, with whatever logic is required to convert the consumer 
> > records to what the workflow is expecting.
> > 
> > — Ken
> > 
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
> >> 
> >> Hi,
> >> 
> >> I'm wondering: what is the recommended way to structure a job that one 
> >> would like to test later on with `MiniCluster`?
> >> 
> >> I've looked at the flink-training repository examples [1] and they tend to 
> >> expose the main job as a class that accepts a `SourceFunction` and a 
> >> `SinkFunction`, which makes sense. But then, my job is normally constructed 
> >> with `KafkaSource` which is then passed to `env.fromSource(...`.
> >> 
> >> Is there any recommended way of handling these discrepancies, ie. having 
> >> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> >> 
> >> [1] 
> >> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> >> 
> >> -- 
> >> Piotr Domagalski
> > 
> > --
> > Ken Krugler
> > http://www.scaleunlimited.com
> > Custom big data 

Re: Source vs SourceFunction and testing

2022-05-25 Thread Piotr Domagalski
Thank you Qingsheng, this context helps a lot!

And once again thank you all for being such a helpful community!

P.S. I actually struggled for a bit trying to understand why my refactored
solution which accepts DataStream<> wouldn't work ("no operators defined in
the streaming topology"). Turns out, my assumption that I can
call StreamExecutionEnvironment.getExecutionEnvironment() multiple times
and get the same environment, was wrong. I had env.addSource and
env.fromSource calls using one instance of the environment, but then called
env.execute() on another instance :facepalm:
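
In case it helps anyone else, the structure I ended up with looks roughly like the
sketch below (simplified and with made-up names; the stand-in source is where the
Kafka source or the ParallelTestSource gets wired in):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyJob {
    // The topology only depends on DataStream<String>, so a test can pass in
    // env.addSource(new ParallelTestSource<>(...)) or env.fromElements(...),
    // while production wires in env.fromSource(kafkaSource, ...).
    public static DataStream<String> build(DataStream<String> events) {
        return events.map(String::toUpperCase).name("uppercase");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b", "c"); // stand-in for the real source
        build(source).print();
        // Crucially, execute() is called on the same env instance the source was registered with.
        env.execute("my-job");
    }
}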

On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren  wrote:

> Hi Piotr,
>
> I’d like to share my understanding about this. Source and SourceFunction
> are both interfaces to data sources. SourceFunction was designed and
> introduced earlier and as the project evolved, many shortcomings emerged.
> Therefore, the community re-designed the source interface and introduced
> the new Source API in FLIP-27 [1].
>
> Finally we will deprecate the SourceFunction and use Source as the only
> interface for all data sources, but considering the huge cost of migration
> you’ll see SourceFunction and Source co-exist for some time, like the
> ParallelTestSource you mentioned is still on SourceFunction, and
> KafkaSource as a pioneer has already migrated to the new Source API.
>
> I think the API to end users didn't change a lot: both
> env.addSource(SourceFunction) and env.fromSource(Source) return a
> DataStream, and you could apply downstream transformations onto it.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 03:19, Piotr Domagalski 
> wrote:
> >
> > Hi Ken,
> >
> > Thanks Ken. I guess the problem I had was, as a complete newbie to
> Flink, navigating the type system and being still confused about
> differences between Source, SourceFunction, DataStream, DataStreamOperator,
> etc.
> >
> > I think the DataStream<> type is what I'm looking for? That is, then I
> can use:
> >
> > DataStream source = env.fromSource(getKafkaSource(params),
> watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> >
> > and
> > DataStream s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> >
> > Does that sound right?
> >
> > [1]
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> >
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler 
> wrote:
> > Hi Piotr,
> >
> > The way I handle this is via a workflow class that uses a builder
> approach to specifying inputs, outputs, and any other configuration
> settings.
> >
> > The inputs are typically DataStream.
> >
> > This way I can separate out the Kafka inputs, and use testing sources
> that give me very precise control over the inputs (e.g. I can hold up on
> right side data to ensure my stateful left join junction is handling
> deferred joins properly). I can also use Kafka unit test support (either
> kafka-junit or Spring embedded Kafka) if needed.
> >
> > Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
> >
> > — Ken
> >
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski 
> wrote:
> >>
> >> Hi,
> >>
> >> I'm wondering: what is the recommended way to structure a job that one
> would like to test later on with `MiniCluster`?
> >>
> >> I've looked at the flink-training repository examples [1] and they tend
> to expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
> >>
> >> Is there any recommended way of handling these discrepancies, ie.
> having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> >>
> >> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> >>
> >> --
> >> Piotr Domagalski
> >
> > --
> > Ken Krugler
> > http://www.scaleunlimited.com
> > Custom big data solutions
> > Flink, Pinot, Solr, Elasticsearch
> >
> >
> >
> >
> >
> > --
> > Piotr Domagalski
>
>

-- 
Piotr Domagalski


length value for some classes extending LogicalType.

2022-05-25 Thread Krzysztof Chmielewski
Hi,
some classes extending LogicalType.java, such as VarCharType, BinaryType,
CharType, and a few others, have an optional argument "length". If not
specified, the length is set to a default value, which is 1.

I would like to ask, what are the implications of that? What can happen if
I use the default length value of 1 but the actual length of the data is
bigger than 1?

For example:
RowType.of("col1", new CharType()) <- this will use default length value 1.

Regards,
Krzysztof Chmielewski


LinkedMap ClassCastException issue

2022-05-25 Thread wang
Hi dear engineers,


Recently I encountered another issue. After I submitted a Flink SQL job, it 
threw an exception:




Caused by: java.lang.ClassCastException: cannot assign instance of 
org.apache.commons.collections.map.LinkedMap to field 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
 of type org.apache.commons.collections.map.LinkedMap in instance of 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
 ~[?:1.8.0_162]
at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) 
~[?:1.8.0_162]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) 
~[?:1.8.0_162]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) 
~[?:1.8.0_162]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) 
~[?:1.8.0_162]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) 
~[?:1.8.0_162]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) 
~[?:1.8.0_162]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:159)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_162]






Then I searched the web and found one answer that solves this issue: setting 
"classloader.resolve-order: parent-first" in flink-conf.yaml.
Indeed, it works for this issue.


But unfortunately, I'm not allowed to change classloader.resolve-order to 
parent-first; it must stay child-first, because parent-first brings me other 
classloading-related issues.


Then I tried the configuration below in flink-conf.yaml:
classloader.parent-first-patterns.additional: org.apache.commons.collections


It also solves that exception, but I'm worried it could cause other issues.


So my question is: are there other ways to solve the exception above? Thanks 
so much for your help!




Thanks && Regards,
Hunk




 
