Re: Error in /jars/upload curl request

2023-11-06 Thread Tauseef Janvekar
Hi Chen,

Now I get a different error message.
root@R914SK4W:~/learn-building-flink-applications-in-java-exercises/exercises#
curl -X POST -H "Expect:" -F "jarfile=@./target/travel-itinerary-0.1.jar" https://flink-nyquist.hvreaning.com/jars/upload

413 Request Entity Too Large (error page returned by nginx)
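
A 413 at this point is returned by the reverse proxy (nginx) in front of the
JobManager rather than by Flink itself, which usually means the proxy's
request-body size limit is smaller than the jar. A hypothetical tweak, assuming
the endpoint is exposed through an ingress-nginx controller (the annotation
name is from the ingress-nginx docs, the 200m value is only an example, and a
plain nginx config would use the client_max_body_size directive instead):

   metadata:
     annotations:
       nginx.ingress.kubernetes.io/proxy-body-size: "200m"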



Thanks
Tauseef

On Tue, 7 Nov 2023 at 06:19, Chen Yu  wrote:

>  Hi Tauseef,
>
> Adding an @ sign before the path will resolve your problem.
> And I verified that both web and postman upload the jar file properly on
> the master branch code.
> If you are still having problems then you can provide some more detailed
> information.
>
> Here is the relevant documentation from `man curl`:
>
>    -F, --form <name=content>
>           (HTTP SMTP IMAP) For HTTP protocol family, this lets curl emulate
>           a filled-in form in which a user has pressed the submit button.
>           This causes curl to POST data using the Content-Type
>           multipart/form-data according to RFC 2388.
>
>           For SMTP and IMAP protocols, this is the means to compose a
>           multipart mail message to transmit.
>
>           This enables uploading of binary files etc. To force the 'content'
>           part to be a file, prefix the file name with an @ sign. To just get
>           the content part from a file, prefix the file name with the symbol <.
>           The difference between @ and < is then that @ makes a file get
>           attached in the post as a file upload, while the < makes a text
>           field and just gets the contents for that text field from a file.
>
>
> Best,
> Yu Chen
> --
> *From:* Tauseef Janvekar 
> *Sent:* November 6, 2023 22:27
> *To:* user@flink.apache.org 
> *Subject:* Error in /jars/upload curl request
>
> I am using curl request to upload a jar but it throws the below error
>
> [image: image.png]
> Received unknown attribute jarfile.
>
> Not sure what is wrong here. I am following the standard documentation
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
>
> Please let me know if I have to use some other command to upload a jar
> using "/jars/upload" endpoint
>
> I also tried to upload using webui but it hangs continuously and only
> calls GET api with 200 success- https://flink-nyquist.hvreaning.com/jars
>
> Thanks,
> Tauseef
>


[ANNOUNCE] Apache Kyuubi released 1.8.0

2023-11-06 Thread Cheng Pan
Hi all,

The Apache Kyuubi community is pleased to announce that
Apache Kyuubi 1.8.0 has been released!

Apache Kyuubi is a distributed and multi-tenant gateway to provide
serverless SQL on data warehouses and lakehouses.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and lakehouses.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark, Flink, and other computing engines at the client side.

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release Notes: https://kyuubi.apache.org/release/1.8.0.html

To learn more about Apache Kyuubi, please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue: https://github.com/apache/kyuubi/issues
- Mailing list: d...@kyuubi.apache.org

We would like to thank all contributors of the Kyuubi community
who made this release possible!

Thanks,
On behalf of Apache Kyuubi community


Re: Connect to kerberos enabled hive cluster

2023-11-06 Thread Péter Váry
Hi Bo,

You might be interested in using delegation tokens for connecting to Hive.

The feature was added here:
https://issues.apache.org/jira/browse/FLINK-32223
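
As a minimal sketch, the Kerberos settings usually go into flink-conf.yaml
(the keytab path and principal below are placeholders, and
security.delegation.tokens.enabled assumes a Flink version that ships the
delegation token framework, i.e. 1.17+):

   security.kerberos.login.keytab: /path/to/flink.keytab
   security.kerberos.login.principal: flink-user@EXAMPLE.COM
   security.kerberos.login.use-ticket-cache: false
   security.delegation.tokens.enabled: true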

Peter


On Tue, Nov 7, 2023, 03:16 Bo <99...@qq.com> wrote:

> Hello community,
>
>
> Does anyone have succeeded in using flink with a Kerberos enabled hive
> cluster?
>
>
> I can interact with the hive cluster using a demo program, but it involves
> quite some environmental setup.
>
> Couldn't see how to do this in flink, at least within the scope of
> connector configurations.
>
>
> Any thoughts welcome.
>
>
> Best Regards,
>
>
> Bo
>
>
>


Re: Error in /jars/upload curl request

2023-11-06 Thread Junrui Lee
Hi, Tauseef


Based on the screenshot you provided, it appears that you have not included
the '@' prefix before the file path in your curl command. This prefix is
necessary to indicate to curl that the specified argument should be treated
as a file to be uploaded. Please add the '@' prefix before the file path
and try again.
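
For reference, a minimal sketch of the upload request (the jar path and
endpoint below are placeholders):

   curl -X POST -H "Expect:" -F "jarfile=@/path/to/your-job.jar" http://localhost:8081/jars/upload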


Best regards, Junrui



Tauseef Janvekar  wrote on Monday, November 6, 2023 at 22:27:

> I am using curl request to upload a jar but it throws the below error
>
> [image: image.png]
> Received unknown attribute jarfile.
>
> Not sure what is wrong here. I am following the standard documentation
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
>
> Please let me know if I have to use some other command to upload a jar
> using "/jars/upload" endpoint
>
> I also tried to upload using webui but it hangs continuously and only
> calls GET api with 200 success- https://flink-nyquist.hvreaning.com/jars
>
> Thanks,
> Tauseef
>


Re: Queryable state feature in latest version!!

2023-11-06 Thread Hangxiang Yu
Hi, Puneet.
Queryable State has been deprecated in the latest version and will be
removed in Flink 2.0.
The interface and usage are frozen in 1.x, so you can still refer to the
documentation of previous versions to use it.
BTW, could you also share something about the scenarios in which you use it?
That would help us a lot in designing a better state-querying feature for
users. Thanks~


On Tue, Nov 7, 2023 at 5:53 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi All
>
> We are using Flink version 1.10, which had Queryable State for
> querying the in-memory state. We are planning to migrate our old
> applications
> to a newer version of Flink. In the latest version's documents I can't find
> any reference to it. Can anyone highlight what the approach is to query
> in-memory
> state in the latest versions?
>
> Thanks
>
> Puneet
> Customer Centria Enterprise Solutions Pvt. Ltd. (‘Customer Centria’) has
> made the following annotations, “This message and the attachments are
> solely for the intended recipient and may contain confidential or
> privileged information. If you are not the intended recipient, any
> disclosure, copying, use, or distribution of the information included in
> the message and any attachments is prohibited. If you have received this
> communication in error, please notify us by reply e-mail immediately and
> permanently delete this message and any attachments. Thank you.
>


-- 
Best,
Hangxiang.


Re: Auditing sink using table api

2023-11-06 Thread Bo
Hi Xuyang.


Yes, the goal is somewhat like a logging system: it is to expose data details
to external systems for record keeping, regulation, auditing, etc.


I have tried to exploit the current logging system: by putting logback and a
Kafka appender on the classpath, and modifying the JDBC connector source code,
I could construct a special logger that sends the row data with context info
(job id) to Kafka for further processing by external systems.


This approach requires going deep into deployments and tweaking logback
configuration files. Ideally I would like to be able to configure the logging
system from within flink-conf.yaml. Maybe I could borrow the
configuration/initialization concept from the metrics modules and make a
lightweight auditing module.






   
---------- Original Email ----------
Sender: "Xuyang" <xyzhong...@163.com>
Sent Time: 2023/11/7 10:25
To: "Bo" <99...@qq.com>
Cc: "Chen Yu" <yuchen.e...@gmail.com>; "user" <user@flink.apache.org>
Subject: Re: Re: Auditing sink using table api


Hi, Bo.
Do you mean adding a logger sink after the actual sink? IMO, that is
impossible.


But there is another way. If the sink is provided by Flink, you can modify
its code, e.g. add an INFO-level log, print a clearer exception and so on,
and then re-build the specific connector.







--
  Best!
  Xuyang







At 2023-11-04 17:37:55, "Bo" <99...@qq.com> wrote:
Hi, Yu


Thanks for the suggestion.


Ideally the data needs to come from the sink being audited; adding another sink
serves part of the purpose, but if anything goes wrong in the original sink, I
presume it won't be reflected in the additional sink (correct me if I'm
mistaken).


I may have to make some custom sink based on the official ones




-- Original --
From: Chen Yu
Date: Sat, Nov 4, 2023 3:31 PM
To: Bo <99...@qq.com>, user

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/print/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#emit-a-table

From: Bo <99...@qq.com>
Sent: November 4, 2023 13:53
To: user

Re: Flink Job Failed With Kafka Exception

2023-11-06 Thread Madan D via user
Hello Hang/Lee,

Thanks! In my use case we listen to multiple topics, but in a few cases one of
the topics may become inactive if the producer decides to shut it down, while
the other topics are still receiving data. What we observe is that if one of
the topics becomes inactive, the entire Flink application fails due to a
timeout while getting metadata. We would like the Flink job to continue
consuming data from the other source topics even if one of the topics has an
issue, since failing the entire Flink application doesn't make sense when only
one topic has a problem.

Regards,
Madan

On Nov 5, 2023, at 11:29 PM, Hang Ruan  wrote:

Hi, Madan.
This error looks like there are some problems when the consumer tries to read
the topic metadata. If you use the same source for these topics, the Kafka
connector cannot skip one of them. As you say, you need to modify the
connector's default behavior. Maybe you should read the code in
KafkaSourceEnumerator to skip this error.

Best,
Hang

Junrui Lee  wrote on Monday, November 6, 2023 at 14:30:

Hi Madan,
Do you mean you want to restart only the failed tasks, rather than restarting
the entire pipeline region? As far as I know, currently Flink does not support
task-level restart, but requires restarting the pipeline region.

Best,
Junrui

Madan D via user  wrote on Wednesday, October 11, 2023 at 12:37:

Hello Team,
We are running a Flink pipeline that consumes data from multiple topics, but
we recently encountered that if one topic is having issues (with participation,
etc.), the whole Flink pipeline fails, which affects the other topics. Is there
a way we can keep the Flink pipeline running even after one of the topics has
an issue? We tried to handle exceptions to make sure the job wouldn't fail, but
it didn't help.

Caused by: java.lang.RuntimeException: Failed to get metadata for topics

Can you please provide any insights?

Regards,
Madan



Re: Queryable state feature in latest version!!

2023-11-06 Thread Junrui Lee
Hi, Puneet


Thank you for reaching out. In the latest release of Flink (version 1.18),
we have marked Queryable State as @Deprecated and removed the related
content from the stable documentation. This means that Queryable State is
no longer actively supported or recommended for use. More details can be
found here: https://lists.apache.org/thread/9hmwcjb3q5c24pk3qshjvybfqk62v17m


If you plan to migrate to a version prior to 1.18, such as 1.17, you can
refer to the documentation at the following link for information on how to
query in-memory state using Queryable State:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/queryable_state/


Best regards,

Junrui

Xuyang  wrote on Tuesday, November 7, 2023 at 10:28:

> Hi, Puneet.
>
> Do you mean this doc[1]?
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/
>
>
> --
> Best!
> Xuyang
>
>
> At 2023-11-07 01:36:37, "Puneet Kinra" 
> wrote:
>
> Hi All
>
> We are using Flink version 1.10, which had Queryable State for
> querying the in-memory state. We are planning to migrate our old
> applications
> to a newer version of Flink. In the latest version's documents I can't find
> any reference to it. Can anyone highlight what the approach is to query
> in-memory
> state in the latest versions?
>
> Thanks
>
> Puneet
> Customer Centria Enterprise Solutions Pvt. Ltd. (‘Customer Centria’) has
> made the following annotations, “This message and the attachments are
> solely for the intended recipient and may contain confidential or
> privileged information. If you are not the intended recipient, any
> disclosure, copying, use, or distribution of the information included in
> the message and any attachments is prohibited. If you have received this
> communication in error, please notify us by reply e-mail immediately and
> permanently delete this message and any attachments. Thank you.
>
>


Re:Queryable state feature in latest version!!

2023-11-06 Thread Xuyang
Hi, Puneet.


Do you mean this doc[1]?




[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/




--

Best!
Xuyang




At 2023-11-07 01:36:37, "Puneet Kinra"  wrote:

Hi All


We are using Flink version 1.10, which had Queryable State for querying
the in-memory state. We are planning to migrate our old applications
to a newer version of Flink. In the latest version's documents I can't find any
reference to it. Can anyone highlight what the approach is to query in-memory
state in the latest versions?


Thanks 


Puneet
Customer Centria Enterprise Solutions Pvt. Ltd. (‘Customer Centria’) has made 
the following annotations, “This message and the attachments are solely for the 
intended recipient and may contain confidential or privileged information. If 
you are not the intended recipient, any disclosure, copying, use, or 
distribution of the information included in the message and any attachments is 
prohibited. If you have received this communication in error, please notify us 
by reply e-mail immediately and permanently delete this message and any 
attachments. Thank you.

Re:Re: Auditing sink using table api

2023-11-06 Thread Xuyang
Hi, Bo.
Do you mean adding a logger sink after the actual sink? IMO, that is
impossible.


But there is another way. If the sink is provided by Flink, you can modify
its code, e.g. add an INFO-level log, print a clearer exception and so on,
and then re-build the specific connector.







--

Best!
Xuyang




At 2023-11-04 17:37:55, "Bo" <99...@qq.com> wrote:

Hi, Yu


Thanks for the suggestion. 


Ideally the data needs to come from the sink being audited; adding another sink
serves part of the purpose, but if anything goes wrong in the original sink, I
presume it won't be reflected in the additional sink (correct me if I'm
mistaken).


I may have to make some custom sink based on the official ones




-- Original --
From: Chen Yu 
Date: Sat,Nov 4,2023 3:31 PM
To: Bo <99...@qq.com>, user 
Subject: Re: Auditing sink using table api


Hi Bo,


How about writing the data to the Print connector[1] simultaneously via
insertInto[2]? It will print the data into the TaskManager's log.
Of course, you can choose an appropriate connector according to your audit log 
storage.
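
A minimal sketch of that idea with the Table API (the names tableEnv, orders,
orders_sink, and orders_audit are placeholders; orders_audit is assumed to be
a table registered with 'connector' = 'print'):

   // Insert the same data into the real sink and a print-based audit sink in one job.
   StatementSet stmtSet = tableEnv.createStatementSet();
   stmtSet.addInsert("orders_sink", tableEnv.from("orders"));   // the actual (e.g. JDBC) sink
   stmtSet.addInsert("orders_audit", tableEnv.from("orders"));  // print connector; rows land in TaskManager logs
   stmtSet.execute();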



Best,
Yu Chen


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/print/
[2]https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#emit-a-table


From: Bo <99...@qq.com>
Sent: November 4, 2023 13:53
To: user 
Subject: Auditing sink using table api
 
Hello community,


I am looking for a way to perform auditing of the various sinks (mostly
JdbcDynamicTableSink) using the Table API.
By "auditing", I mean logging details of every row coming into the sink,
and any anomalies when the sink writes to external systems.


Does Flink have some kind of auditing mechanism in place? The only way I can
see now is to make a custom sink that supports detailed logging to external
systems.


Any thoughts/suggestions?


Regards,


Bo

Re:Re: Clear the State Backends in Flink

2023-11-06 Thread Xuyang
Hi, Arjun.
Are you using the DataStream API? Maybe you can refer to this doc[1] to set an
operator-level state TTL so that the state is cleared automatically.


Back to your scenario, do you use state explicitly in some operators to store
file names? If not, and you are using the DataStream API, then if I'm not
mistaken Flink will not store such state actively.


[1]https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
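
As a minimal sketch of the TTL approach from [1] (the one-day TTL and the
descriptor name are only examples):

   import org.apache.flink.api.common.state.StateTtlConfig;
   import org.apache.flink.api.common.state.ValueStateDescriptor;
   import org.apache.flink.api.common.time.Time;

   // Expire entries one day after they are created or last updated,
   // and never return values that have already expired.
   StateTtlConfig ttlConfig = StateTtlConfig
           .newBuilder(Time.days(1))
           .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
           .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
           .build();

   ValueStateDescriptor<String> descriptor =
           new ValueStateDescriptor<>("processed-file", String.class);
   descriptor.enableTimeToLive(ttlConfig);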




--

Best!
Xuyang




At 2023-11-06 21:04:11, "arjun s"  wrote:

Thanks for your response.
I have shared my  scenario below.

In the context of the Flink job use case, our data source is files, with three 
new files arriving in the source directory every second. The Flink job is 
responsible for reading and processing these files. To the best of my 
knowledge, the State Backend maintains a record of the file names that have 
been processed by the Flink job. Please correct me if I'm mistaken.

If the State Backend does indeed store the file names, I'm concerned about 
potential memory-related issues during long-term operation of the Flink job. If 
such issues may arise, what are the recommended best practices for managing 
this scenario?

Thanks and regards,
Arjun


On Mon, 6 Nov 2023 at 09:24, Hangxiang Yu  wrote:

Hi, Arjun.
Do you mean clearing all state stored in a user-defined state?

IIUC, it could be done for operator state.
But it cannot be done for keyed state for users, because every operation for it
is bound to a specific key currently.
BTW, could you also share your business scenario? It could help us to rethink
the interface. Thanks!


On Tue, Oct 31, 2023 at 12:02 AM arjun s  wrote:

Hi team,
I'm interested in understanding if there is a method available for clearing the 
State Backends in Flink. If so, could you please provide guidance on how to 
accomplish this particular use case?

Thanks and regards,
Arjun S




--

Best,
Hangxiang.

Connect to kerberos enabled hive cluster

2023-11-06 Thread Bo
Hello community,




Does anyone have succeeded in using flink with a Kerberos enabled hive cluster?




I can interact with the hive cluster using a demo program, but it involves 
quite some environmental setup.

Couldn't see how to do this in flink, at least within the scope of connector 
configurations.




Any thoughts welcome.




Best Regards,




Bo

Re: How to tell if job is being restarted in log?

2023-11-06 Thread John Smith
Ok thanks. :)

On Mon, Nov 6, 2023 at 2:58 AM Junrui Lee  wrote:

> Hi John,
>
> If you want to know more details about why your job is restarting, you can
> search for the keyword "to FAILED" in the JobManager logs. These log
> entries will show you the timing of each restart and the associated
> exception information. Additionally, you can check the exception page to
> find relevant details.
>
> Best,
> Junrui
>
> John Smith  于2023年11月4日周六 09:27写道:
>
>> Hi I'm getting metaspace issues and I understand that certain libraries
>> like JDBC don't unload properly and we need to put them in the global class
>> path of flink.
>>
>> But technically my jobs should not be restarting, so what can I look for
>> in the logs to see when they restart?
>>
>


回复: Error in /jars/upload curl request

2023-11-06 Thread Chen Yu
 Hi Tauseef,

Adding an @ sign before the path will resolve your problem.
And I verified that both web and postman upload the jar file properly on the 
master branch code.
If you are still having problems then you can provide some more detailed 
information.

Here is the relevant documentation from `man curl`:

   -F, --form <name=content>
          (HTTP SMTP IMAP) For HTTP protocol family, this lets curl emulate
          a filled-in form in which a user has pressed the submit button.
          This causes curl to POST data using the Content-Type
          multipart/form-data according to RFC 2388.

          For SMTP and IMAP protocols, this is the means to compose a
          multipart mail message to transmit.

          This enables uploading of binary files etc. To force the 'content'
          part to be a file, prefix the file name with an @ sign. To just get
          the content part from a file, prefix the file name with the symbol <.
          The difference between @ and < is then that @ makes a file get
          attached in the post as a file upload, while the < makes a text
          field and just gets the contents for that text field from a file.


Best,
Yu Chen

From: Tauseef Janvekar 
Sent: November 6, 2023 22:27
To: user@flink.apache.org 
Subject: Error in /jars/upload curl request

I am using curl request to upload a jar but it throws the below error

[image.png]
Received unknown attribute jarfile.

Not sure what is wrong here. I am following the standard documentation
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/

Please let me know if I have to use some other command to upload a jar using 
"/jars/upload" endpoint

I also tried to upload using webui but it hangs continuously and only calls GET 
api with 200 success- https://flink-nyquist.hvreaning.com/jars

Thanks,
Tauseef


Re: Unsubscribe

2023-11-06 Thread Yunfeng Zhou
Hi,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the mailing list.

Best
Yunfeng Zhou

On Mon, Nov 6, 2023 at 5:30 PM maozhaolin  wrote:
>
> Unsubscribe


Re: Flink operator autoscaler scaling down

2023-11-06 Thread Gyula Fóra
Hey!

Bit of a tricky problem, as it's not really possible to know that the job
will be able to start with lower parallelism in some cases. Custom plugins
may work but that would be an extremely complex solution at this point.

The Kubernetes operator has a built-in rollback mechanism that can help
with rolling back these broken scale operations; have you tried that?
Furthermore, we are planning to introduce some heap/GC-related metrics soon
(probably after the next release, for 1.8.0) that may help us catch these
issues.
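
If you haven't enabled it yet, a minimal sketch of the relevant operator
configuration (key names as I recall them from the operator configuration
reference; please double-check them against your operator version):

   kubernetes.operator.deployment.rollback.enabled: true
   kubernetes.operator.deployment.readiness.timeout: 5 min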

Cheers,
Gyula

On Mon, Nov 6, 2023 at 9:27 AM Yang LI  wrote:

> Dear Flink Community,
>
> I am currently working on implementing auto-scaling for my Flink
> application using the Flink operator's autoscaler. During testing, I
> encountered a "java.lang.OutOfMemoryError: Java heap space" exception when
> the autoscaler attempted to scale down. This issue arises when the incoming
> record rate decreases while the state size has not yet reduced
> correspondingly. Despite numerous tests, managing this issue has been
> difficult due to the lack of a parameter that allows for specifying a
> cooldown period(essential for processing and reducing state size)prior to
> actual scaling down. Moreover, determining an optimal duration for this
> cooldown period is also not straightforward. I believe that enhancing the
> autoscaler with a focus on memory checks or more broadly on stability
> conditions could significantly address this issue.. Here are some potential
> solutions that, in my opinion, could improve the situation:
>
>1. Integrate heap memory-related metrics into the metric collection,
>coupled with a memory safety margin check within the autoscaler's 
> algorithm.
>
>2. Introduce a plugin system and a pre-rescaling step in the Flink
>operator's autoscaler, which would allow users to implement custom plugins.
>These plugins could host listeners that activate during the pre-hook step,
>adding an additional checkpoint before the algorithm executes. So we can
>keep blocking scaling down until custom checks are passed to ensure it is
>safe to proceed with scaling down.
>
>3. Implement a parameter that establishes a stability threshold for
>heap memory usage percentage or jvm old gc (duration or count). In the
>event that the threshold is exceeded, the system would revert to the last
>stable scale in the scaling history. Then the stabilization interval would
>start to work, providing the Flink cluster with additional time to process
>and reduce the state size
>
>
>
> Let me know what you think about it! Thanks!
>
> Best,
>
> Yang LI
>


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread Andrew Otto
> unpredictable file schema(Table API)  in the source directory

You'll probably have to write some logic that helps predict the schema :)

Are there actual schemas for the CSV files somewhere?  JSONSchema or
something of the like?  At Wikimedia we use JSONSchema (not with CSV
data, but it could work), and have code that can convert from JSONSchema
to Flink schemas, either TypeInformation or Table API DataType.

Here's an example in code docs for use with Kafka.  You could use this to
read CSV files instead?  Something like:

TableDescriptor.forConnector("filesystem")
.schema(JsonSchemaFlinkConverter.toSchemaBuilder(jsonSchema).build())
...

If you are doing pure SQL (not table api), you'll need to have something
that translates from your schema to SQL... or start implementing a custom
Catalog, which uh, we kind of did, but it was not easy.









On Mon, Nov 6, 2023 at 1:30 PM arjun s  wrote:

> Thanks for your response.
> How should we address the issue of dealing with the unpredictable file
> schema(Table API)  in the source directory, as I previously mentioned in my
> email?
>
> Thanks and regards,
> Arjun
>
> On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:
>
>> Hi Arjun,
>>
>> If you can filter files by a regex pattern, I think the config
>> `source.path.regex-pattern`[1] maybe what you want.
>>
>>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the
>>                                         -- directory of `path` option. This regex pattern should be
>>                                         -- matched with the absolute file path. If this option is set,
>>                                         -- the connector will recursive all files under the directory
>>                                         -- of `path` option
>>
>>
>> Best,
>> Yu Chen
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
>>
>> --
>> *From:* arjun s 
>> *Sent:* November 6, 2023 20:50
>> *To:* user@flink.apache.org 
>> *Subject:* Handling Schema Variability and Applying Regex Patterns in Flink
>> Job Configuration
>>
>> Hi team,
>> I'm currently utilizing the Table API function within my Flink job, with
>> the objective of reading records from CSV files located in a source
>> directory. To obtain the file names, I'm creating a table and specifying
>> the schema using the Table API in Flink. Consequently, when the schema
>> matches, my Flink job successfully submits and executes as intended.
>> However, in cases where the schema does not match, the job fails to submit.
>> Given that the schema of the files in the source directory is
>> unpredictable, I'm seeking a method to handle this situation.
>> Create table query
>> =
>> CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4
>> STRING,`file.path` STRING NOT NULL METADATA) WITH ('connector' =
>> 'filesystem','path' = 'file:///home/techuser/inputdata','format' =
>> 'csv','source.monitor-interval' = '1')
>> =
>>
>> Furthermore, I have a question about whether there's a way to read files
>> from the source directory based on a specific regex pattern. This is
>> relevant in our situation because only file names that match a particular
>> pattern need to be processed by the Flink job.
>>
>> Thanks and Regards,
>> Arjun
>>
>


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread arjun s
Thanks for your response.
How should we address the issue of dealing with the unpredictable file
schema(Table API)  in the source directory, as I previously mentioned in my
email?

Thanks and regards,
Arjun

On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:

> Hi Arjun,
>
> If you can filter files by a regex pattern, I think the config
> `source.path.regex-pattern`[1] maybe what you want.
>
>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the
>                                         -- directory of `path` option. This regex pattern should be
>                                         -- matched with the absolute file path. If this option is set,
>                                         -- the connector will recursive all files under the directory
>                                         -- of `path` option
>
>
> Best,
> Yu Chen
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
>
> --
> *From:* arjun s 
> *Sent:* November 6, 2023 20:50
> *To:* user@flink.apache.org 
> *Subject:* Handling Schema Variability and Applying Regex Patterns in Flink
> Job Configuration
>
> Hi team,
> I'm currently utilizing the Table API function within my Flink job, with
> the objective of reading records from CSV files located in a source
> directory. To obtain the file names, I'm creating a table and specifying
> the schema using the Table API in Flink. Consequently, when the schema
> matches, my Flink job successfully submits and executes as intended.
> However, in cases where the schema does not match, the job fails to submit.
> Given that the schema of the files in the source directory is
> unpredictable, I'm seeking a method to handle this situation.
> Create table query
> =
> CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4
> STRING,`file.path` STRING NOT NULL METADATA) WITH ('connector' =
> 'filesystem','path' = 'file:///home/techuser/inputdata','format' =
> 'csv','source.monitor-interval' = '1')
> =
>
> Furthermore, I have a question about whether there's a way to read files
> from the source directory based on a specific regex pattern. This is
> relevant in our situation because only file names that match a particular
> pattern need to be processed by the Flink job.
>
> Thanks and Regards,
> Arjun
>


Queryable state feature in latest version!!

2023-11-06 Thread Puneet Kinra
Hi All

We are using Flink version 1.10, which had Queryable State for querying
the in-memory state. We are planning to migrate our old applications
to a newer version of Flink. In the latest version's documents I can't find any
reference to it. Can anyone highlight what the approach is to query in-memory
state in the latest versions?

Thanks

Puneet
Customer Centria Enterprise Solutions Pvt. Ltd. ('Customer Centria') has made 
the following annotations, "This message and the attachments are solely for the 
intended recipient and may contain confidential or privileged information. If 
you are not the intended recipient, any disclosure, copying, use, or 
distribution of the information included in the message and any attachments is 
prohibited. If you have received this communication in error, please notify us 
by reply e-mail immediately and permanently delete this message and any 
attachments. Thank you.


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread Chen Yu
Hi Arjun,

If you can filter files by a regex pattern, I think the config 
`source.path.regex-pattern`[1] maybe what you want.


  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the
                                        -- directory of `path` option. This regex pattern should be
                                        -- matched with the absolute file path. If this option is set,
                                        -- the connector will recursive all files under the directory
                                        -- of `path` option
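
Applied to the table from your earlier mail, a minimal sketch would look
roughly like this (the regex is only an example that keeps files named
input_*.csv):

CREATE TABLE sample (
  col1 STRING, col2 STRING, col3 STRING, col4 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///home/techuser/inputdata',
  'format' = 'csv',
  'source.monitor-interval' = '1',
  'source.path.regex-pattern' = '.*/input_.*\.csv'
);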

Best,
Yu Chen


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/


From: arjun s 
Sent: November 6, 2023 20:50
To: user@flink.apache.org 
Subject: Handling Schema Variability and Applying Regex Patterns in Flink Job
Configuration

Hi team,
I'm currently utilizing the Table API function within my Flink job, with the 
objective of reading records from CSV files located in a source directory. To 
obtain the file names, I'm creating a table and specifying the schema using the 
Table API in Flink. Consequently, when the schema matches, my Flink job 
successfully submits and executes as intended. However, in cases where the 
schema does not match, the job fails to submit. Given that the schema of the 
files in the source directory is unpredictable, I'm seeking a method to handle 
this situation.
Create table query
=
CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4 STRING,`file.path`
STRING NOT NULL METADATA) WITH ('connector' = 'filesystem','path' = 
'file:///home/techuser/inputdata','format' = 'csv','source.monitor-interval' = 
'1')
=

Furthermore, I have a question about whether there's a way to read files from 
the source directory based on a specific regex pattern. This is relevant in our 
situation because only file names that match a particular pattern need to be 
processed by the Flink job.

Thanks and Regards,
Arjun


Error in /jars/upload curl request

2023-11-06 Thread Tauseef Janvekar
I am using curl request to upload a jar but it throws the below error

[image: image.png]
Received unknown attribute jarfile.

Not sure what is wrong here. I am following the standard documentation
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/

Please let me know if I have to use some other command to upload a jar
using "/jars/upload" endpoint

I also tried to upload using webui but it hangs continuously and only calls
GET api with 200 success- https://flink-nyquist.hvreaning.com/jars

Thanks,
Tauseef


Re: Clear the State Backends in Flink

2023-11-06 Thread arjun s
Thanks for your response.
I have shared my  scenario below.
In the context of the Flink job use case, our data source is files, with
three new files arriving in the source directory every second. The Flink
job is responsible for reading and processing these files. To the best of
my knowledge, the State Backend maintains a record of the file names that
have been processed by the Flink job. Please correct me if I'm mistaken.

If the State Backend does indeed store the file names, I'm concerned about
potential memory-related issues during long-term operation of the Flink
job. If such issues may arise, what are the recommended best practices for
managing this scenario?

Thanks and regards,
Arjun

On Mon, 6 Nov 2023 at 09:24, Hangxiang Yu  wrote:

> Hi, Arjun.
> Do you mean clearing all state stored in a user-defined state?
> IIUC, it could be done for operator state.
> But it cannot be done for keyed state for users, because every operation
> for it is bound to a specific key currently.
> BTW, could you also share your business scenario? It could help us to
> rethink the interface. Thanks!
>
> On Tue, Oct 31, 2023 at 12:02 AM arjun s  wrote:
>
>> Hi team,
>> I'm interested in understanding if there is a method available for
>> clearing the State Backends in Flink. If so, could you please provide
>> guidance on how to accomplish this particular use case?
>>
>> Thanks and regards,
>> Arjun S
>>
>
>
> --
> Best,
> Hangxiang.
>


Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread arjun s
Hi team,
I'm currently utilizing the Table API function within my Flink job, with
the objective of reading records from CSV files located in a source
directory. To obtain the file names, I'm creating a table and specifying
the schema using the Table API in Flink. Consequently, when the schema
matches, my Flink job successfully submits and executes as intended.
However, in cases where the schema does not match, the job fails to submit.
Given that the schema of the files in the source directory is
unpredictable, I'm seeking a method to handle this situation.
Create table query
=
CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4
STRING,`file.path` STRING NOT NULL METADATA) WITH ('connector' =
'filesystem','path' = 'file:///home/techuser/inputdata','format' =
'csv','source.monitor-interval' = '1')
=

Furthermore, I have a question about whether there's a way to read files
from the source directory based on a specific regex pattern. This is
relevant in our situation because only file names that match a particular
pattern need to be processed by the Flink job.

Thanks and Regards,
Arjun


Flink operator autoscaler scaling down

2023-11-06 Thread Yang LI
Dear Flink Community,

I am currently working on implementing auto-scaling for my Flink
application using the Flink operator's autoscaler. During testing, I
encountered a "java.lang.OutOfMemoryError: Java heap space" exception when
the autoscaler attempted to scale down. This issue arises when the incoming
record rate decreases while the state size has not yet reduced
correspondingly. Despite numerous tests, managing this issue has been
difficult due to the lack of a parameter that allows for specifying a
cooldown period(essential for processing and reducing state size)prior to
actual scaling down. Moreover, determining an optimal duration for this
cooldown period is also not straightforward. I believe that enhancing the
autoscaler with a focus on memory checks or more broadly on stability
conditions could significantly address this issue.. Here are some potential
solutions that, in my opinion, could improve the situation:

   1. Integrate heap memory-related metrics into the metric collection,
   coupled with a memory safety margin check within the autoscaler's algorithm.

   2. Introduce a plugin system and a pre-rescaling step in the Flink
   operator's autoscaler, which would allow users to implement custom plugins.
   These plugins could host listeners that activate during the pre-hook step,
   adding an additional checkpoint before the algorithm executes. So we can
   keep blocking scaling down until custom checks are passed to ensure it is
   safe to proceed with scaling down.

   3. Implement a parameter that establishes a stability threshold for heap
   memory usage percentage or jvm old gc (duration or count). In the event
   that the threshold is exceeded, the system would revert to the last stable
   scale in the scaling history. Then the stabilization interval would start
   to work, providing the Flink cluster with additional time to process and
   reduce the state size



Let me know what you think about it! Thanks!

Best,

Yang LI


Unsubscribe

2023-11-06 Thread maozhaolin
Unsubscribe