RE: SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Gabor,

The issue is that the read permission is not checked when the Flink FileSource
lists the files under the given source directory.
This happens because the SecurityManager is null.

public String[] list() {
    SecurityManager security = System.getSecurityManager(); // <-- here the SecurityManager is null
    if (security != null) {
        security.checkRead(path);
    }
    if (isInvalid()) {
        return null;
    }
    return fs.list(this);
}

While debugging it, I found a method in FlinkSecurityManager like the one below, hence
I suspected it and queried to learn the role of FlinkSecurityManager.


public static void setFromConfiguration(Configuration configuration) {
    final FlinkSecurityManager flinkSecurityManager =
            FlinkSecurityManager.fromConfiguration(configuration);
    if (flinkSecurityManager != null) {
        try {
            System.setSecurityManager(flinkSecurityManager);
        } catch (Exception e) {
            // ...

Regards,
Kirti Dhar

From: Gabor Somogyi 
Sent: Wednesday, March 6, 2024 7:17 PM
To: Kirti Dhar Upadhyay K 
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi Kirti,

I'm not sure what the exact issue is here, but I'm not convinced that having
FlinkSecurityManager is going to solve it.
Here are the conditions for it, however:
* cluster.intercept-user-system-exit != DISABLED (this must be changed)
* cluster.processes.halt-on-fatal-error == false (this is good by default)
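For example, something like this (an untested sketch; the values are only illustrative, and the same keys can go into flink-conf.yaml instead):

```
Configuration conf = new Configuration();
// anything but the default DISABLED, e.g. LOG or THROW
conf.setString("cluster.intercept-user-system-exit", "THROW");
// false is the default, per the condition above
conf.setString("cluster.processes.halt-on-fatal-error", "false");
```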

Here is a gist of what Flink's SecurityManager does:
/**
 * {@code FlinkSecurityManager} to control certain behaviors that can be captured by Java system
 * security manager. It can be used to control unexpected user behaviors that potentially impact
 * cluster availability, for example, it can warn or prevent user code from terminating JVM by
 * System.exit or halt by logging or throwing an exception. This does not necessarily prevent
 * malicious users who try to tweak security manager on their own, but more for being dependable
 * against user mistakes by gracefully handling them informing users rather than causing silent
 * unavailability.
 */

G


On Wed, Mar 6, 2024 at 11:10 AM Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Team,

I am using Flink File Source with Local File System.
I am facing an issue: if the source directory does not have read permission, it
returns the list of files as null instead of throwing a permission exception
(refer to the highlighted line below), resulting in an NPE.

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
Debugging the issue, I found that the SecurityManager is null while listing
the files, hence the permission check on the directory is skipped.
What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar



RE: SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Yanfei,

I am facing this issue on JDK 1.8/11.
Thanks for the pointer, I will try to set the SecurityManager and check the
behaviour.

Regards,
Kirti Dhar

-----Original Message-----
From: Yanfei Lei  
Sent: Wednesday, March 6, 2024 4:37 PM
To: Kirti Dhar Upadhyay K 
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi Kirti Dhar,
What is your java version? I guess this problem may be related to 
FLINK-33309[1]. Maybe you can try adding "-Djava.security.manager" to the java 
options.

[1] https://issues.apache.org/jira/browse/FLINK-33309
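For example (an untested sketch, assuming the `env.java.opts` key, which passes the flag to both the JobManager and TaskManager JVMs), in flink-conf.yaml:

```
env.java.opts: -Djava.security.manager
```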

Kirti Dhar Upadhyay K via user wrote on Wed, Mar 6, 2024 at 18:10:
>
> Hi Team,
>
>
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue: if the source directory does not have read permission, it
> returns the list of files as null instead of throwing a permission exception
> (refer to the highlighted line below), resulting in an NPE.
>
>
>
> final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
>     addSplitsForPath(containedStatus, fs, target);
> }
>
> Debugging the issue, I found that the SecurityManager is null while
> listing the files, hence the permission check on the directory is skipped.
>
> What is the way to set a SecurityManager in Flink?
>
>
>
> Regards,
>
> Kirti Dhar
>
>



--
Best,
Yanfei


RE: SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Hang,

You got it right. The problem is exactly at the line you pointed to [1].
I have used the solution below for now.

```
if (!Files.isReadable(Paths.get(fileStatus.getPath().getPath()))) {
    throw new FlinkRuntimeException("Cannot list files under " + fileStatus.getPath());
}

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());

for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

However, if you go inside localf.list(), it automatically checks for read
permission using the SecurityManager. This check is being skipped because the
SecurityManager is null.
Hence I suspected the SecurityManager.

[1]
https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38


Regards,
Kirti Dhar

From: Hang Ruan 
Sent: Wednesday, March 6, 2024 6:46 PM
To: Kirti Dhar Upadhyay K 
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi, Kirti.

Could you please provide the stack trace of this NPE? I checked the code and I
think the problem may lie in LocalFileSystem#listStatus.
The code at line 161 [1] may return null, which causes
LocalFileSystem#listStatus to return null. Then `containedFiles` is null and
the NPE occurs.
I think we should add code to handle this situation as follows.

```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
if (containedFiles == null) {
    throw new FlinkRuntimeException("Cannot list files under " + fileStatus.getPath());
}
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Best,
Hang

[1]
https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38

Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote on Wed, Mar 6, 2024 at 18:10:
Hi Team,

I am using Flink File Source with Local File System.
I am facing an issue: if the source directory does not have read permission, it
returns the list of files as null instead of throwing a permission exception
(refer to the highlighted line below), resulting in an NPE.

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
Debugging the issue, I found that the SecurityManager is null while listing
the files, hence the permission check on the directory is skipped.
What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar



SecurityManager in Flink

2024-03-06 Thread Kirti Dhar Upadhyay K via user
Hi Team,

I am using Flink File Source with Local File System.
I am facing an issue: if the source directory does not have read permission, it
returns the list of files as null instead of throwing a permission exception
(refer to the highlighted line below), resulting in an NPE.

final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
Debugging the issue, I found that the SecurityManager is null while listing
the files, hence the permission check on the directory is skipped.
What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar



RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Kirti Dhar Upadhyay K via user
Thanks Jiabao and Yaroslav for your quick responses.

Regards,
Kirti Dhar

From: Yaroslav Tkachenko 
Sent: 01 February 2024 21:42
Cc: user@flink.apache.org
Subject: Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

The schema registry support is provided by the
ConfluentRegistryAvroSerializationSchema class (in the
flink-avro-confluent-registry package).
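For example, something like this (an untested sketch; the record class, broker address, subject, and registry URL are placeholders, and MyAvroRecord is assumed to be an Avro SpecificRecord class):

```
KafkaSink<MyAvroRecord> sink = KafkaSink.<MyAvroRecord>builder()
    .setBootstrapServers("broker:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.<MyAvroRecord>builder()
        .setTopic("topic-name")
        .setValueSerializationSchema(
            // the subject follows the usual <topic>-value convention
            ConfluentRegistryAvroSerializationSchema.forSpecific(
                MyAvroRecord.class, "topic-name-value", "http://schema-registry:8081"))
        .build())
    .build();
```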

On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko
<yaros...@goldsky.com> wrote:
You can also implement a custom KafkaRecordSerializationSchema, which allows 
creating a ProducerRecord (see "serialize" method) - you can set message key, 
headers, etc. manually. It's supported in older versions.
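A rough sketch of what that could look like (untested; the topic, header name, and header value are placeholders):

```
import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderedSerializationSchema implements KafkaRecordSerializationSchema<String> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("my-topic", element.getBytes(StandardCharsets.UTF_8));
        // headers (and, via other ProducerRecord constructors, the key/partition)
        // can be set manually here
        record.headers().add("source", "flink".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}
```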

On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun
<jiabao@xtransfer.cn> wrote:
Sorry, I didn't notice the version information.
This feature was completed in FLINK-31049[1] and will be released in version 
3.1.0 of Kafka.
The release process[2] is currently underway and will be completed soon.

However, version 3.1.0 does not promise support for Flink 1.16.
If you need to use this feature, you can consider cherry-picking this commit [3]
to the v3.0 branch and packaging it for your own use.
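Roughly, the sequence would be something like this (an untested sketch; the commit SHA has to be taken from the PR below):

```
git clone https://github.com/apache/flink-connector-kafka.git
cd flink-connector-kafka
git checkout v3.0
git cherry-pick <commit-sha-from-the-PR>
mvn clean package -DskipTests
```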

Regarding Schema Registry, I am not familiar with this feature and I apologize 
for not being able to provide an answer.

Best,
Jiabao

[1] https://issues.apache.org/jira/browse/FLINK-31049
[2] 
https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
[3]
https://github.com/apache/flink-connector-kafka/pull/18


On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> Hi Jiabao,
>
> Thanks for reply.
>
> Currently I am using Flink 1.16.1 and I am not able to find any
> HeaderProvider setter method in the class KafkaRecordSerializationSchemaBuilder.
> Although on GitHub I found this support here:
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
> But this doesn't seem to be released yet. Can you please point me towards the
> correct Flink version?
>
> Also, any help on question 1 regarding Schema Registry?
>
> Regards,
> Kirti Dhar
>
> -----Original Message-----
> From: Jiabao Sun <ji...@xtransfer.cn>
> Sent: 01 February 2024 13:29
> To: user@flink.apache.org
> Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
>
> Hi Kirti,
>
> Kafka Sink supports sending messages with headers.
> You should implement a HeaderProvider to extract headers from the input element.
>
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>     .setBootstrapServers(brokers)
>     .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
>         .setTopic("topic-name")
>         .setValueSerializationSchema(new SimpleStringSchema())
>         .setHeaderProvider(new HeaderProvider<String>() {
>             @Override
>             public Headers getHeaders(String input) {
>                 // TODO: implement it
>                 return null;
>             }
>         })
>         .build()
>     )
>     .build();
>
> Best,
> Jiabao
>
>
> On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > Hi Mates,
> >
> > I have the below queries regarding Flink Kafka Sink.
> >
> >   1.  Does Kafka Sink support the schema registry? If yes, is there any
> > documentation to configure the same?
> >   2.  Does Kafka Sink support sending messages (ProducerRecord) with
> > headers?
> >
> >
> > Regards,
> > Kirti Dhar
> >
> >
>


RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Kirti Dhar Upadhyay K via user
Hi Jiabao,

Thanks for reply.

Currently I am using Flink 1.16.1 and I am not able to find any HeaderProvider
setter method in the class KafkaRecordSerializationSchemaBuilder.
Although on GitHub I found this support here:
https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
But this doesn't seem to be released yet. Can you please point me towards the
correct Flink version?

Also, any help on question 1 regarding Schema Registry?

Regards,
Kirti Dhar

-----Original Message-----
From: Jiabao Sun  
Sent: 01 February 2024 13:29
To: user@flink.apache.org
Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers

Hi Kirti,

Kafka Sink supports sending messages with headers.
You should implement a HeaderProvider to extract headers from the input element.


KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers(brokers)
    .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
        .setTopic("topic-name")
        .setValueSerializationSchema(new SimpleStringSchema())
        .setHeaderProvider(new HeaderProvider<String>() {
            @Override
            public Headers getHeaders(String input) {
                // TODO: implement it
                return null;
            }
        })
        .build()
    )
    .build();

Best,
Jiabao


On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> Hi Mates,
> 
> I have the below queries regarding Flink Kafka Sink.
> 
>   1.  Does Kafka Sink support the schema registry? If yes, is there any
> documentation to configure the same?
>   2.  Does Kafka Sink support sending messages (ProducerRecord) with headers?
> 
> 
> Regards,
> Kirti Dhar
> 
> 


Flink Kafka Sink + Schema Registry + Message Headers

2024-01-31 Thread Kirti Dhar Upadhyay K via user
Hi Mates,

I have the below queries regarding Flink Kafka Sink.

  1.  Does Kafka Sink support the schema registry? If yes, is there any
documentation to configure the same?
  2.  Does Kafka Sink support sending messages (ProducerRecord) with headers?


Regards,
Kirti Dhar



RE: CSV Decoder with AVRO schema generated Object

2023-10-26 Thread Kirti Dhar Upadhyay K via user
Hi Alexander,

Thanks for the reply.
Actually, I have a system where data travels in the form of user-defined,
AVRO-schema-generated objects.

Sample code:

static void readCsvWithCustomSchemaDecoder(StreamExecutionEnvironment env, Path dataDirectory) throws Exception {

    Class<EmployeeTest> recordClazz = EmployeeTest.class; // AVRO-generated Java object having fields emp_id and Name
    CsvSchema.Builder builder = CsvSchema.builder()
            .setUseHeader(true)
            .setReorderColumns(true)
            .setColumnSeparator(',')
            .setEscapeChar('"')
            .setLineSeparator(System.lineSeparator())
            .setQuoteChar('"')
            .setArrayElementSeparator(";")
            .setNullValue("");

    CsvReaderFormat<EmployeeTest> csvFormat =
            CsvReaderFormat.forSchema(builder.build(), TypeInformation.of(recordClazz));

    FileSource.FileSourceBuilder<EmployeeTest> fileSourceBuilder =
            FileSource.forRecordStreamFormat(csvFormat, dataDirectory)
                    .monitorContinuously(Duration.ofSeconds(30));
    fileSourceBuilder.setFileEnumerator((FileEnumerator.Provider) () ->
            new NonSplittingRecursiveEnumerator(new DefaultFileFilter()));
    FileSource<EmployeeTest> source = fileSourceBuilder.build();

    final DataStreamSource<EmployeeTest> file = env.fromSource(source,
            WatermarkStrategy.forMonotonousTimestamps()
                    .withTimestampAssigner(new WatermarkAssigner((Object input) -> System.currentTimeMillis())),
            "FileSource");
    file.print();
}


Regards,
Kirti Dhar

From: Alexander Fedulov 
Sent: 26 October 2023 20:59
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: CSV Decoder with AVRO schema generated Object

Hi Kirti,

What do you mean exactly by "Flink CSV Decoder"? Please provide a snippet of 
the code that you are trying to execute.

To be honest, combining CSV with AVRO-generated classes sounds rather strange 
and you might want to reconsider your approach.
As for a quick fix, using aliases in your reader schema might help [1]

[1] https://avro.apache.org/docs/1.8.1/spec.html#Aliases
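For example, the emp_id field could declare the generated Java name as an alias (a sketch only; whether the CSV path honors Avro aliases is untested here):

```
{
  "name": "emp_id",
  "type": ["null", "long"],
  "aliases": ["empId"]
}
```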

Best,
Alexander Fedulov

On Thu, 26 Oct 2023 at 16:24, Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Team,

I am using the Flink CSV decoder with an AVSC-generated Java object and am facing an issue
if the field name contains an underscore (_) or the field name starts with an upper-case letter.
Sample Schema:

{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
{
  "name": "emp_id",
  "type": ["null","long"]
},
{
  "name": "Name",
  "type": ["null","string"]
}
]
}

Generated Java object getters/setters:

public void setEmpId(java.lang.Long value) {
  this.emp_id = value;
}

// ...

public java.lang.CharSequence getName() {
  return Name;
}


Input record:
emp_id,Name
1,peter

Exception:
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
 Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as 
ignorable (2 known properties: "empId", "name"])

I have also found an old JIRA regarding this: 
https://issues.apache.org/jira/browse/FLINK-2874

Any help would be appreciated!

Regards,
Kirti Dhar






CSV Decoder with AVRO schema generated Object

2023-10-26 Thread Kirti Dhar Upadhyay K via user
Hi Team,

I am using the Flink CSV decoder with an AVSC-generated Java object and am facing an issue
if the field name contains an underscore (_) or the field name starts with an upper-case letter.
Sample Schema:

{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
{
  "name": "emp_id",
  "type": ["null","long"]
},
{
  "name": "Name",
  "type": ["null","string"]
}
]
}

Generated Java object getters/setters:

public void setEmpId(java.lang.Long value) {
  this.emp_id = value;
}

// ...

public java.lang.CharSequence getName() {
  return Name;
}


Input record:
emp_id,Name
1,peter

Exception:
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
 Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as 
ignorable (2 known properties: "empId", "name"])

I have also found an old JIRA regarding this: 
https://issues.apache.org/jira/browse/FLINK-2874

Any help would be appreciated!

Regards,
Kirti Dhar






RE: File Source Watermark Issue

2023-10-15 Thread Kirti Dhar Upadhyay K via user
Hi Community,

Can someone help me here?

Regards,
Kirti Dhar

From: Kirti Dhar Upadhyay K
Sent: 10 October 2023 15:52
To: user@flink.apache.org
Subject: File Source Watermark Issue

Hi Team,

I am using the Flink File Source with a window aggregator as the process function,
and am stuck with a weird issue.
The file source doesn't seem to be emitting/progressing the watermarks, whereas if I
put a delay (say 100 ms) in while extracting the timestamp from the event, it works fine.

I found much the same thing in the comments here:
https://stackoverflow.com/questions/68736330/flink-watermark-not-advancing-at-all-stuck-at-9223372036854775808/68743019#68743019

Can someone help me here?

Regards,
Kirti Dhar


File Source Watermark Issue

2023-10-10 Thread Kirti Dhar Upadhyay K via user
Hi Team,

I am using the Flink File Source with a window aggregator as the process function,
and am stuck with a weird issue.
The file source doesn't seem to be emitting/progressing the watermarks, whereas if I
put a delay (say 100 ms) in while extracting the timestamp from the event, it works fine.

I found much the same thing in the comments here:
https://stackoverflow.com/questions/68736330/flink-watermark-not-advancing-at-all-stuck-at-9223372036854775808/68743019#68743019

Can someone help me here?

Regards,
Kirti Dhar


RE: Flink File Source: File read strategy

2023-09-24 Thread Kirti Dhar Upadhyay K via user
Thanks Shammon.
Is there any way to verify that File Source reads files directly from S3?

Regards,
Kirti Dhar

From: Shammon FY 
Sent: 25 September 2023 06:27
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: Flink File Source: File read strategy

Hi Kirti,

I think the default file `Source` does not download files locally in Flink, but 
reads them directly from S3. However, Flink also supports configuring temporary 
directories through `io.tmp.dirs`. If it is a user-defined source, it can be 
obtained from FlinkS3FileSystem. After the Flink job is completed, the 
directory will be cleaned up.
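For example, in flink-conf.yaml (the path is just an example):

```
io.tmp.dirs: /data/flink-tmp
```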

Best,
Shammon FY

On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Community,

I am using Flink File Source with Amazon S3.
Please help me with the below questions:

  1.  When the SplitEnumerator assigns a split to the SourceReader, does it
download the file temporarily and then start reading/decoding the records from
the file, or does it create a direct stream with S3?

  2.  If it is downloaded locally, then to which path? Is it configurable?

  3.  Does this temporary file automatically get deleted, or is any explicit
cleanup required?


Regards,
Kirti Dhar


Flink File Source: File read strategy

2023-09-22 Thread Kirti Dhar Upadhyay K via user
Hi Community,

I am using Flink File Source with Amazon S3.
Please help me with the below questions:

  1.  When the SplitEnumerator assigns a split to the SourceReader, does it
download the file temporarily and then start reading/decoding the records from
the file, or does it create a direct stream with S3?

  2.  If it is downloaded locally, then to which path? Is it configurable?

  3.  Does this temporary file automatically get deleted, or is any explicit
cleanup required?


Regards,
Kirti Dhar


Query Regarding CSV Decoder

2023-09-19 Thread Kirti Dhar Upadhyay K via user
Hi Team,

I am using the CSV decoder with the Flink file source.
I am stuck with the decoding issues below:


  1.  In case there is a blank line between two records, or blank lines at the
end of the file, it returns a blank object. E.g.:
Input Records:
id,name,age,isPermanent,tenure,salary,gender,contact
5,emp1,25,true,3.5,455,Male,123456789;987654321

10,emp2,25,true,3.5,555,Male,123456789;987654321

Output:
{"id": 5, "name": "emp1", "age": 25, "isPermanent": true, "tenure": 3.5, 
"salary": 455.0, "gender": "Male", "contact": [123456789, 987654321]}
{"id": null, "name": null, "age": 0, "isPermanent": false, "tenure": 0.0, 
"salary": 0.0, "gender": null, "contact": null}
{"id": 10, "name": "emp2", "age": 25, "isPermanent": true, "tenure": 3.5, 
"salary": 555.0, "gender": "Female", "contact": [123456789, 987654321]}

Is there any way to avoid the blank object creation for the blank lines present?


  2.  If a blank value comes in for a numeric data type, it assigns the default
value of the data type, whereas it fails to decode objects like enums.

Scenario 1:

Input:
id,name,age,isPermanent,tenure,salary,gender,contact
10,emp1,25,true,2.5,455, ,123456789;987654321

Output:
An exception occurs, as gender (enum) is not provided.

Scenario 2:

Input:
id,name,age,isPermanent,tenure,salary,gender,contact
10,emp1,25,true,,455,Male,123456789;987654321

Output:
{"id": 10, "name": "emp1", "age": 25, "isPermanent": true, "tenure": 0.0,
"salary": 455.0, "gender": "Male", "contact": [123456789, 987654321]}

Ideally it should fail the decoding, as a blank is a string, not a number, but it
gives the default value of the data type. Is there any way to fail the decoding
in this case?

Any help will be appreciated.

Regards,
Kirti Dhar


Recommended Download Directory for File Source

2023-09-15 Thread Kirti Dhar Upadhyay K via user
Hello Guys,

I am using Flink File Source with Amazon S3.
AFAIK, the file source first downloads the file to a temporary location and then
starts reading the file and emitting the records.
By default the download location is the /tmp directory.

In the case of a containerized environment, where pods have limited memory, is the
/tmp directory the recommended download directory?
Or should we use a persistent location for this by configuring
io.tmp.dirs? Is there a significant impact on performance?

Regards,
Kirti Dhar




File Source Exactly Once Delivery Semantics

2023-08-02 Thread Kirti Dhar Upadhyay K via user
Hi Team,

I am using the Flink File Source in one of my use cases.
I observed that, while reading a file, the source reader stores its position in
the checkpointed data.
In case the application crashes, it restores its position from the checkpointed
data once the application comes back up, which may result in re-emitting a few
records that were emitted between the last checkpoint and the application crash.
Whereas in the doc
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/guarantees/
I found that the file source ensures exactly-once delivery semantics with the
help of the data sink:
"To guarantee end-to-end exactly-once record delivery (in addition to
exactly-once state semantics), the data sink needs to take part in the
checkpointing mechanism."


Can someone shed some light on this?

Regards,
Kirti Dhar



RE: Custom Counter on Flink File Source

2023-06-07 Thread Kirti Dhar Upadhyay K via user
Thanks Hang.
Any expected date for the Flink 1.18.0 release?

Regards,
Kirti Dhar

From: Hang Ruan 
Sent: 07 June 2023 07:34
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

I checked FLIP-274 [1]. This issue will be released in 1.18.0; it is not
contained in any release yet.

Best,
Hang

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator

Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com>
wrote on Wed, Jun 7, 2023 at 02:51:
Hi Hang,

Thanks for the reply.
I tried using the SplitEnumeratorContext passed in
AbstractFileSource#createEnumerator, but it resulted in a NullPointerException,
as SplitEnumeratorContext's implementation SourceCoordinatorContext has
metricGroup() as below:


@Override
public SplitEnumeratorMetricGroup metricGroup() {
    return null;
}

Am I doing any mistake?

Regards,
Kirti Dhar

From: Hang Ruan <ruanhang1...@gmail.com>
Sent: 06 June 2023 08:12
To: Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

We could get the metric group from the context, like `SourceReaderContext` and 
`SplitEnumeratorContext`. These contexts could be found when creating readers 
and enumerators. See `AbstractFileSource#createReader` and 
`AbstractFileSource#createEnumerator`.
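For example, from the reader side (an untested sketch; `readerContext` stands for the SourceReaderContext handed to the reader, and the counter name is just an example):

```
// org.apache.flink.metrics.Counter, registered on the reader's metric group
Counter filesCollected = readerContext.metricGroup().counter("filesCollected");
filesCollected.inc();
```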

Best,
Hang

Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote on Mon, Jun 5, 2023 at 22:57:
Hi Community,

I am trying to add a new counter for the number of files collected by the Flink
File Source.
Referring to the doc
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
understand how to add a new counter to any operator.

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on the FileSource.
Can someone give me a clue on this?

Regards,
Kirti Dhar


RE: Custom Counter on Flink File Source

2023-06-06 Thread Kirti Dhar Upadhyay K via user
Hi Hang,

Thanks for the reply.
I tried using the SplitEnumeratorContext passed in
AbstractFileSource#createEnumerator, but it resulted in a NullPointerException,
as SplitEnumeratorContext's implementation SourceCoordinatorContext has
metricGroup() as below:


@Override
public SplitEnumeratorMetricGroup metricGroup() {
    return null;
}

Am I doing any mistake?

Regards,
Kirti Dhar

From: Hang Ruan 
Sent: 06 June 2023 08:12
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

We could get the metric group from the context, like `SourceReaderContext` and 
`SplitEnumeratorContext`. These contexts could be found when creating readers 
and enumerators. See `AbstractFileSource#createReader` and 
`AbstractFileSource#createEnumerator`.

Best,
Hang

Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote on Mon, Jun 5, 2023 at 22:57:
Hi Community,

I am trying to add a new counter for the number of files collected by the Flink
File Source.
Referring to the doc
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
understand how to add a new counter to any operator.

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on the FileSource.
Can someone give me a clue on this?

Regards,
Kirti Dhar


Custom Counter on Flink File Source

2023-06-05 Thread Kirti Dhar Upadhyay K via user
Hi Community,

I am trying to add a new counter for the number of files collected by the Flink
File Source.
Referring to the doc
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
understand how to add a new counter to any operator.

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on the FileSource.
Can someone give me a clue on this?

Regards,
Kirti Dhar


File Source Limitations

2023-04-25 Thread Kirti Dhar Upadhyay K via user
Hi Community,

I am planning to use the FileSource (with S3) in my application, and have
encountered the below limitations:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations



  1.  Watermarking does not work very well for large backlogs of files. This is
because watermarks eagerly advance within a file, and the next file might
contain data later than the watermark.
Ques: Is there any ideal use case/settings/configuration where this problem
does not come into the picture, or can it be avoided?

  2.  For unbounded file sources, the enumerator currently remembers the paths
of all already-processed files, which is a state that can, in some cases, grow
rather large.
Ques: As a workaround for this problem, what if I configure a state backend
(say RocksDBStateBackend) with a configured TTL, which shall automatically
delete the older data? Are there any repercussions of this?


Regards,
Kirti Dhar




RE: SplitEnumerator and SourceReader

2023-04-20 Thread Kirti Dhar Upadhyay K via user
Thanks a lot, Martijn, for the quick response.

For point 3, I might have gotten confused by the link below:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-where_run_enumerator

Anyway, thanks for clarifying all of this.

Just a further question on:
"Yes, because the enumerator needs to remember the paths of all currently
processed files. Depending on the use case, that can grow to be big. This is
documented at
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations"

Is there any recommendation for this limitation, like the size of files, the
number of files, the checkpointing state backend, etc.?

Regards,
Kirti Dhar

From: Martijn Visser 
Sent: 20 April 2023 18:14
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: SplitEnumerator and SourceReader

Hi Kirti Dhar,

1. The SourceReader downloads the file, which is assigned to it by the
SplitEnumerator.
2. This depends on the format; a BulkFormat like Parquet or ORC can be read in 
batches of records at a time.
3. The SplitEnumerator runs on the JobManager, not on a TaskManager. Have you 
read something different in the documentation?
4. Yes, because the enumerator needs to remember the paths of all currently 
processed files. Depending on the use case, that can grow to be big. This is 
documented at 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations

Best regards,

Martijn



On Thu, Apr 20, 2023 at 2:30 PM Kirti Dhar Upadhyay K via user
<user@flink.apache.org> wrote:
Hi Community,

I have started using the file source of Flink 1.17.x recently.
I was going through the FLIP-27 documentation and, as much as I understand, the
SplitEnumerator lists files (splits) and assigns them to SourceReaders. A single
instance of the SplitEnumerator runs, whereas parallelism can be applied on the
SourceReader side. I have the below queries on the same:


  1.  Who actually downloads the file (let's say the file is on S3)? Is it the
SplitEnumerator which downloads the files and then assigns the splits to the
SourceReaders, or does it only list and give the path of the file in the split
to the SourceReader, which downloads the file and processes it?

  2.  Is the complete file downloaded in one go, or is chunked downloading also
possible?

  3.  I understand that the SplitEnumerator can be run on the JobManager or on a
single instance of a TaskManager. How can a user configure where it runs?

  4.  Is there any memory footprint impact if the FileSource is running in
streaming mode (continuous streaming)?


Thanks for any help!

Regards,
Kirti Dhar


SplitEnumerator and SourceReader

2023-04-20 Thread Kirti Dhar Upadhyay K via user
Hi Community,

I have started using the file source of Flink 1.17.x recently.
I was going through the FLIP-27 documentation and, as much as I understand, the
SplitEnumerator lists files (splits) and assigns them to SourceReaders. A single
instance of the SplitEnumerator runs, whereas parallelism can be applied on the
SourceReader side. I have the below queries on the same:


  1.  Who actually downloads the file (let's say the file is on S3)? Is it the
SplitEnumerator which downloads the files and then assigns the splits to the
SourceReaders, or does it only list and give the path of the file in the split
to the SourceReader, which downloads the file and processes it?

  2.  Is the complete file downloaded in one go, or is chunked downloading also
possible?

  3.  I understand that the SplitEnumerator can be run on the JobManager or on a
single instance of a TaskManager. How can a user configure where it runs?

  4.  Is there any memory footprint impact if the FileSource is running in
streaming mode (continuous streaming)?


Thanks for any help!

Regards,
Kirti Dhar


Support of CSV to AVRO Converter in DataStream FileSource

2023-04-14 Thread Kirti Dhar Upadhyay K via user
Hi Community,

I am reading CSV data using the DataStream file source connector and need to
convert the records into Avro-generated specific objects.
I am using CsvReaderFormat with CsvSchema, but this supports only the primitive
types of Avro (and even those, except null and bytes).


Is there any support provided for Avro complex and logical types as well? I can
see a few classes like CsvToRowDataConverters and RowDataToAvroConverters,
but they seem to be specific to the Table API.



Regards,

Kirti Dhar



Queries/Help regarding limitations on File source

2023-04-13 Thread Kirti Dhar Upadhyay K via user
Hi,

I am using the DataStream file source connector in one of my use cases.
I was going through the documentation, where I found the below limitations:

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations

  1.  Watermarking does not work very well for large backlogs of files. This is
because watermarks eagerly advance within a file, and the next file might
contain data later than the watermark.
Queries:
Is there any FLIP/design document to better understand the impact of these
limitations?
Also, is there any ongoing work on these limitations for future Flink releases?
If yes, please redirect me to any related document.

  2.  For unbounded file sources, the enumerator currently remembers the paths
of all already-processed files, which is a state that can, in some cases, grow
rather large.
Query:
What data per file is part of the checkpointing state kept by the file source?

Appreciate any help!

Regards,
Kirti Dhar