Re: Flink throws exception when submitting a job through Jenkins and Spinnaker

2023-08-13 Thread Shammon FY
Hi,

It seems that the client cannot access the right network to submit your
job. Maybe the address option in k8s is wrong; you can check the error
message in the k8s logs.
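
For example (the service and pod names below are placeholders), you can
point the client explicitly at the JobManager's REST endpoint with the -m
option, and inspect the JobManager logs:

  flink run -m <jobmanager-service>:8081 /opt/flink/lib/application-0.0.1.jar
  kubectl logs <jobmanager-pod>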

Best,
Shammon FY

On Fri, Aug 11, 2023 at 11:40 PM elakiya udhayanan wrote:

>
> Hi Team,
> We are using Apache Flink 1.16.1, configured as a standalone Kubernetes
> pod, for one of our applications that reads from Confluent Kafka topics to
> do event correlation. We are using Flink's Table API join for this (in SQL
> format). We are able to submit the job using Flink's UI. For our DEV
> environment, we implemented a Jenkins pipeline which downloads the jar
> required to submit the job, creates the Flink Kubernetes pods, copies the
> downloaded jar to the Flink pod's folder, and uses Flink's run command to
> submit the job. The deployment step happens through the Spinnaker webhook.
> We use a Dockerfile to create the Kubernetes pods, and also have a
> docker-entrypoint.sh which contains the flink run command that submits the
> job.
>
> Everything works fine, but when the job is being submitted, we get the
> below exception.
>
> The flink run command used is
>
> *flink run /opt/flink/lib/application-0.0.1.jar*
> Any help is appreciated.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute sql
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1087)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1165)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1165)
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:867)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
> at com.sample.SampleStreamingApp.main(SampleStreamingApp.java:157)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ... 8 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
> 'insert-into_default_catalog.default_database.Sample'.
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:206)
> at 
> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:850)
> ... 17 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> 15:06:44.964 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute sql
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>  ~[application-0.0.1.jar:?]
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[application-0.0.1.jar:?]
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) 
> ~[application-0.0.1.jar:?]
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:843) 
> ~[application-0.0.1.jar:?]
> at org.apache

Re: Question about serialization of java.util classes

2023-08-13 Thread liu ron
Hi,

According to the test in [1], Flink can recognize a POJO class that
contains a java.util.List, so you can refer to the related POJO class
implementation there.

[1]
https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
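
For example, a minimal sketch of such a POJO (the class and field names are
hypothetical):

```
import java.util.List;

// With a public no-argument constructor and public fields, Flink's type
// extraction treats this class as a POJO, even though one of its fields
// is a java.util.List.
public class SensorReading {
    public String id;
    public long timestamp;
    public List<Double> values;

    public SensorReading() {}
}
```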

Best,
Ron

wrote on Sun, Aug 13, 2023 at 22:50:

> Greetings,
>
> I am working on a project that needs to process around 100k events per
> second and I'm trying to improve performance.
>
> Most of the classes being used are POJOs, but a couple of fields use a
> `java.util` class such as `ArrayList`, `HashSet`, or `SortedSet`. This
> forces Flink to use Kryo and throw these warnings:
>
> ```
> class java.util.ArrayList does not contain a setter for field size
> Class class java.util.ArrayList cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType.
> Please read the Flink documentation on "Data Types & Serialization" for
> details of the effect on performance and schema evolution.
> ```
>
> ```
> No fields were detected for class java.util.HashSet so it cannot be used
> as a POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance and schema evolution.
> ```
>
> My question is: what do I need to do to get Flink to recognize my classes
> as POJOs and use the POJO serializer for better performance?
> I read through the documentation and stackoverflow and the conclusion is
> that I need to make a TypeInfoFactory and use it inside a TypeInfo
> annotation over my POJO.
> While this seems incredibly tedious and I keep thinking "there must be a
> better way", I would be fine with this solution if I could figure out how
> to do this for the Set types I'm using.
>
> Any help would be appreciated.


FeatHub : a feature store for ETL real-time features using Flink

2023-08-13 Thread Dong Lin
Dong Lin wrote on Mon, Aug 14, 2023 at 09:02:

> Hi all,
>
> I am writing this email to promote our open-source feature store project
> (FeatHub), which supports using Flink (production-ready) and Spark (not
> production-ready) to compute real-time / offline features with pythonic
> declarative feature specifications.
>
> To the best of my knowledge, this is the most mature open-source feature
> store that supports using Flink as the compute engine. It is also the only
> project that supports multiple compute engines (e.g. Flink, Spark) with an
> engine-agnostic feature definition SDK, so that you can choose the compute
> engine that best meets your needs (e.g. throughput vs. latency) without
> changing your code, achieving a similar design goal as Apache Beam.
>
> As another killer feature, we recently added support for application-level
> metrics, so that you can define metrics (e.g. the ratio of values that are
> null in the last 10 minutes) together with your features, and FeatHub can
> automatically compile, compute, and export these metrics to Prometheus.
>
> Please feel free to learn more about FeatHub by reading its main README
> and docs on GitHub (
> https://github.com/alibaba/feathub/tree/master/docs/content). We have
> also provided multiple demos at
> https://github.com/flink-extended/feathub-examples so that you can easily
> try out FeatHub using docker-compose.
>
> Cheers,
> Dong
>
>


Question about serialization of java.util classes

2023-08-13 Thread s
Greetings,

I am working on a project that needs to process around 100k events per second 
and I'm trying to improve performance.

Most of the classes being used are POJOs, but a couple of fields use a
`java.util` class such as `ArrayList`, `HashSet`, or `SortedSet`. This
forces Flink to use Kryo and throw these warnings:

```
class java.util.ArrayList does not contain a setter for field size
Class class java.util.ArrayList cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
```

```
No fields were detected for class java.util.HashSet so it cannot be used as a 
POJO type and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
```

My question is: what do I need to do to get Flink to recognize my classes as
POJOs and use the POJO serializer for better performance?
I read through the documentation and stackoverflow and the conclusion is that I 
need to make a TypeInfoFactory and use it inside a TypeInfo annotation over my 
POJO.
While this seems incredibly tedious and I keep thinking "there must be a better 
way", I would be fine with this solution if I could figure out how to do this 
for the Set types I'm using.
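
For reference, this is roughly what I understand that approach to look like
for a List field (the class and field names below are placeholders, and the
Set case is exactly the part I have not figured out):

```
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Placeholder event class; @TypeInfo points Flink at the factory below.
@TypeInfo(MyEvent.Factory.class)
public class MyEvent {
    public String id;
    public List<String> tags;

    public MyEvent() {}

    // Spells out the type information explicitly, so the List field uses
    // Flink's list serializer instead of falling back to Kryo.
    public static class Factory extends TypeInfoFactory<MyEvent> {
        @Override
        public TypeInformation<MyEvent> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("id", Types.STRING);
            fields.put("tags", Types.LIST(Types.STRING));
            return Types.POJO(MyEvent.class, fields);
        }
    }
}
```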

Any help would be appreciated.

Re: Conversion expects insert-only records but DataStream API record contains: UPDATE_BEFORE

2023-08-13 Thread liu ron
Hi,

After a deep dive into the source code, I guess you are using the
StreamTableEnvironment#fromDataStream method, which only supports
insert-only messages. For your case, I think you should use
StreamTableEnvironment#fromChangelogStream [1], which supports consuming
update rows.

[1]
https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L317
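
For example, a minimal sketch of the changelog variant (following the
general pattern from the Flink documentation; the values mirror your -U
record):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // A stream whose Rows carry update RowKinds (-U/+U), not only inserts.
        DataStream<Row> changelog =
                env.fromElements(
                        Row.ofKind(RowKind.INSERT, 1, "test", "123-456-789"),
                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, "test", "123-456-789"),
                        Row.ofKind(RowKind.UPDATE_AFTER, 1, "test", "987-654-321"));

        // fromChangelogStream accepts update records; fromDataStream would
        // throw the "insert-only" error above.
        Table table = tEnv.fromChangelogStream(changelog);
        tEnv.createTemporaryView("t", table);
    }
}
```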

Best,
Ron

完结篇 <2366123...@qq.com> wrote on Sat, Aug 12, 2023 at 02:29:

> Flink: 1.15.2
>
> I am now going to change the data stream from *DataStream* to
> *DataStream*.
>
> This is already implemented (*insert-only works fine*), but it fails when
> the DataStream contains *update* information.
>
> The error is:
> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
> input conversion. Conversion expects insert-only records but DataStream API
> record contains: UPDATE_BEFORE*
> at
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> *at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)*
> at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
>
> *kafkaflink.java:179-180 lines of code*
>
> Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE,
> beforeObject, rowTypeInfo);
> collector.collect(before);
>
> The before data output is -U[1, test, 123-456-789]
>
> I would like to know: how can I convert a stream containing *update* data
> from *DataStream* to *DataStream*?
>