Flink-Iceberg Table Sink failing with org.apache.hadoop.fs.s3a.S3AStorageStatistics Cast exception

2022-04-28 Thread Terry Heathcote
Hi

We are running a Flink job that delivers Kafka data to an Iceberg table.
The job uses the *org.apache.iceberg.flink.CatalogLoader* and
*org.apache.iceberg.flink.TableLoader
*interfaces in combination with *org.apache.iceberg.flink.sink.FlinkSink *where
the catalog type is Hive.

We have had success in running multiple jobs to respective tables that are
stored in the same s3 bucket but recently, when attempting to write to
tables that are stored in separate s3 buckets, we have run into issues. The
first jobs submitted to the cluster run fine, however, when submitting more
jobs for sink tables with the same name (in separate database schemas and
s3 buckets), we run into a class cast exception as well as
an org.apache.hadoop.metrics2.MetricsException error stating: Metrics
source S3AMetrics{bucket-name} already exists!

Attached are both the error logs as well as the main code snippets and pom
files for better context. Any help would be greatly appreciated.

The Flink cluster version is 12.7 and we have enabled the
flink-s3-fs-hadoop jar
plugin so as to be able to write to s3 files.
ᐧ
java.lang.ClassCastException: org.apache.hadoop.fs.s3a.S3AStorageStatistics 
cannot be cast to org.apache.hadoop.fs.s3a.S3AStorageStatistics
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.createStorageStatistics(S3AFileSystem.java:636)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initializeStatisticsBinding(S3AFileSystem.java:578)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:401)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.iceberg.hadoop.Util.getFs(Util.java:51)
at 
org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:54)
at 
org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:59)
at 
org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:252)
at 
org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$0(BaseMetastoreTableOperations.java:179)
at 
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:405)
at 
org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
at 
org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:178)
at 
org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:160)
at 
org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:200)
at 
org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:94)
at 
org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:77)
at 
org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:93)
at 
org.apache.iceberg.flink.TableLoader$CatalogTableLoader.loadTable(TableLoader.java:113)
at 
org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:125)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:432)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)

http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;>

   

4.0.0

org.matrix
Java
1.0

jar


8
8
3.1.0
3.7.0
3.0.0-M5
3.0.2
1.6.0




Re: [EXT] Vertica jdbc sink error

2022-04-28 Thread Jasmin Redzepovic
Hi Martin,

here is a Jira ticket I created: 
https://issues.apache.org/jira/browse/FLINK-27429
I guess you now assign it to me, right? :)

Best Regards,
Jasmin

On 19.04.2022., at 22:34, Jasmin Redzepovic 
mailto:jasmin.redzepo...@superbet.com>> wrote:

Hi Martin,

Thanks for your answer. Regarding my contribution, I will for sure check the 
contributing guide and get familiar with Flink source code. I hope it will end 
up well and I will be able to write that functionality.

Best regards,
Jasmin

On 19.04.2022., at 09:39, Martijn Visser 
mailto:martijnvis...@apache.org>> wrote:


[CAUTION] This email comes from an external organization. Do not click links or 
open attachments unless you recognize the sender and know the content is safe.

Hi Jasmin,

Vertica is not implemented as a JDBC dialect, which is a requirement for Flink 
to support this. You can find an overview of the currently supported JDBC 
dialects in the documentation [1]. It would be interesting if you could 
contribute support for Vertica towards Flink.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/


On Fri, 15 Apr 2022 at 14:27, Jasmin Redzepovic 
mailto:jasmin.redzepo...@superbet.com>> wrote:
Hello Flink community,

I am getting this error when writing data to Vertica table:

Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a sink for writing table 
'default_catalog.default_database.VerticaSink’.
...
Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url: 
jdbc:vertica://:5433/sbtverticadb01


This is the code for creating sink table and inserting into it:

tEnv.executeSql("""
CREATE TABLE VerticaSink (
TICKET_ID STRING,
TICKET_CODE STRING,
BUSINESS_MARKET_CODE STRING,
BUSINESS_LINE_CODE STRING,
PRIMARY KEY (TICKET_ID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:vertica://:5433/sbtverticadb01',
'table-name' = 'SBX_DE.FLINK_SINK2',
'username' = ‘username',
'password' = ‘password'
)
"”")

tEnv.executeSql("""
INSERT INTO VerticaSink
SELECT TICKET_ID, TICKET_CODE, BUSINESS_MARKET_CODE, BUSINESS_LINE_CODE from 
Transformation2
"”")

I downloaded jdbc driver for Vertica and added it into ./lib folder.
Does anyone have idea what could it be? I googled the error, but didn’t find 
anything helpful.

Thanks and best regards,
Jasmin

This email is confidential and intended solely for the use of the individual or 
entity to whom it is addressed. If you received this e-mail by mistake, please 
notify the sender immediately by e-mail and delete this e-mail from your 
system. Please be informed that if you are not the intended recipient, you 
should not disseminate, distribute, disclose, copy or use this e-mail in any 
way, the act of dissemination, distribution, disclosure, copying or taking any 
action in reliance on the contents of this information being strictly 
prohibited. This e-mail is sent by a Superbet Group company. Any views 
expressed by the sender of this email are not necessarily those of Superbet 
Group. Please note that computer viruses can be transmitted by email. You are 
advised to check this email and any attachments for the presence of viruses. 
Superbet Group cannot accept any responsibility for any viruses transmitted by 
this email and/or any attachments.




Re: AvroRowDeserializationSchema

2022-04-28 Thread Dian Fu
Yes, I think so~

On Thu, Apr 28, 2022 at 11:00 AM lan tran  wrote:

> Hi Dian,
>
> Sorry for missing your mail, so if I did as your suggestion and the Flink
> somehow crashed and we have to restart the service, does the Flink job know
> the offset where does it read from Kafka ?
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Tuesday, April 26, 2022 7:54 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> The same code in my last reply showed how to set the UID for the source
> operator generated using Table API. I meant that you could firstly create a
> source using Table API, then convert it to a DataStream API and set uid for
> the source operator using the same code above, then perform operations with
> DataStream API.
>
> Regards,
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 9:27 PM lan tran  wrote:
>
> Hi Dian,
>
> Thank again for fast response.
>
> As your suggestion above, we can apply to set the UID for only for the
> DataStream state (as you suggest to convert from table to data stream).
>
> However, at the first phase which is collecting the data from Kafka (
> having Debezium format), the UID cannot be set since we are using Table API
> (auto generate the UID).
>
> Therefore, if there is some crashed or needed revert using SavePoint, we
> cannot use it in the first phase since we cannot set the UID for this => so
> how can we revert it ?.
>
> As a result of that, we want to use DebeziumAvroRowDeserializationSchema
> and DebeziumJsonRowDeserializationSchema in the DataStream job to be able
> to use the Savepoint for the whole full flow.
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> You could try the following code (also it may be a little hacky):
> ```
>
> def set_uid_for_source(ds: DataStream, uid: str):
>
> transformation = ds._j_data_stream.getTransformation()
>
>
>
> source_transformation = transformation
>
> while not source_transformation.getInputs().isEmpty():
>
> source_transformation = source_transformation.getInputs().get(0)
>
>
>
> source_transformation.setUid(uid)
>
> ```
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
> the sources with these formats, it will send UPDATE messages to downstream
> operators.
>
> Regards
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran 
> wrote:
>
> Yeah, I already tried that way. However, if we did not use DataStream at
> first. We cannot implement the Savepoint since through the doc if we use
> TableAPI (SQL API), the uid is generated automatically which means we
> cannot revert if the system is crashed.
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in
> Python DataStream API.
>
> Just take a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the results type is RowData instead of Row and so it should be not that
> easy to be directly supported in Python DataStream API. However, it
> supports conversion between Table API & DataStream API[1]. Could you
> firstly create a Table which consumes data from kafka and then convert it
> to a DataStream API?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu  wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create ones in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated as it's simply a wrapper of the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran 
> wrote:
>
> Thank Dian !! Very appreciate this.
>
> However, I have another questions related to this. In current version or
> any updating in future, does DataStream support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the
> documentation and seem it is not supported yet.
>
> Best,
> Quynh
>
> Sent from Mail 

Temporal join fails with "unexpected correlate variable $cor0 in the plan"

2022-04-28 Thread Matthew Brown
Hi all,

I'm trying to join the following two tables using a temporal join:

*table_1*
(
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3) METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND
)

*table_2:*
(
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3) METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND,
  CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
)

using the following query:

--
*SELECT*
*  table_1.f0,*
*  table_1.f1 AS table_1_value,*
*  table_2.f1 AS table_2_value,*
*FROM table_1*
*JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime ON table_1.f0 =
table_2.f0*
--

and it's raising the following exception

---
Exception in thread "main" org.apache.flink.table.api.TableException:
unexpected correlate variable $cor0 in the plan
at
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
---

Has anybody come across this before? Any debugging tips?

Cheers,
Matt.

--

--
AU: +61 459 493 730
UK: +44 7927 618921
@mnbbrown


Using the official flink operator and kubernetes secrets

2022-04-28 Thread Francis Conroy
Hi all,

I'm trying to use a kubernetes secret as a command line argument in my job
and the text replacement doesn't seem to be happening. I've verified
passing the custom args via the command line on my local flink cluster but
can't seem to get the environment var replacement to work.

apiVersion: flink.apache.org/v1alpha1
kind: FlinkDeployment
metadata:
  namespace: default
  name: http-over-mqtt
spec:
  image: flink:1.14.4-scala_2.12-java11
  flinkVersion: v1_14
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
kubernetes.env.secretKeyRef:
"env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
#containerized.taskmanager.env.DJANGO_TOKEN: "$DJANGO_TOKEN"
  serviceAccount: flink
  jobManager:
replicas: 1
resource:
  memory: "1024m"
  cpu: 1
  taskManager:
resource:
  memory: "1024m"
  cpu: 1
  podTemplate:
spec:
  serviceAccount: flink
  containers:
- name: flink-main-container
  volumeMounts:
- mountPath: /flink-job
  name: flink-jobs
  env:
- name: DJANGO_TOKEN  # kubectl create secret generic
switchdin-django-token --from-literal=token='[TOKEN]'
  valueFrom:
secretKeyRef:
  name: switchdin-django-token
  key: token
  optional: false
  initContainers:
- name: grab-mqtt-over-http-jar
  image: docker-push.k8s.local/test/switchdin/platform_flink:job-41
  command: [ "/bin/sh", "-c",
 "cp /opt/switchdin/* /tmp/job/." ]  # Copies the
jar in the init container to the flink-jobs volume
  volumeMounts:
- name: flink-jobs
  mountPath: /tmp/job
  volumes:
- name: flink-jobs
  emptyDir: { }
  job:
jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
args: ["--swit-django-token", "$DJANGO_TOKEN",
   "--swit-prod","false"]
entryClass: org.switchdin.HTTPOverMQTT
parallelism: 1
upgradeMode: stateless
state: running

In the logs I can see:

2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] - ARGS ARE {}
2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] -
--swit-django-token
2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - $DJANGO_TOKEN
2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - --swit-prod
2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - false

Anyone know how I can do this? I'm considering mounting it in a volume, but
that seems like a lot of hassle for such a small thing.

Thanks in advance!

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


RE: AvroRowDeserializationSchema

2022-04-28 Thread lan tran
Don’t expect that answer =))However, I am very appreciate everything you did Thanks again for helping me out.Best,Quynh. Sent from Mail for Windows From: Dian FuSent: Thursday, April 28, 2022 2:59 PMTo: lan tranCc: user@flink.apache.orgSubject: Re: AvroRowDeserializationSchema Yes, I think so~ On Thu, Apr 28, 2022 at 11:00 AM lan tran  wrote:Hi Dian,Sorry for missing your mail, so if I did as your suggestion and the Flink somehow crashed and we have to restart the service, does the Flink job know the offset where does it read from Kafka ? Sent from Mail for Windows From: Dian FuSent: Tuesday, April 26, 2022 7:54 AMTo: lan tranCc: user@flink.apache.orgSubject: Re: AvroRowDeserializationSchema Hi Quynh,The same code in my last reply showed how to set the UID for the source operator generated using Table API. I meant that you could firstly create a source using Table API, then convert it to a DataStream API and set uid for the source operator using the same code above, then perform operations with DataStream API.Regards,Dian On Mon, Apr 25, 2022 at 9:27 PM lan tran  wrote:Hi Dian, Thank again for fast response.As your suggestion above, we can apply to set the UID for only for the DataStream state (as you suggest to convert from table to data stream). However, at the first phase which is collecting the data from Kafka ( having Debezium format), the UID cannot be set since we are using Table API (auto generate the UID). Therefore, if there is some crashed or needed revert using SavePoint, we cannot use it in the first phase since we cannot set the UID for this => so how can we revert it ?. As a result of that, we want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job to be able to use the Savepoint for the whole full flow.Best,Quynh Sent from Mail for Windows From: Dian FuSent: Monday, April 25, 2022 7:46 PMTo: lan tranCc: user@flink.apache.orgSubject: Re: AvroRowDeserializationSchema Hi Quynh,You could try the following code (also it may be a little hacky):```def set_uid_for_source(ds: DataStream, uid: str):transformation = ds._j_data_stream.getTransformation() source_transformation = transformationwhile not source_transformation.getInputs().isEmpty():source_transformation = source_transformation.getInputs().get(0) source_transformation.setUid(uid)```Besides, could you describe your use case a bit and also how you want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for the sources with these formats, it will send UPDATE messages to downstream operators. RegardsDian On Mon, Apr 25, 2022 at 12:31 PM lan tran  wrote:Yeah, I already tried that way. However, if we did not use DataStream at first. We cannot implement the Savepoint since through the doc if we use TableAPI (SQL API), the uid is generated automatically which means we cannot revert if the system is crashed. Best,Quynh Sent from Mail for Windows From: Dian FuSent: Monday, April 25, 2022 11:04 AMTo: lan tranCc: user@flink.apache.orgSubject: Re: AvroRowDeserializationSchema DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema are still not supported in Python DataStream API. Just take a further look at the Java implementation of DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the results type is RowData instead of Row and so it should be not that easy to be directly supported in Python DataStream API. However, it supports conversion between Table API & DataStream API[1]. Could you firstly create a Table which consumes data from kafka and then convert it to a DataStream API?Regards,Dian[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors On Mon, Apr 25, 2022 at 11:48 AM Dian Fu  wrote:Yes, we should support them. For now, if you want to use them, you could create ones in your own project. You could refer to AvroRowDeserializationSchema[1] as an example. It should not be complicated as it's simply a wrapper of the Java implementation.Regards,Dian[1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308 On Mon, Apr 25, 2022 at 11:27 AM lan tran  wrote:Thank Dian !! Very appreciate this.However, I have another questions related to this. In current version or any updating in future, does DataStream support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the documentation and seem it is not supported yet.Best,QuynhSent from Mail for Windows From: Dian FuSent: Friday, April 22, 2022 9:36 PMTo: lan tranCc: user@flink.apache.orgSubject: Re: AvroRowDeserializationSchema Hi Quynh,I have added an example on how to use 

Unsubscribe

2022-04-28 Thread bhawana gupta
Unsubscribe

Regards,
Bhawana


Re: Unsubscribe

2022-04-28 Thread huweihua
To unsubscribe you need to send a mail to user-unsubscr...@flink.apache.org 
.

> 2022年4月27日 下午12:03,Amit Bhatia  写道:
> 
> Unsubscribe
> 
> Regards,
> Amit Bhatia



Re: Unsubscribe

2022-04-28 Thread huweihua
To unsubscribe you need to send a mail to user-unsubscr...@flink.apache.org 
.

> 2022年4月28日 下午6:45,bhawana gupta  写道:
> 
> Unsubscribe
> 
> Regards,
> Bhawana



Re: How to debug Metaspace exception?

2022-04-28 Thread Chesnay Schepler
I think what I meant was "either add it to /lib, or [if it is already in 
/lib but also bundled in the jar] add it to the parent-first patterns."


On 28/04/2022 15:56, Chesnay Schepler wrote:

Pretty sure, even though I seemingly documented it incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references outside the user code
classloader. To ensure that these classes are only loaded once
you should either add the driver jars to Flink’s |lib/| folder,
or add the driver classes to the list of parent-first loaded
class via |classloader.parent-first-patterns-additional|

.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler  
wrote:


You're misinterpreting the docs.

The parent/child-first classloading controls where Flink looks
for a class /first/, specifically whether we first load from /lib
or the user-jar.
It does not allow you to load something from the user-jar in the
parent classloader. That's just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib folder or use
|classloader.parent-first-patterns-additional|



I prefer the latter like this: the dependency stays with the
user-jar and not on the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my Microsoft
drivers in the lib folders of my task managers?

And then in my job jar only include them as compile time
dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler
 wrote:

JDBC drivers are well-known for leaking classloaders
unfortunately.

You have correctly identified your alternatives.

You must put the jdbc driver into /lib instead. Setting
only the parent-first pattern shouldn't affect anything.
That is only relevant if something is in both in /lib
and the user-jar, telling Flink to prioritize what is in
lib.



On 26/04/2022 15:35, John Smith wrote:

So I put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config and so far I
don't think I'm getting "java.lang.OutOfMemoryError:
Metaspace" any more.

Or it's too early to tell.

Though now, the task managers are shutting down due to
some other failures.

So maybe because tasks were failing and reloading often
the task manager was running out of Metspace. But now
maybe it's just cleanly shutting down.

On Wed, Apr 20, 2022 at 11:35 AM John Smith
 wrote:

Or I can put in the config to treat
org.apache.ignite. classes as first class?

On Tue, Apr 19, 2022 at 10:18 PM John Smith
 wrote:

Ok, so I loaded the dump into Eclipse Mat and
followed:

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for:
ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest
Path..." and picked "Exclude all
phantom/weak/soft references"
- Which then gave me: SqlDriverManager > Apache
Ignite JdbcThin Driver

So i'm guessing anything JDBC based. I should
copy into the task manager libs folder and my
jobs make the dependencies as compile only?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav
Tkachenko  wrote:

Also

https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on
profiling, as well as classloading).

On Tue, Apr 19, 2022 at 4:35 AM Chesnay
Schepler  wrote:

We have a very rough "guide" in the
wiki (it's just the specific steps I
took to debug another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua 

Re: How to debug Metaspace exception?

2022-04-28 Thread John Smith
You sure?

   -

   *JDBC*: JDBC drivers leak references outside the user code classloader.
   To ensure that these classes are only loaded once you should either add the
   driver jars to Flink’s lib/ folder, or add the driver classes to the
   list of parent-first loaded class via
   classloader.parent-first-patterns-additional
   

   .

   It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler  wrote:

> You're misinterpreting the docs.
>
> The parent/child-first classloading controls where Flink looks for a class
> *first*, specifically whether we first load from /lib or the user-jar.
> It does not allow you to load something from the user-jar in the parent
> classloader. That's just not how it works.
>
> It must be in /lib.
>
> On 27/04/2022 04:59, John Smith wrote:
>
> Hi Chesnay as per the docs...
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>
> You can either put the jars in task manager lib folder or use
> classloader.parent-first-patterns-additional
> 
>
> I prefer the latter like this: the dependency stays with the user-jar and
> not on the task manager.
>
> On Tue, Apr 26, 2022 at 9:52 PM John Smith  wrote:
>
>> Ok so I should put the Apache ignite and my Microsoft drivers in the lib
>> folders of my task managers?
>>
>> And then in my job jar only include them as compile time dependencies?
>>
>>
>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>> wrote:
>>
>>> JDBC drivers are well-known for leaking classloaders unfortunately.
>>>
>>> You have correctly identified your alternatives.
>>>
>>> You must put the jdbc driver into /lib instead. Setting only the
>>> parent-first pattern shouldn't affect anything.
>>> That is only relevant if something is in both in /lib and the user-jar,
>>> telling Flink to prioritize what is in lib.
>>>
>>>
>>>
>>> On 26/04/2022 15:35, John Smith wrote:
>>>
>>> So I put classloader.parent-first-patterns.additional:
>>> "org.apache.ignite." in the task config and so far I don't think I'm
>>> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>>>
>>> Or it's too early to tell.
>>>
>>> Though now, the task managers are shutting down due to some
>>> other failures.
>>>
>>> So maybe because tasks were failing and reloading often the task manager
>>> was running out of Metspace. But now maybe it's just cleanly shutting down.
>>>
>>> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
>>> wrote:
>>>
 Or I can put in the config to treat org.apache.ignite. classes as first
 class?

 On Tue, Apr 19, 2022 at 10:18 PM John Smith 
 wrote:

> Ok, so I loaded the dump into Eclipse Mat and followed:
> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>
> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
> - Then I clicked on one of them "Merge Shortest Path..." and picked
> "Exclude all phantom/weak/soft references"
> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver
>
> So i'm guessing anything JDBC based. I should copy into the task
> manager libs folder and my jobs make the dependencies as compile only?
>
> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko <
> yaros...@goldsky.io> wrote:
>
>> Also
>> https://shopify.engineering/optimizing-apache-flink-applications-tips
>> might be helpful (has a section on profiling, as well as classloading).
>>
>> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
>> wrote:
>>
>>> We have a very rough "guide" in the wiki (it's just the specific
>>> steps I took to debug another leak):
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>
>>> On 19/04/2022 12:01, huweihua wrote:
>>>
>>> Hi, John
>>>
>>> Sorry for the late reply. You can use MAT[1] to analyze the dump
>>> file. Check whether have too many loaded classes.
>>>
>>> [1] https://www.eclipse.org/mat/
>>>
>>> 2022年4月18日 下午9:55,John Smith  写道:
>>>
>>> Hi, can anyone help with this? I never looked at a dump file before.
>>>
>>> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
>>> wrote:
>>>
 Hi, so I have a dump file. What do I look for?

 On Thu, Mar 31, 2022 at 3:28 PM John Smith 
 wrote:

> Ok so if there's a leak, if I manually stop the job and restart it
> from the UI multiple times, I won't see the issue because because the
> classes are unloaded correctly?
>
>
> On Thu, Mar 31, 2022 at 9:20 AM huweihua 
> wrote:
>
>>
>> The difference is that manually canceling 

Re: How to debug Metaspace exception?

2022-04-28 Thread Chesnay Schepler

Pretty sure, even though I seemingly documented it incorrectly :)

On 28/04/2022 15:49, John Smith wrote:

You sure?

 *

/JDBC/: JDBC drivers leak references outside the user code
classloader. To ensure that these classes are only loaded once you
should either add the driver jars to Flink’s |lib/| folder, or add
the driver classes to the list of parent-first loaded class via
|classloader.parent-first-patterns-additional|

.

It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler  
wrote:


You're misinterpreting the docs.

The parent/child-first classloading controls where Flink looks for
a class /first/, specifically whether we first load from /lib or
the user-jar.
It does not allow you to load something from the user-jar in the
parent classloader. That's just not how it works.

It must be in /lib.

On 27/04/2022 04:59, John Smith wrote:

Hi Chesnay as per the docs...

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib folder or use
|classloader.parent-first-patterns-additional|



I prefer the latter like this: the dependency stays with the
user-jar and not on the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith
 wrote:

Ok so I should put the Apache ignite and my Microsoft drivers
in the lib folders of my task managers?

And then in my job jar only include them as compile time
dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler
 wrote:

JDBC drivers are well-known for leaking classloaders
unfortunately.

You have correctly identified your alternatives.

You must put the jdbc driver into /lib instead. Setting
only the parent-first pattern shouldn't affect anything.
That is only relevant if something is in both in /lib and
the user-jar, telling Flink to prioritize what is in lib.



On 26/04/2022 15:35, John Smith wrote:

So I put classloader.parent-first-patterns.additional:
"org.apache.ignite." in the task config and so far I
don't think I'm getting "java.lang.OutOfMemoryError:
Metaspace" any more.

Or it's too early to tell.

Though now, the task managers are shutting down due to
some other failures.

So maybe because tasks were failing and reloading often
the task manager was running out of Metspace. But now
maybe it's just cleanly shutting down.

On Wed, Apr 20, 2022 at 11:35 AM John Smith
 wrote:

Or I can put in the config to treat
org.apache.ignite. classes as first class?

On Tue, Apr 19, 2022 at 10:18 PM John Smith
 wrote:

Ok, so I loaded the dump into Eclipse Mat and
followed:

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for:
ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest
Path..." and picked "Exclude all
phantom/weak/soft references"
- Which then gave me: SqlDriverManager > Apache
Ignite JdbcThin Driver

So i'm guessing anything JDBC based. I should
copy into the task manager libs folder and my
jobs make the dependencies as compile only?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav
Tkachenko  wrote:

Also

https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on
profiling, as well as classloading).

On Tue, Apr 19, 2022 at 4:35 AM Chesnay
Schepler  wrote:

We have a very rough "guide" in the wiki
(it's just the specific steps I took to
debug another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua wrote:

Hi, John

Sorry for the late reply. You can use
MAT[1] to analyze the dump file. Check
   

Re: Using the official flink operator and kubernetes secrets

2022-04-28 Thread Őrhidi Mátyás
Also,

just declaring it in the flink configs should be sufficient, no need to
define it in the pod templates:

flinkConfiguration:
kubernetes.env.secretKeyRef:
"env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"


Best,
Matyas

On Thu, Apr 28, 2022 at 1:17 PM Őrhidi Mátyás 
wrote:

> Hi Francis,
>
> I suggest accessing the environment variables directly, no need to pass
> them as command arguments I guess.
>
> Best,
> Matyas
>
> On Thu, Apr 28, 2022 at 11:31 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Hi all,
>>
>> I'm trying to use a kubernetes secret as a command line argument in my
>> job and the text replacement doesn't seem to be happening. I've verified
>> passing the custom args via the command line on my local flink cluster but
>> can't seem to get the environment var replacement to work.
>>
>> apiVersion: flink.apache.org/v1alpha1
>> kind: FlinkDeployment
>> metadata:
>>   namespace: default
>>   name: http-over-mqtt
>> spec:
>>   image: flink:1.14.4-scala_2.12-java11
>>   flinkVersion: v1_14
>>   flinkConfiguration:
>> taskmanager.numberOfTaskSlots: "2"
>> kubernetes.env.secretKeyRef: 
>> "env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
>> #containerized.taskmanager.env.DJANGO_TOKEN: "$DJANGO_TOKEN"
>>   serviceAccount: flink
>>   jobManager:
>> replicas: 1
>> resource:
>>   memory: "1024m"
>>   cpu: 1
>>   taskManager:
>> resource:
>>   memory: "1024m"
>>   cpu: 1
>>   podTemplate:
>> spec:
>>   serviceAccount: flink
>>   containers:
>> - name: flink-main-container
>>   volumeMounts:
>> - mountPath: /flink-job
>>   name: flink-jobs
>>   env:
>> - name: DJANGO_TOKEN  # kubectl create secret generic 
>> switchdin-django-token --from-literal=token='[TOKEN]'
>>   valueFrom:
>> secretKeyRef:
>>   name: switchdin-django-token
>>   key: token
>>   optional: false
>>   initContainers:
>> - name: grab-mqtt-over-http-jar
>>   image: docker-push.k8s.local/test/switchdin/platform_flink:job-41
>>   command: [ "/bin/sh", "-c",
>>  "cp /opt/switchdin/* /tmp/job/." ]  # Copies the jar in 
>> the init container to the flink-jobs volume
>>   volumeMounts:
>> - name: flink-jobs
>>   mountPath: /tmp/job
>>   volumes:
>> - name: flink-jobs
>>   emptyDir: { }
>>   job:
>> jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
>> args: ["--swit-django-token", "$DJANGO_TOKEN",
>>"--swit-prod","false"]
>> entryClass: org.switchdin.HTTPOverMQTT
>> parallelism: 1
>> upgradeMode: stateless
>> state: running
>>
>> In the logs I can see:
>>
>> 2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] - ARGS ARE {}
>> 2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] -
>> --swit-django-token
>> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] -
>> $DJANGO_TOKEN
>> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - --swit-prod
>> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - false
>>
>> Anyone know how I can do this? I'm considering mounting it in a volume,
>> but that seems like a lot of hassle for such a small thing.
>>
>> Thanks in advance!
>>
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>


Re: Using the official flink operator and kubernetes secrets

2022-04-28 Thread Őrhidi Mátyás
Hi Francis,

I suggest accessing the environment variables directly, no need to pass
them as command arguments I guess.

Best,
Matyas

On Thu, Apr 28, 2022 at 11:31 AM Francis Conroy <
francis.con...@switchdin.com> wrote:

> Hi all,
>
> I'm trying to use a kubernetes secret as a command line argument in my job
> and the text replacement doesn't seem to be happening. I've verified
> passing the custom args via the command line on my local flink cluster but
> can't seem to get the environment var replacement to work.
>
> apiVersion: flink.apache.org/v1alpha1
> kind: FlinkDeployment
> metadata:
>   namespace: default
>   name: http-over-mqtt
> spec:
>   image: flink:1.14.4-scala_2.12-java11
>   flinkVersion: v1_14
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> kubernetes.env.secretKeyRef: 
> "env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
> #containerized.taskmanager.env.DJANGO_TOKEN: "$DJANGO_TOKEN"
>   serviceAccount: flink
>   jobManager:
> replicas: 1
> resource:
>   memory: "1024m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "1024m"
>   cpu: 1
>   podTemplate:
> spec:
>   serviceAccount: flink
>   containers:
> - name: flink-main-container
>   volumeMounts:
> - mountPath: /flink-job
>   name: flink-jobs
>   env:
> - name: DJANGO_TOKEN  # kubectl create secret generic 
> switchdin-django-token --from-literal=token='[TOKEN]'
>   valueFrom:
> secretKeyRef:
>   name: switchdin-django-token
>   key: token
>   optional: false
>   initContainers:
> - name: grab-mqtt-over-http-jar
>   image: docker-push.k8s.local/test/switchdin/platform_flink:job-41
>   command: [ "/bin/sh", "-c",
>  "cp /opt/switchdin/* /tmp/job/." ]  # Copies the jar in 
> the init container to the flink-jobs volume
>   volumeMounts:
> - name: flink-jobs
>   mountPath: /tmp/job
>   volumes:
> - name: flink-jobs
>   emptyDir: { }
>   job:
> jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
> args: ["--swit-django-token", "$DJANGO_TOKEN",
>"--swit-prod","false"]
> entryClass: org.switchdin.HTTPOverMQTT
> parallelism: 1
> upgradeMode: stateless
> state: running
>
> In the logs I can see:
>
> 2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] - ARGS ARE {}
> 2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] -
> --swit-django-token
> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - $DJANGO_TOKEN
> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - --swit-prod
> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - false
>
> Anyone know how I can do this? I'm considering mounting it in a volume,
> but that seems like a lot of hassle for such a small thing.
>
> Thanks in advance!
>
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


Re:Temporal join fails with "unexpected correlate variable $cor0 in the plan"

2022-04-28 Thread Xuyang
Hi, can you provide the version of the Flink with this exception?
I test your SQL in Flink/master and it works. My test SQL is the following.

create table table_1 (
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3),
WATERMARK FOR `rowtime` AS rowtime - INTERVAL '10' SECOND
)
with(
...
)

create table table_2 (
`f0` STRING NOT NULL,
`f1` DOUBLE NOT NULL,
`rowtime` TIMESTAMP(3),
WATERMARK FOR `rowtime` AS rowtime - INTERVAL '10' SECOND,
CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
)
with(
...
)

SELECT
table_1.f0,
table_1.f1 AS table_1_value,
table_2.f1 AS table_2_value
FROM table_1
JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime
ON table_1.f0 = table_2.f0



At 2022-04-28 14:01:17, "Matthew Brown"  wrote:

Hi all,


I'm trying to join the following two tables using a temporal join:


table_1
(
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3) METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND
)


table_2:
(
  `f0` STRING NOT NULL,
  `f1` DOUBLE NOT NULL,
  `rowtime` TIMESTAMP(3) METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - INTERVAL '10' SECOND,
  CONSTRAINT `PK_f0` PRIMARY KEY (`f0`) NOT ENFORCED
)


using the following query:


--
SELECT
  table_1.f0,
  table_1.f1 AS table_1_value,
  table_2.f1 AS table_2_value,
FROM table_1
JOIN table_2 FOR SYSTEM_TIME AS OF table_1.rowtime ON table_1.f0 = table_2.f0
--


and it's raising the following exception


---
Exception in thread "main" org.apache.flink.table.api.TableException: 
unexpected correlate variable $cor0 in the plan
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
---


Has anybody come across this before? Any debugging tips?


Cheers,
Matt.


--
 

--
AU: +61 459 493 730
UK: +44 7927 618921
@mnbbrown

Re: How to debug Metaspace exception?

2022-04-28 Thread John Smith
Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile only
dependency and copy the JDBC driver to flink lib folder.

Or

- Build my job jar and include JDBC driver in the shadow, plus copy the
JDBC driver in the flink lib folder, plus  make an entry in config for
classloader.parent-first-patterns-additional



On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
wrote:

> I think what I meant was "either add it to /lib, or [if it is already in
> /lib but also bundled in the jar] add it to the parent-first patterns."
>
> On 28/04/2022 15:56, Chesnay Schepler wrote:
>
> Pretty sure, even though I seemingly documented it incorrectly :)
>
> On 28/04/2022 15:49, John Smith wrote:
>
> You sure?
>
>-
>
>*JDBC*: JDBC drivers leak references outside the user code
>classloader. To ensure that these classes are only loaded once you should
>either add the driver jars to Flink’s lib/ folder, or add the driver
>classes to the list of parent-first loaded class via
>classloader.parent-first-patterns-additional
>
> 
>.
>
>It says either or
>
>
> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
> wrote:
>
>> You're misinterpreting the docs.
>>
>> The parent/child-first classloading controls where Flink looks for a
>> class *first*, specifically whether we first load from /lib or the
>> user-jar.
>> It does not allow you to load something from the user-jar in the parent
>> classloader. That's just not how it works.
>>
>> It must be in /lib.
>>
>> On 27/04/2022 04:59, John Smith wrote:
>>
>> Hi Chesnay as per the docs...
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>
>> You can either put the jars in task manager lib folder or use
>> classloader.parent-first-patterns-additional
>> 
>>
>> I prefer the latter like this: the dependency stays with the user-jar and
>> not on the task manager.
>>
>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>> wrote:
>>
>>> Ok so I should put the Apache ignite and my Microsoft drivers in the lib
>>> folders of my task managers?
>>>
>>> And then in my job jar only include them as compile time dependencies?
>>>
>>>
>>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>>> wrote:
>>>
 JDBC drivers are well-known for leaking classloaders unfortunately.

 You have correctly identified your alternatives.

 You must put the jdbc driver into /lib instead. Setting only the
 parent-first pattern shouldn't affect anything.
 That is only relevant if something is in both in /lib and the user-jar,
 telling Flink to prioritize what is in lib.



 On 26/04/2022 15:35, John Smith wrote:

 So I put classloader.parent-first-patterns.additional:
 "org.apache.ignite." in the task config and so far I don't think I'm
 getting "java.lang.OutOfMemoryError: Metaspace" any more.

 Or it's too early to tell.

 Though now, the task managers are shutting down due to some
 other failures.

 So maybe because tasks were failing and reloading often the task
 manager was running out of Metspace. But now maybe it's just
 cleanly shutting down.

 On Wed, Apr 20, 2022 at 11:35 AM John Smith 
 wrote:

> Or I can put in the config to treat org.apache.ignite. classes as
> first class?
>
> On Tue, Apr 19, 2022 at 10:18 PM John Smith 
> wrote:
>
>> Ok, so I loaded the dump into Eclipse Mat and followed:
>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>
>> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
>> - Then I clicked on one of them "Merge Shortest Path..." and picked
>> "Exclude all phantom/weak/soft references"
>> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin
>> Driver
>>
>> So i'm guessing anything JDBC based. I should copy into the task
>> manager libs folder and my jobs make the dependencies as compile only?
>>
>> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko <
>> yaros...@goldsky.io> wrote:
>>
>>> Also
>>> https://shopify.engineering/optimizing-apache-flink-applications-tips
>>> might be helpful (has a section on profiling, as well as classloading).
>>>
>>> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
>>> wrote:
>>>
 We have a very rough "guide" in the wiki (it's just the specific
 steps I took to debug another leak):

 https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

 

Re: How to debug Metaspace exception?

2022-04-28 Thread John Smith
I assume you will take action on your side to track and fix the doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith  wrote:

> Ok so to summarize...
>
> - Build my job jar and have the JDBC driver as a compile only
> dependency and copy the JDBC driver to flink lib folder.
>
> Or
>
> - Build my job jar and include JDBC driver in the shadow, plus copy the
> JDBC driver in the flink lib folder, plus  make an entry in config for
> classloader.parent-first-patterns-additional
> 
>
>
> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
> wrote:
>
>> I think what I meant was "either add it to /lib, or [if it is already in
>> /lib but also bundled in the jar] add it to the parent-first patterns."
>>
>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>
>> Pretty sure, even though I seemingly documented it incorrectly :)
>>
>> On 28/04/2022 15:49, John Smith wrote:
>>
>> You sure?
>>
>>-
>>
>>*JDBC*: JDBC drivers leak references outside the user code
>>classloader. To ensure that these classes are only loaded once you should
>>either add the driver jars to Flink’s lib/ folder, or add the driver
>>classes to the list of parent-first loaded class via
>>classloader.parent-first-patterns-additional
>>
>> 
>>.
>>
>>It says either or
>>
>>
>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
>> wrote:
>>
>>> You're misinterpreting the docs.
>>>
>>> The parent/child-first classloading controls where Flink looks for a
>>> class *first*, specifically whether we first load from /lib or the
>>> user-jar.
>>> It does not allow you to load something from the user-jar in the parent
>>> classloader. That's just not how it works.
>>>
>>> It must be in /lib.
>>>
>>> On 27/04/2022 04:59, John Smith wrote:
>>>
>>> Hi Chesnay as per the docs...
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>>
>>> You can either put the jars in task manager lib folder or use
>>> classloader.parent-first-patterns-additional
>>> 
>>>
>>> I prefer the latter like this: the dependency stays with the user-jar
>>> and not on the task manager.
>>>
>>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>>> wrote:
>>>
 Ok so I should put the Apache ignite and my Microsoft drivers in the
 lib folders of my task managers?

 And then in my job jar only include them as compile time dependencies?


 On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
 wrote:

> JDBC drivers are well-known for leaking classloaders unfortunately.
>
> You have correctly identified your alternatives.
>
> You must put the jdbc driver into /lib instead. Setting only the
> parent-first pattern shouldn't affect anything.
> That is only relevant if something is in both in /lib and the
> user-jar, telling Flink to prioritize what is in lib.
>
>
>
> On 26/04/2022 15:35, John Smith wrote:
>
> So I put classloader.parent-first-patterns.additional:
> "org.apache.ignite." in the task config and so far I don't think I'm
> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>
> Or it's too early to tell.
>
> Though now, the task managers are shutting down due to some
> other failures.
>
> So maybe because tasks were failing and reloading often the task
> manager was running out of Metspace. But now maybe it's just
> cleanly shutting down.
>
> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
> wrote:
>
>> Or I can put in the config to treat org.apache.ignite. classes as
>> first class?
>>
>> On Tue, Apr 19, 2022 at 10:18 PM John Smith 
>> wrote:
>>
>>> Ok, so I loaded the dump into Eclipse Mat and followed:
>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>
>>> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
>>> - Then I clicked on one of them "Merge Shortest Path..." and picked
>>> "Exclude all phantom/weak/soft references"
>>> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin
>>> Driver
>>>
>>> So i'm guessing anything JDBC based. I should copy into the task
>>> manager libs folder and my jobs make the dependencies as compile only?
>>>
>>> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko <
>>> yaros...@goldsky.io> wrote:
>>>
 Also
 https://shopify.engineering/optimizing-apache-flink-applications-tips
 might be helpful (has a section on profiling, as well as classloading).

 On Tue, Apr 19, 2022 at 4:35 AM 

Re:How to dynamically modify the schema information of a table

2022-04-28 Thread Xuyang
I tried this and found that the comment information will be lost forever when 
using "fromDataStream". I think you can create a issue to report this bug.



At 2022-04-22 10:44:47, "草莓"  wrote:

The following is the Java code


@Test
public void test(){
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream dataStream = env.fromElements("Alice", "Bob", "John");
Schema.Builder builder = Schema.newBuilder();
builder.column("f0",DataTypes.of(String.class)).withComment("this is a 
comment");
Table table = tableEnv.fromDataStream(dataStream, 
builder.build()).as("user_name");
table.getResolvedSchema();
table.printSchema();
}


Its output is:
(
  `user_name` STRING
)


My question is, if the comment is lost, what should I do to display the comment 
information?


What I need is that the result is
(
  `user_name` STRING COMMENT 'this is a comment'
)



Broadcast State + Stateful Operator + Async IO

2022-04-28 Thread Vishal Surana
Hello,
My application has a stateful operator which leverages RocksDB to store a
large amount of state. It, along with other operators receive configuration
as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
upon another input stream that triggers some communication with external
services whose results are then combined to yield the state that gets
stored in RocksDB.

In order to make the application more efficient, I am going to switch to
asynchronous IO but as the result is ultimately going to be a (Scala)
Future, I will have to block once to get the result. I was hoping to
leverage the Async IO operator but that apparently doesn't support RocksDB
based state storage. Am I correct in saying
that KeyedBroadcastProcessFunction is the only option I have? If so, then I
want to understand how registering a future's callbacks (via onComplete)
works with a synchronous operator such as KeyedBroadcastProcessFunction.
Will the thread executing the function simply relinquish control to some
other subtask while the results of the external services are being awaited?
Will the callback eventually be triggered automatically or will I have to
explicitly block on the result future like so: Await.result(f, timeout).

-- 
Regards,
Vishal


RE: Flink-Iceberg Table Sink failing with org.apache.hadoop.fs.s3a.S3AStorageStatistics Cast exception

2022-04-28 Thread Terry Heathcote
I solved this problem by removing the Hadoop classpath from Flink cluster
deployment.

On 2022/04/28 09:04:50 Terry Heathcote wrote:
> Hi
>
> We are running a Flink job that delivers Kafka data to an Iceberg table.
> The job uses the *org.apache.iceberg.flink.CatalogLoader* and
> *org.apache.iceberg.flink.TableLoader
> *interfaces in combination with *org.apache.iceberg.flink.sink.FlinkSink
*where
> the catalog type is Hive.
>
> We have had success in running multiple jobs to respective tables that are
> stored in the same s3 bucket but recently, when attempting to write to
> tables that are stored in separate s3 buckets, we have run into issues.
The
> first jobs submitted to the cluster run fine, however, when submitting
more
> jobs for sink tables with the same name (in separate database schemas and
> s3 buckets), we run into a class cast exception as well as
> an org.apache.hadoop.metrics2.MetricsException error stating: Metrics
> source S3AMetrics{bucket-name} already exists!
>
> Attached are both the error logs as well as the main code snippets and pom
> files for better context. Any help would be greatly appreciated.
>
> The Flink cluster version is 12.7 and we have enabled the
> flink-s3-fs-hadoop jar
> plugin so as to be able to write to s3 files.
> ᐧ
>
ᐧ


Re: Flink team staffing

2022-04-28 Thread Nathan Fisher
Thousands of events per second depending on their size and complexity isn’t
a big overhead. If you’re in AWS you might want to look at Kinesis Data
Analytics.

On Wed, Apr 27, 2022 at 19:32, Wei Liu  wrote:

> Hi everyone,
>
> I've been thinking about running some production-critical applications
> using Flink. The scale is small, to begin with (thousands of events per
> second), but we do want to keep the uptime as high as possible.
>
> What does a common team around this type of system look like? We have a
> couple of Big Data and a couple of MLE engineers. Are we in good shape?
> Would love to hear your thoughts.
>
> -Wei
>
> --
> 206-430-3317
>


Checkpoint Timeout Troubleshooting

2022-04-28 Thread Sam Ch
Hello,

I am running into checkpoint timeouts and am looking for guidance on
troubleshooting. What should I be looking at? What configuration parameters
would affect this? I am afraid I am a Flink newbie so I am still picking up
the concepts. Additional notes are below, anything else I can provide?
Thanks.


The checkpoint size is small (less than 100kB)
Multiple flink apps are running on a cluster, only one is running into
checkpoint timeouts
Timeout is set to 10 mins
Tried aligned and unaligned checkpoints
Tried clearing checkpoints to start fresh
Plenty of disk space
Dataflow: kafka source -> flink app -> kafka sink


Re: Broadcast State + Stateful Operator + Async IO

2022-04-28 Thread Guowei Ma
Hi Vishal

I want to understand your needs first. Your requirements are: After a
stateful operator receives a notification, it needs to traverse all the
data stored in the operator state, communicate with an external system
during the traversal process (maybe similar to join?). In order to improve
the efficiency of  this behavior, you want to take an asynchronous
approach. That is, if you modify the state of different keys, do not block
each other due to external communication.
If I understand correctly, according to the existing function of
KeyedBroadcastProcessFunction, it is really impossible.
As for whether there are other solutions, it may depend on specific
scenarios, such as what kind of external system. So could you describe in
detail what scenario has this requirement, and what are the external
systems it depends on?

Best,
Guowei


On Fri, Apr 29, 2022 at 12:42 AM Vishal Surana  wrote:

> Hello,
> My application has a stateful operator which leverages RocksDB to store a
> large amount of state. It, along with other operators receive configuration
> as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
> upon another input stream that triggers some communication with external
> services whose results are then combined to yield the state that gets
> stored in RocksDB.
>
> In order to make the application more efficient, I am going to switch to
> asynchronous IO but as the result is ultimately going to be a (Scala)
> Future, I will have to block once to get the result. I was hoping to
> leverage the Async IO operator but that apparently doesn't support RocksDB
> based state storage. Am I correct in saying
> that KeyedBroadcastProcessFunction is the only option I have? If so, then I
> want to understand how registering a future's callbacks (via onComplete)
> works with a synchronous operator such as KeyedBroadcastProcessFunction.
> Will the thread executing the function simply relinquish control to some
> other subtask while the results of the external services are being awaited?
> Will the callback eventually be triggered automatically or will I have to
> explicitly block on the result future like so: Await.result(f, timeout).
>
> --
> Regards,
> Vishal
>


Re: AvroRowDeserializationSchema

2022-04-28 Thread Dian Fu
Oh, I just missed your last question, sorry for that. The offset is stored
in the checkpoint and it will recover the offset from the checkpoint when
the job failover.

Things which you may need to pay attention to:
1) Enable the checkpoint and configure it if necessary [1]
2) Specify the start up mode via `scan.startup.mode` for Kafka connector
which works when the job start from scratch when there is no offset
available for use
3) It will restore from the latest checkpoint when the job failovers.
However, when you manually suspend/start a job, then you need to specify
the checkpoint/savepoint manually. see [2][3][4] for more details.

Things done in Flink (so you don't need to care):
1) The offset checkpoint and restoring

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#creating-a-savepoint
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
[4]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#starting-a-job-from-a-savepoint



On Thu, Apr 28, 2022 at 5:45 PM lan tran  wrote:

> Don’t expect that answer =))
> However, I am very appreciate everything you did
> Thanks again for helping me out.
>
> Best,
> Quynh.
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Thursday, April 28, 2022 2:59 PM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Yes, I think so~
>
>
>
> On Thu, Apr 28, 2022 at 11:00 AM lan tran 
> wrote:
>
> Hi Dian,
>
> Sorry for missing your mail, so if I did as your suggestion and the Flink
> somehow crashed and we have to restart the service, does the Flink job know
> the offset where does it read from Kafka ?
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Tuesday, April 26, 2022 7:54 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> The same code in my last reply showed how to set the UID for the source
> operator generated using Table API. I meant that you could firstly create a
> source using Table API, then convert it to a DataStream API and set uid for
> the source operator using the same code above, then perform operations with
> DataStream API.
>
> Regards,
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 9:27 PM lan tran  wrote:
>
> Hi Dian,
>
> Thank again for fast response.
>
> As your suggestion above, we can apply to set the UID for only for the
> DataStream state (as you suggest to convert from table to data stream).
>
> However, at the first phase which is collecting the data from Kafka (
> having Debezium format), the UID cannot be set since we are using Table API
> (auto generate the UID).
>
> Therefore, if there is some crashed or needed revert using SavePoint, we
> cannot use it in the first phase since we cannot set the UID for this => so
> how can we revert it ?.
>
> As a result of that, we want to use DebeziumAvroRowDeserializationSchema
> and DebeziumJsonRowDeserializationSchema in the DataStream job to be able
> to use the Savepoint for the whole full flow.
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> You could try the following code (also it may be a little hacky):
> ```
>
> def set_uid_for_source(ds: DataStream, uid: str):
>
> transformation = ds._j_data_stream.getTransformation()
>
>
>
> source_transformation = transformation
>
> while not source_transformation.getInputs().isEmpty():
>
> source_transformation = source_transformation.getInputs().get(0)
>
>
>
> source_transformation.setUid(uid)
>
> ```
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
> the sources with these formats, it will send UPDATE messages to downstream
> operators.
>
> Regards
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran 
> wrote:
>
> Yeah, I already tried that way. However, if we did not use DataStream at
> first. We cannot implement the Savepoint since through the doc if we use
> TableAPI (SQL API), the uid is generated automatically which means we
> cannot revert if the system is crashed.
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: 

Re: Checkpoint Timeout Troubleshooting

2022-04-28 Thread Guowei Ma
Hi Sam

I think the first step is to see which part of your Flink APP is blocking
the completion of Checkpoint. Specifically, you can refer to the
"Checkpoint Details" section of the document [1]. Using these methods, you
should be able to observe where the checkpoint is blocked, for example, it
may be an agg operator of the app, or it may be blocked on the sink of
kafka.
Once you know which operator is blocking, you can use FlameGraph [2] to see
where the bottleneck of the operator is. Then do specific operations.

Hope these help!
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/#checkpoint-details
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/

Best,
Guowei


On Fri, Apr 29, 2022 at 2:10 AM Sam Ch  wrote:

> Hello,
>
> I am running into checkpoint timeouts and am looking for guidance on
> troubleshooting. What should I be looking at? What configuration parameters
> would affect this? I am afraid I am a Flink newbie so I am still picking up
> the concepts. Additional notes are below, anything else I can provide?
> Thanks.
>
>
> The checkpoint size is small (less than 100kB)
> Multiple flink apps are running on a cluster, only one is running into
> checkpoint timeouts
> Timeout is set to 10 mins
> Tried aligned and unaligned checkpoints
> Tried clearing checkpoints to start fresh
> Plenty of disk space
> Dataflow: kafka source -> flink app -> kafka sink
>


Re: NO jobmanager.log for yarn-per-job submitted with sql-client.sh

2022-04-28 Thread yu'an huang
Hi ruiyun,


Is there any log4j configuration file or logback.xml in your Flink 
configuration directory? Yarn should check whether the file exists first and 
set relative log options.




> On 26 Apr 2022, at 5:21 PM, ruiyun wan  wrote:
> 
> Flink Version (1.13)
> There is not jobmanager.log file when submit yarn-per-job with 
> sql-client.sh。(set execution.target = yarn-per-job;) There are not -Dlog.file 
> and -Dlog4j.configuration properties argument in the auto-generated 
> launch_container.sh on YARN cluster。e.g. the last line of launch_container.sh 
> is :
> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx50331648 -Xms50331648 \
>   -XX:MaxMetaspaceSize=134217728 \
>   org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint .
> 
> However,When use yarn-session.sh create a session cluster,the last line of 
> auto generated launch_container.sh is 
> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1530082096 -Xms1530082096 \
>  -XX:MaxMetaspaceSize=268435456 
>  
> -Dlog.file="/mnt/disk3/log/hadoop-yarn/containers/application_1640228329925_0008/container_e05_1640228329925_0008_01_01/jobmanager.log"
>  \
>  -Dlog4j.configuration=file:log4j.properties \
>  -Dlog4j.configurationFile=file:log4j.properties \
>  org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint .
> which contained   -Dlog.file and   -Dlog4j.configuration 
> 
> The question is :how to configure sql-client.sh to produce jobmanager.log for 
> yarn-per-job?



Re: How to dynamically modify the schema information of a table

2022-04-28 Thread yu'an huang
Thank you for your reporting.

I created a ticket in JIRA. You can track this problem in this link: 
https://issues.apache.org/jira/browse/FLINK-27449 
.



> On 22 Apr 2022, at 10:44 AM, 草莓  wrote:
> 
> The following is the Java code
> 
> @Test
> public void test(){
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> DataStream dataStream = env.fromElements("Alice", "Bob", "John");
> Schema.Builder builder = Schema.newBuilder();
> builder.column("f0",DataTypes.of(String.class)).withComment("this is a 
> comment");
> Table table = tableEnv.fromDataStream(dataStream, 
> builder.build()).as("user_name");
> table.getResolvedSchema();
> table.printSchema();
> }
> 
> Its output is:
> (
>   `user_name` STRING
> )
> 
> My question is, if the comment is lost, what should I do to display the 
> comment information?
> 
> What I need is that the result is
> (
>   `user_name` STRING COMMENT 'this is a comment'
> )
> 
> 



Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

2022-04-28 Thread liuxiangcao
Hi Shengkai,

Thank you for the reply.

The UDF getEventTimeInNS uses timestamps of both streamA and streamB to
calculate the true event time for streamB events.

For illustrating purpose, we can consider it to be like this:

public Long eval(
Long baseTimeStampFromA,
Long timestampA
Long timestampB) {
  return baseTimeStampFromA + timestampB - timestampA;
}

Basically I need to redefine the event timestamp and watermark for the
output stream of a join operator.

You are right. Ideally I hope FlinkSQL can support defining a watermark on
a view.  Do you know if this was discussed in the Flink community before?
Wondering whether this may be supported in future.

On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang  wrote:

> Hi,
>
> The watermark of the join operator is the minimum of the watermark of the
> input streams.
>
> ```
> JoinOperator.watermark = min(left.watermark, right.watermark);
> ```
>
> I think it's enough for most cases.  Could you share more details about
> the logic in the UDF getEventTimeInNS?
>
> I think the better solution comparing to the intermediate table is to
> define the watermark on the VIEW. But Flink doesn't support it now.
>
> Best,
> Shengkai
>
>
>
>
> liuxiangcao  于2022年4月16日周六 03:07写道:
>
>> Hi Flink community,
>>
>> *Here is the context: *
>> Theoretically, I would like to write following query but it won't work
>> since we can only define the WATERMARK in a table DDL:
>>
>> INSERT into tableC
>> select tableA.field1
>>  SUM(1) as `count`,
>>  time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>> from tableA join tableB
>> on tableA.joinCol == tableB.joinCol
>> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
>> (note: getEventTimeInNS is a UDF that calculates event time using 
>> tableA.timestamp and tableB.timestamp)
>>
>>
>> so I have to define a intermediary table to store the results from
>> joining, and defining event time and watermark in the table DDL, then
>> performs tumbling windowing on the intermediary table:
>>
>> CREATE TABLE IntermediaryTable (
>>field1,
>>   `eventTimestampInNanoseconds`  BIGINT,
>>time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/100, 3),
>>WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'IntermediaryTable',
>>   'properties.bootstrap.servers' = 'xx',
>>   'properties.group.id' = 'contextevent-streaming-sql',
>>   'format' = 'avro'
>> );
>>
>> INSERT INTO IntermediaryTable
>> select tableA.field1
>>   tableB.field2,
>>   getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>> from tableA join tableB
>> on tableA.joinCol == tableB.joinCol;
>>
>> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>>
>> INSERT INTO countTable
>> (select event.field1
>> SUM(1) as `count`
>>  from IntermediaryTable event
>>  GROUP BY
>>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>>   event.field1
>> );
>>
>>
>> This is not convenient because the IntermediaryTable writes to another
>> kafka topic that is only used by the tumbling window aggregation. When I
>> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
>> END;", it will fail complaining the topic does not exist. I either have to
>> first create this kafka topic beforehand, or run a separate job to INSERT
>> INTO IntermediaryTable.
>>
>> In Java DataStream API, you can easily do so within flink topology
>> without having to create a separate kafka topic:
>>
>> final DataStream joinedStream =
>>  StreamA.join(StreamB)
>>  .where()
>>  .equalTo()
>>  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>  .apply(aggregation);
>>
>>
>> *Question:*
>> Does the Flink community have any suggestions on how to do this in
>> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
>> defining eventtime and watermark on the fly without a table ddl? Would love
>> to hear any suggestions. Thanks a lot in advance.
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>

-- 
Best Wishes & Regards
Shawn Xiangcao Liu


Flink native kubernetes 下没有足够的 slot 问题

2022-04-28 Thread Pan Junxun
您好,

我最近在尝试使用 native kubernetes 方式部署 flink 集群。我根据官方文档使用 session 
模式部署了一个集群,并在上面提交了一个并行度为 5 的测试 Job,参数设置了 
-D-Dtaskmanager.numberOfTaskSlots=3。提交后可以在前端看到创建了两个 slot 数量为 3 的 
Taskmanager,并且其中有一个 Taskmanager 显示 1 slot free。但是 Job 无法正常启动,得到了

has no more allocated slots for job


Re: 对Flink Table Store咨询

2022-04-28 Thread Jingsong Li
哈喽,感谢你的关注

Tison是对的,Table Store是一个库。我们目前并没有发布maven依赖。

目前你可以有两种方式来调试:

1.在Table Store的工程里调试
2.在拿到flink-table-store-dist下的 pre bundled jar,放到你工程的classpath下来调试

入口类的话是通过SQL的方式:

TableEnvironment.executeSql("CREATE TABLE XX (...)");

当classpath下包含 table store 的 jar 时,会服务发现到 TableStore 的 factory,进而走到
table store 的代码。

你的需求是合理的,我们后续会考虑提供官方的maven依赖支持,且提供DataStream API

Best,
Jingsong

On Sun, Apr 24, 2022 at 10:32 AM tison  wrote:
>
> Flink Table Store 不是应用,而是库。我理解是要配合 Flink
> 来使用的,断点调试的话,看你的需求,如果只是对一段代码有疑问,跑测试打断点就行了。
>
> Best,
> tison.
>
>
> 陈卓宇 <2572805...@qq.com.invalid> 于2022年4月24日周日 09:59写道:
>
> > 您好大佬:
> >  我对Flink Table
> > Store非常感兴趣,想请教您一下怎么结合flink做断点调试,因为看了一下没有找到入口类
> >
> > 陈卓宇
> >
> >
> > 


Re: Flink native kubernetes 下没有足够的 slot 问题

2022-04-28 Thread Pan Junxun
万分抱歉!邮件客户端误触发送出去了。。。

请问为什么 slot 总数充足的情况下会出现 slot 不足的异常?如何在 native kubernetes 下正确启动一个多并行度任务?
感谢大家抽空看我的邮件!
Best,
潘

On Apr 28 2022, at 5:28 pm, Pan Junxun  wrote:
> 您好,
>
> 我最近在尝试使用 native kubernetes 方式部署 flink 集群。我根据官方文档使用 session 
> 模式部署了一个集群,并在上面提交了一个并行度为 5 的测试 Job,参数设置了 
> -D-Dtaskmanager.numberOfTaskSlots=3。提交后可以在前端看到创建了两个 slot 数量为 3 的 
> Taskmanager,并且其中有一个 Taskmanager 显示 1 slot free。但是 Job 无法正常启动,得到了
>
> has no more allocated slots for job



?????? ??Flink Table Store????

2022-04-28 Thread ??????

-- set root path to session config SET 'table-store.path' = '/tmp/table_store';
??flink??config
table sql ??demosql cli


??







----
??: 
   "user-zh"



?????? ??Flink Table Store????

2022-04-28 Thread ??????

??
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

public class sql {

public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build();

TableEnvironment tenv = TableEnvironment.create(settings);

TableConfig config = tenv.getConfig();

config.set("table-store.path","/Users/zhuoyuchen/Desktop/table_storge/temp");
config.set("execution.checkpointing.interval","10 s");

tenv.executeSql("CREATE TABLE  IF NOT EXISTS word_count (\n" +
"word STRING PRIMARY KEY NOT ENFORCED,\n" +
"cnt BIGINT\n" +
") WITH (\n" +
");");

tenv.executeSql("CREATE TABLE word_table (\n" +
"word STRING\n" +
") WITH (\n" +
"'connector' = 'datagen',\n" +
"'fields.word.length' = '1'\n" +
");");

tenv.executeSql("INSERT INTO word_count SELECT word, COUNT(*) FROM 
word_table GROUP BY word;");
}
}



flink1.15
flink-table-store-dist-0.2
flink-shaded-hadoop-2-uber-2.4.1-10.0

?????? ??Flink Table Store????

2022-04-28 Thread ??????

??
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

public class sql {

public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build();

TableEnvironment tenv = TableEnvironment.create(settings);

TableConfig config = tenv.getConfig();

config.set("table-store.path","/Users/zhuoyuchen/Desktop/table_storge/temp");
config.set("execution.checkpointing.interval","10 s");

tenv.executeSql("CREATE TABLE  IF NOT EXISTS word_count (\n" +
"word STRING PRIMARY KEY NOT ENFORCED,\n" +
"cnt BIGINT\n" +
") WITH (\n" +
");");

tenv.executeSql("CREATE TABLE word_table (\n" +
"word STRING\n" +
") WITH (\n" +
"'connector' = 'datagen',\n" +
"'fields.word.length' = '1'\n" +
");");

tenv.executeSql("INSERT INTO word_count SELECT word, COUNT(*) FROM 
word_table GROUP BY word;");
}
}



flink1.15
flink-table-store-dist-0.2
flink-shaded-hadoop-2-uber-2.4.1-10.0

Re: Flink native kubernetes 下没有足够的 slot 问题

2022-04-28 Thread huweihua
当 SlotManager 向 TaskExecutor 为作业申请 Slot 后,TaskExecutor 会向 JobMaster offer这些 
Slots。从 TaskExecutor 接受到 SlotManager 的请求后会注册一个定时器,如果在定时器计时结束时仍然没有将 Slots offer 
给 JobMaster,会触发这个问题。

Slot timeout 的时间配置项为taskmanager.slot.timeout,如果没有单独配置,则使用 akka.ask.timeout 
的值(默认为 10s)。

可以尝试增加 taskmanager.slot.timeout 超时来避免这个问题,如果仍然有问题,需要进一步通过 
JobManager/TaskManager 日志进行分析。

> 2022年4月28日 下午8:04,Pan Junxun  写道:
> 
> 感谢回复!
> 
> 日志内容如下:
> 
> 2022-04-28 19:58:20
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
> at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1140)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1080)
> at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
> at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
> at sun.reflect.GeneratedMethodAccessor591.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> 

?????? ??Flink Table Store????

2022-04-28 Thread ??????



??







--  --
??: 
   "user-zh"



Re: Flink native kubernetes 下没有足够的 slot 问题

2022-04-28 Thread huweihua
Hi, Junxun

按照你的说法前半部分是符合预期的,并行度为 5 的作业需要 2 个 slot 数量为 3 的 TaskManager。
这里没看到具体的报错日志,方便提供下完成日志吗?以及对应的 flink 版本信息。

> 2022年4月28日 下午5:28,Pan Junxun  写道:
> 
> 您好,
> 
> 我最近在尝试使用 native kubernetes 方式部署 flink 集群。我根据官方文档使用 session 
> 模式部署了一个集群,并在上面提交了一个并行度为 5 的测试 Job,参数设置了 
> -D-Dtaskmanager.numberOfTaskSlots=3。提交后可以在前端看到创建了两个 slot 数量为 3 的 
> Taskmanager,并且其中有一个 Taskmanager 显示 1 slot free。但是 Job 无法正常启动,得到了
> 
> has no more allocated slots for job



Re: Flink native kubernetes 下没有足够的 slot 问题

2022-04-28 Thread Pan Junxun
感谢回复!

日志内容如下:

2022-04-28 19:58:20
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1140)
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1080)
at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
at 
org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at 
java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
at 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
at 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
at 
org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
at sun.reflect.GeneratedMethodAccessor591.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: TaskExecutor 
akka.tcp://flink@10.100.1.127:6122/user/rpc/taskmanager_0 has no more allocated 
slots for job 4e6bdfa51b6a7fd61d595db0177bc6e8.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnectionIfNoAllocatedResources(TaskExecutor.java:1941)
at 

Re: Flink native kubernetes 下没有足够的 slot 问题

2022-04-28 Thread Pan Junxun
忘记说了,Flink 版本是 1.13.1

On Apr 28 2022, at 8:04 pm, Pan Junxun  wrote:
> 感谢回复!
>
> 日志内容如下:
>
> 2022-04-28 19:58:20
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
> at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1140)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1080)
> at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
> at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
> at sun.reflect.GeneratedMethodAccessor591.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.util.FlinkException: TaskExecutor 
> akka.tcp://flink@10.100.1.127:6122/user/rpc/taskmanager_0 has no more 
> allocated slots for job 

Re:Flink SQL??????Java code????debug

2022-04-28 Thread Xuyang
Hi??FlinkjaninoJanino[1]??org.codehaus.janino.source_debugging.enable=true??org.codehaus.janino.source_debugging.dir=mypathdebug[1]
 http://janino-compiler.github.io/janino/#debugging
?? 2022-04-25 17:04:30??"zhiyezou" <1530130...@qq.com.INVALID> ??
>
>Ideadebug??


Re: flink 任务对接k8s的第三方jar包管理问题

2022-04-28 Thread yu'an huang
你可以试试将第三方包放进你的user jar里,制作一个fat jar. 
或者你保证每一个container都存在这个第三方包,不一定是在lib下。然后提交作业的时候使用pipeline.classpath去指定这个路径。

参考https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/dependency_management/

> On 25 Apr 2022, at 6:21 PM, 天道酬勤 <1262420...@qq.com.INVALID> wrote:
> 
> 我的flink是通过kubernetes session 
> 模式部署,自己编写的任务中需要引用第三方的jar包,我最初的方式是将第三方的jar挂载至flink/lib包下,但是由于一些特殊的需求我希望在提交任务的时候可以动态指定第三方jar包路径,保证任务运行的时候是以此路径来加载,而不是导入第三方jar到flink/lib下,目前在官网中未找到可用的配置项。
>  希望大家能给我一些建议!



Re: Flink native kubernetes 下没有足够的 slot 问题

2022-04-28 Thread Pan Junxun
感谢大佬指点,我换到 1.14.4 后这个问题解决了

On Apr 28 2022, at 9:03 pm, huweihua  wrote:
> 当 SlotManager 向 TaskExecutor 为作业申请 Slot 后,TaskExecutor 会向 JobMaster offer这些 
> Slots。从 TaskExecutor 接受到 SlotManager 的请求后会注册一个定时器,如果在定时器计时结束时仍然没有将 Slots 
> offer 给 JobMaster,会触发这个问题。
>
> Slot timeout 的时间配置项为taskmanager.slot.timeout,如果没有单独配置,则使用 akka.ask.timeout 
> 的值(默认为 10s)。
> 可以尝试增加 taskmanager.slot.timeout 超时来避免这个问题,如果仍然有问题,需要进一步通过 
> JobManager/TaskManager 日志进行分析。
> > 2022年4月28日 下午8:04,Pan Junxun  写道:
> >
> > 感谢回复!
> >
> > 日志内容如下:
> >
> > 2022-04-28 19:58:20
> > org.apache.flink.runtime.JobException: Recovery is suppressed by 
> > NoRestartBackoffTimeStrategy
> > at 
> > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> > at 
> > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> > at 
> > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> > at 
> > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> > at 
> > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> > at 
> > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> > at 
> > org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
> > at 
> > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
> > at 
> > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1140)
> > at 
> > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1080)
> > at 
> > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
> > at 
> > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
> > at 
> > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
> > at 
> > org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
> > at 
> > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> > at 
> > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> > at 
> > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> > at 
> > org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
> > at 
> > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
> > at 
> > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
> > at 
> > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
> > at 
> > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
> > at 
> > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
> > at sun.reflect.GeneratedMethodAccessor591.invoke(Unknown Source)
> > at 
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at 
> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> > at 
> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> > at 
> > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> > at 
> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at akka.actor.Actor.aroundReceive(Actor.scala:517)
> > at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > at