RE: SecurityManager in Flink
Hi Gabor,

The issue is that the read permission is not checked when the Flink FileSource lists the files under the given source directory. This happens because the SecurityManager is null at that point:

```
public String[] list() {
    SecurityManager security = System.getSecurityManager(); // <- SecurityManager is null here
    if (security != null) {
        security.checkRead(path);
    }
    if (isInvalid()) {
        return null;
    }
    return fs.list(this);
}
```

While debugging it, I found the method below in FlinkSecurityManager, hence I suspected it and asked about the role of Flink's security manager.

```
public static void setFromConfiguration(Configuration configuration) {
    final FlinkSecurityManager flinkSecurityManager =
            FlinkSecurityManager.fromConfiguration(configuration);
    if (flinkSecurityManager != null) {
        try {
            System.setSecurityManager(flinkSecurityManager);
        } catch (Exception e) {
…
…
```

Regards,
Kirti Dhar

From: Gabor Somogyi
Sent: Wednesday, March 6, 2024 7:17 PM
To: Kirti Dhar Upadhyay K
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi Kirti,

I'm not sure what the exact issue is here, but I'm not convinced that having FlinkSecurityManager is going to solve it. Here is the condition, however:
* cluster.intercept-user-system-exit != DISABLED (this must be changed)
* cluster.processes.halt-on-fatal-error == false (this is good by default)

Here is a gist of what Flink's SecurityManager does:

/**
 * {@code FlinkSecurityManager} to control certain behaviors that can be captured by Java system
 * security manager. It can be used to control unexpected user behaviors that potentially impact
 * cluster availability, for example, it can warn or prevent user code from terminating JVM by
 * System.exit or halt by logging or throwing an exception. This does not necessarily prevent
 * malicious users who try to tweak security manager on their own, but more for being dependable
 * against user mistakes by gracefully handling them informing users rather than causing silent
 * unavailability.
 */

G

On Wed, Mar 6, 2024 at 11:10 AM Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Team,

I am using the Flink File Source with the local file system. I am facing an issue: if the source directory does not have read permission, it returns the list of files as null instead of throwing a permission exception (refer to the highlighted line below), resulting in an NPE.

```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Debugging the issue, I found that the SecurityManager is null while listing the files, hence the permission check on the directory is skipped. What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar
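The null return discussed in this thread can be reproduced with the plain JDK, independent of Flink: `java.io.File.list()` returns null, rather than throwing, whenever the directory cannot be listed (nonexistent path, not a directory, or no read permission). A minimal sketch (the path used is only an illustration):

```java
import java.io.File;

public class ListNullDemo {
    public static void main(String[] args) {
        // File.list() returns null instead of throwing when the directory
        // cannot be listed: missing path, not a directory, or unreadable.
        File dir = new File("/no/such/directory");
        String[] files = dir.list();

        if (files == null) {
            // Without this guard, iterating over `files` throws an NPE,
            // which is exactly the failure mode reported in the thread.
            System.out.println("Cannot list files under " + dir);
        } else {
            System.out.println("Found " + files.length + " files");
        }
    }
}
```

This is why the guard (or an explicit `Files.isReadable` check) proposed later in the thread is needed regardless of whether a SecurityManager is installed.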
RE: SecurityManager in Flink
Hi Yanfei,

I am facing this issue on JDK 1.8/11. Thanks for the pointer, I will try to set the security manager and check the behaviour.

Regards,
Kirti Dhar

-Original Message-
From: Yanfei Lei
Sent: Wednesday, March 6, 2024 4:37 PM
To: Kirti Dhar Upadhyay K
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi Kirti Dhar,

What is your Java version? I guess this problem may be related to FLINK-33309 [1]. Maybe you can try adding "-Djava.security.manager" to the Java options.

[1] https://issues.apache.org/jira/browse/FLINK-33309

On Wed, Mar 6, 2024 at 18:10, Kirti Dhar Upadhyay K via user wrote:
>
> Hi Team,
>
> I am using Flink File Source with Local File System.
>
> I am facing an issue: if the source directory does not have read permission, it
> returns the list of files as null instead of throwing a permission exception
> (refer to the highlighted line below), resulting in an NPE.
>
> final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
> for (FileStatus containedStatus : containedFiles) {
>     addSplitsForPath(containedStatus, fs, target);
> }
>
> Debugging the issue, I found that the SecurityManager is null while
> listing the files, hence the permission check on the directory is skipped.
>
> What is the way to set a SecurityManager in Flink?
>
> Regards,
>
> Kirti Dhar

--
Best,
Yanfei
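Yanfei's suggestion can be applied cluster-wide through the JVM options in flink-conf.yaml. A sketch, assuming the standard `env.java.opts` key (the policy-file path is a placeholder; without a policy granting the needed permissions, the default security manager will deny far more than just directory reads):

```yaml
# flink-conf.yaml - pass JVM options to all Flink processes
env.java.opts: -Djava.security.manager -Djava.security.policy=/path/to/flink.policy
```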
RE: SecurityManager in Flink
Hi Hang,

You got it right. The problem is exactly at the line you pointed to [1]. I have used the below solution for now.

```
if (!Files.isReadable(Paths.get(fileStatus.getPath().getPath()))) {
    throw new FlinkRuntimeException("Cannot list files under " + fileStatus.getPath());
}
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Although, if you go inside localf.list(), it automatically checks for the read permission using the SecurityManager. This check is being skipped because the SecurityManager is null. Hence my suspicion fell on the SecurityManager.

[1] https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38

Regards,
Kirti Dhar

From: Hang Ruan
Sent: Wednesday, March 6, 2024 6:46 PM
To: Kirti Dhar Upadhyay K
Cc: User@flink.apache.org
Subject: Re: SecurityManager in Flink

Hi, Kirti.

Could you please provide the stack trace of this NPE?

I checked the code and I think the problem may lie in LocalFileSystem#listStatus. The code at line 161 [1] may return null, which will make LocalFileSystem#listStatus return null. Then `containedFiles` is null and the NPE occurs. I think we should add code to handle this situation as follows.
```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
if (containedFiles == null) {
    throw new FlinkRuntimeException("Cannot list files under " + fileStatus.getPath());
}
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Best,
Hang

[1] https://github.com/apache/flink/blob/9b1375520b6b351df7551d85fcecd920e553cc3a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java#L161C32-L161C38

On Wed, Mar 6, 2024 at 18:10, Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Team,

I am using the Flink File Source with the local file system. I am facing an issue: if the source directory does not have read permission, it returns the list of files as null instead of throwing a permission exception (refer to the highlighted line below), resulting in an NPE.

```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Debugging the issue, I found that the SecurityManager is null while listing the files, hence the permission check on the directory is skipped. What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar
SecurityManager in Flink
Hi Team,

I am using the Flink File Source with the local file system. I am facing an issue: if the source directory does not have read permission, it returns the list of files as null instead of throwing a permission exception (refer to the highlighted line below), resulting in an NPE.

```
final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
for (FileStatus containedStatus : containedFiles) {
    addSplitsForPath(containedStatus, fs, target);
}
```

Debugging the issue, I found that the SecurityManager is null while listing the files, hence the permission check on the directory is skipped. What is the way to set a SecurityManager in Flink?

Regards,
Kirti Dhar
RE: RE: Flink Kafka Sink + Schema Registry + Message Headers
Thanks Jiabao and Yaroslav for your quick responses.

Regards,
Kirti Dhar

From: Yaroslav Tkachenko
Sent: 01 February 2024 21:42
Cc: user@flink.apache.org
Subject: Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

The schema registry support is provided in the ConfluentRegistryAvroSerializationSchema class (flink-avro-confluent-registry package).

On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote:

You can also implement a custom KafkaRecordSerializationSchema, which allows creating a ProducerRecord (see the "serialize" method) - you can set the message key, headers, etc. manually. It's supported in older versions.

On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun <jiabao@xtransfer.cn> wrote:

Sorry, I didn't notice the version information.

This feature was completed in FLINK-31049 [1] and will be released in version 3.1.0 of the Kafka connector. The release process [2] is currently underway and will be completed soon. However, version 3.1.0 does not promise support for Flink 1.16. If you need this feature, you can consider cherry-picking this commit [3] to the v3.0 branch and packaging it for your own use.

Regarding the Schema Registry, I am not familiar with this feature and I apologize for not being able to provide an answer.

Best,
Jiabao

[1] https://issues.apache.org/jira/browse/FLINK-31049
[2] https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
[3] https://github.com/apache/flink-connector-kafka/pull/18

On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> Hi Jiabao,
>
> Thanks for the reply.
>
> Currently I am using Flink 1.16.1 and I am not able to find any
> HeaderProvider setter method in the class KafkaRecordSerializationSchemaBuilder.
> Although on GitHub I found this support here:
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
> But this doesn't seem to be released yet. Can you please point me towards the
> correct Flink version?
>
> Also, any help on question 1 regarding the Schema Registry?
>
> Regards,
> Kirti Dhar
>
> -Original Message-
> From: Jiabao Sun <ji...@xtransfer.cn>
> Sent: 01 February 2024 13:29
> To: user@flink.apache.org
> Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
>
> Hi Kirti,
>
> Kafka Sink supports sending messages with headers.
> You should implement a HeaderProvider to extract headers from the input element.
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers(brokers)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic("topic-name")
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .setHeaderProvider(new HeaderProvider<String>() {
>                     @Override
>                     public Headers getHeaders(String input) {
>                         // TODO: implement it
>                         return null;
>                     }
>                 })
>                 .build())
>         .build();
>
> Best,
> Jiabao
>
> On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > Hi Mates,
> >
> > I have the below queries regarding the Flink Kafka Sink.
> >
> > 1. Does Kafka Sink support schema registry? If yes, is there any
> > documentation to configure the same?
> > 2. Does Kafka Sink support sending messages (ProducerRecord) with
> > headers?
> >
> > Regards,
> > Kirti Dhar
RE: Flink Kafka Sink + Schema Registry + Message Headers
Hi Jiabao,

Thanks for the reply.

Currently I am using Flink 1.16.1 and I am not able to find any HeaderProvider setter method in the class KafkaRecordSerializationSchemaBuilder. Although on GitHub I found this support here:
https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
But this doesn't seem to be released yet. Can you please point me towards the correct Flink version?

Also, any help on question 1 regarding the Schema Registry?

Regards,
Kirti Dhar

-Original Message-
From: Jiabao Sun
Sent: 01 February 2024 13:29
To: user@flink.apache.org
Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers

Hi Kirti,

Kafka Sink supports sending messages with headers. You should implement a HeaderProvider to extract headers from the input element.

```
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .setHeaderProvider(new HeaderProvider<String>() {
                    @Override
                    public Headers getHeaders(String input) {
                        // TODO: implement it
                        return null;
                    }
                })
                .build())
        .build();
```

Best,
Jiabao

On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> Hi Mates,
>
> I have the below queries regarding the Flink Kafka Sink.
>
> 1. Does Kafka Sink support schema registry? If yes, is there any
> documentation to configure the same?
> 2. Does Kafka Sink support sending messages (ProducerRecord) with headers?
>
> Regards,
> Kirti Dhar
Flink Kafka Sink + Schema Registry + Message Headers
Hi Mates,

I have the below queries regarding the Flink Kafka Sink.

1. Does Kafka Sink support schema registry? If yes, is there any documentation to configure the same?
2. Does Kafka Sink support sending messages (ProducerRecord) with headers?

Regards,
Kirti Dhar
RE: CSV Decoder with AVRO schema generated Object
Hi Alexander,

Thanks for the reply. Actually, I have a system where data travels in the form of user-defined, AVRO-schema-generated objects.

Sample code:

```
static void readCsvWithCustomSchemaDecoder(StreamExecutionEnvironment env, Path dataDirectory) throws Exception {

    Class recordClazz = EmployeeTest.class; // This is an AVRO-generated Java object having fields emp_id and Name

    CsvSchema.Builder builder = CsvSchema.builder()
            .setUseHeader(true)
            .setReorderColumns(true)
            .setColumnSeparator(',')
            .setEscapeChar('"')
            .setLineSeparator(System.lineSeparator())
            .setQuoteChar('"')
            .setArrayElementSeparator(";")
            .setNullValue("");

    CsvReaderFormat csvFormat = CsvReaderFormat.forSchema(builder.build(), TypeInformation.of(recordClazz));

    FileSource.FileSourceBuilder fileSourceBuilder =
            FileSource.forRecordStreamFormat(csvFormat, dataDirectory).monitorContinuously(Duration.ofSeconds(30));
    fileSourceBuilder.setFileEnumerator(
            (FileEnumerator.Provider) () -> new NonSplittingRecursiveEnumerator(new DefaultFileFilter()));
    FileSource source = fileSourceBuilder.build();

    final DataStreamSource file = env.fromSource(
            source,
            WatermarkStrategy.forMonotonousTimestamps()
                    .withTimestampAssigner(new WatermarkAssigner((Object input) -> System.currentTimeMillis())),
            "FileSource");
    file.print();
}
```

Regards,
Kirti Dhar

From: Alexander Fedulov
Sent: 26 October 2023 20:59
To: Kirti Dhar Upadhyay K
Cc: user@flink.apache.org
Subject: Re: CSV Decoder with AVRO schema generated Object

Hi Kirti,

What do you mean exactly by "Flink CSV Decoder"? Please provide a snippet of the code that you are trying to execute. To be honest, combining CSV with AVRO-generated classes sounds rather strange and you might want to reconsider your approach.
As for a quick fix, using aliases in your reader schema might help [1].

[1] https://avro.apache.org/docs/1.8.1/spec.html#Aliases

Best,
Alexander Fedulov

On Thu, 26 Oct 2023 at 16:24, Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Team,

I am using the Flink CSV decoder with an AVSC-generated Java object and facing an issue if a field name contains an underscore (_) or starts with an upper-case letter.

Sample schema:

```
{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
    { "name": "emp_id", "type": ["null","long"] },
    { "name": "Name", "type": ["null","string"] }
  ]
}
```

Generated Java object getters/setters:

```
public void setEmpId(java.lang.Long value) {
    this.emp_id = value;
}
.
.
public java.lang.CharSequence getName() {
    return Name;
}
```

Input record:

emp_id,Name
1,peter

Exception:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as ignorable (2 known properties: "empId", "name"])

I have also found an old JIRA regarding this: https://issues.apache.org/jira/browse/FLINK-2874

Any help would be appreciated!

Regards,
Kirti Dhar
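The mismatch in the exception above ("emp_id" vs. "empId") comes from the Avro code generator mangling snake_case field names into camelCase accessor names, while the CSV header still carries the original names. A stdlib-only sketch of that name mangling (the conversion rule shown illustrates the effect; it is not Avro's exact implementation):

```java
public class AvroNameManglingDemo {
    // Convert a snake_case field name to the camelCase property name
    // that an Avro-generated getter/setter effectively exposes.
    static String toCamelCase(String fieldName) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : fieldName.toCharArray()) {
            if (c == '_') {
                upperNext = true; // drop the underscore, upper-case next char
            } else {
                out.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // The CSV header says "emp_id", but the generated class exposes "empId",
        // so a property-name-based decoder fails to match the column.
        System.out.println(toCamelCase("emp_id")); // prints "empId"
    }
}
```

This is why the alias approach works: the reader schema can declare "empId" with an alias "emp_id", so either spelling resolves to the same field.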
CSV Decoder with AVRO schema generated Object
Hi Team,

I am using the Flink CSV decoder with an AVSC-generated Java object and facing an issue if a field name contains an underscore (_) or starts with an upper-case letter.

Sample schema:

```
{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
    { "name": "emp_id", "type": ["null","long"] },
    { "name": "Name", "type": ["null","string"] }
  ]
}
```

Generated Java object getters/setters:

```
public void setEmpId(java.lang.Long value) {
    this.emp_id = value;
}
.
.
public java.lang.CharSequence getName() {
    return Name;
}
```

Input record:

emp_id,Name
1,peter

Exception:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as ignorable (2 known properties: "empId", "name"])

I have also found an old JIRA regarding this: https://issues.apache.org/jira/browse/FLINK-2874

Any help would be appreciated!

Regards,
Kirti Dhar
RE: File Source Watermark Issue
Hi Community,

Can someone help me here?

Regards,
Kirti Dhar

From: Kirti Dhar Upadhyay K
Sent: 10 October 2023 15:52
To: user@flink.apache.org
Subject: File Source Watermark Issue

Hi Team,

I am using the Flink File Source with a window aggregator as the process function, and I am stuck on a weird issue. The file source doesn't seem to be emitting/advancing watermarks, whereas if I add a delay (say 100 ms) while extracting the timestamp from the event, it works fine. Much the same thing is described in the comments here:
https://stackoverflow.com/questions/68736330/flink-watermark-not-advancing-at-all-stuck-at-9223372036854775808/68743019#68743019

Can someone help me here?

Regards,
Kirti Dhar
File Source Watermark Issue
Hi Team,

I am using the Flink File Source with a window aggregator as the process function, and I am stuck on a weird issue. The file source doesn't seem to be emitting/advancing watermarks, whereas if I add a delay (say 100 ms) while extracting the timestamp from the event, it works fine. Much the same thing is described in the comments here:
https://stackoverflow.com/questions/68736330/flink-watermark-not-advancing-at-all-stuck-at-9223372036854775808/68743019#68743019

Can someone help me here?

Regards,
Kirti Dhar
RE: Flink File Source: File read strategy
Thanks Shammon. Is there any way to verify that the File Source reads files directly from S3?

Regards,
Kirti Dhar

From: Shammon FY
Sent: 25 September 2023 06:27
To: Kirti Dhar Upadhyay K
Cc: user@flink.apache.org
Subject: Re: Flink File Source: File read strategy

Hi Kirti,

I think the default file `Source` does not download files locally in Flink, but reads them directly from S3. However, Flink also supports configuring temporary directories through `io.tmp.dirs`. If it is a user-defined source, it can be obtained from FlinkS3FileSystem. After the Flink job is completed, the directory will be cleaned up.

Best,
Shammon FY

On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Community,

I am using the Flink File Source with Amazon S3. Please help me with the questions below:

1. When the Split Enumerator assigns a split to the Source Reader, does it download the file temporarily and then start reading/decoding the records from the file, or does it create a direct stream with S3?
2. If it is downloaded locally, to which path? Is it configurable?
3. Does this temporary file get deleted automatically, or is any explicit cleanup required?

Regards,
Kirti Dhar
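As Shammon notes, Flink's temporary directories are controlled by `io.tmp.dirs`. A flink-conf.yaml fragment showing the shape of that setting (the paths are placeholders; by default Flink falls back to the system temp directory):

```yaml
# flink-conf.yaml - local working directories for Flink processes;
# multiple directories can be listed, separated by commas
io.tmp.dirs: /mnt/flink-tmp1,/mnt/flink-tmp2
```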
Flink File Source: File read strategy
Hi Community,

I am using the Flink File Source with Amazon S3. Please help me with the questions below:

1. When the Split Enumerator assigns a split to the Source Reader, does it download the file temporarily and then start reading/decoding the records from the file, or does it create a direct stream with S3?
2. If it is downloaded locally, to which path? Is it configurable?
3. Does this temporary file get deleted automatically, or is any explicit cleanup required?

Regards,
Kirti Dhar
Query Regarding CSV Decoder
Hi Team,

I am using the CSV decoder with the Flink file source. I am stuck with decoding issues as below:

1. If there is a blank line between two records, or blank lines at the end of the file, it returns a blank object. E.g.:

Input records:

id,name,age,isPermanent,tenure,salary,gender,contact
5,emp1,25,true,3.5,455,Male,123456789;987654321
10,emp2,25,true,3.5,555,Male,123456789;987654321

Output:

{"id": 5, "name": "emp1", "age": 25, "isPermanent": true, "tenure": 3.5, "salary": 455.0, "gender": "Male", "contact": [123456789, 987654321]}
{"id": null, "name": null, "age": 0, "isPermanent": false, "tenure": 0.0, "salary": 0.0, "gender": null, "contact": null}
{"id": 10, "name": "emp2", "age": 25, "isPermanent": true, "tenure": 3.5, "salary": 555.0, "gender": "Female", "contact": [123456789, 987654321]}

Is there any way to avoid the creation of blank objects for the blank lines present?

2. If a blank value comes in for a numeric data type, it assigns the default value of the data type, whereas decoding fails for objects like Enum.

Scenario 1:

Input:
id,name,age,isPermanent,tenure,salary,gender,contact
10,emp1,25,true,2.5,455, ,123456789;987654321

Output: an exception occurs, as gender (Enum) is not provided.

Scenario 2:

Input:
id,name,age,isPermanent,tenure,salary,gender,contact
10,emp1,25,true,,455,Male,123456789;987654321

Output:
{"id": 10, "name": "emp1", "age": 25, "isPermanent": true, "tenure": 0.0, "salary": 455.0, "gender": "Male", "contact": [123456789, 987654321]}

Ideally it should fail the decoding, as a blank is a string, not a number, but it assigns the default value of the data type. Is there any way to make the decoding fail in this case?

Any help will be appreciated.

Regards,
Kirti Dhar
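One pragmatic workaround for the first issue, assuming the files are small enough to pre-process, is to strip blank lines before the content reaches the CSV decoder. A stdlib-only sketch (reading from a string for brevity; a real job would do this in a custom stream format or an upstream cleanup step, and `String.lines()`/`isBlank()` need Java 11+):

```java
import java.util.List;
import java.util.stream.Collectors;

public class BlankLineFilterDemo {
    // Drop blank lines so the CSV decoder never sees an empty record.
    static List<String> nonBlankLines(String csv) {
        return csv.lines()
                .filter(line -> !line.isBlank())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String csv = "id,name\n5,emp1\n\n10,emp2\n\n";
        System.out.println(nonBlankLines(csv)); // [id,name, 5,emp1, 10,emp2]
    }
}
```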
Recommended Download Directory for File Source
Hello Guys,

I am using the Flink File Source with Amazon S3. AFAIK, the file source first downloads the file to a temporary location and then starts reading the file and emitting the records. By default the download location is the /tmp directory. In a containerized environment, where pods have limited memory, is the /tmp directory the recommended download directory? Or should we use a persistent location by configuring io.tmp.dirs? Is there a significant impact on performance?

Regards,
Kirti Dhar
File Source Exactly Once Delivery Semantics
Hi Team,

I am using the Flink File Source in one of my use cases. I observed that while reading a file, the source reader stores its position in the checkpointed data. If the application crashes, it restores its position from the checkpointed data once the application comes up, which may result in re-emitting a few records that were emitted between the last checkpoint and the crash. Whereas in the doc
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/guarantees/
I found that the file source ensures exactly-once delivery semantics with the help of the data sink:

"To guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs to take part in the checkpointing mechanism."

Can someone shed some light on this?

Regards,
Kirti Dhar
RE: Custom Counter on Flink File Source
Thanks Hang. Any expected date for the Flink 1.18.0 release?

Regards,
Kirti Dhar

From: Hang Ruan
Sent: 07 June 2023 07:34
To: Kirti Dhar Upadhyay K
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

I checked FLIP-274 [1]. This issue will be released in 1.18.0. It is not contained in any release now.

Best,
Hang

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator

Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com> wrote:

Hi Hang,

Thanks for the reply. I tried using the SplitEnumeratorContext passed in AbstractFileSource#createEnumerator, but it resulted in a NullPointerException, as the SplitEnumeratorContext implementation, SourceCoordinatorContext, implements metricGroup() as below:

```
@Override
public SplitEnumeratorMetricGroup metricGroup() {
    return null;
}
```

Am I making any mistake?

Regards,
Kirti Dhar

From: Hang Ruan <ruanhang1...@gmail.com>
Sent: 06 June 2023 08:12
To: Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

We could get the metric group from the context, like `SourceReaderContext` and `SplitEnumeratorContext`. These contexts can be found when creating readers and enumerators. See `AbstractFileSource#createReader` and `AbstractFileSource#createEnumerator`.

Best,
Hang

Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Community,

I am trying to add a new counter for the number of files collected by the Flink File Source. Referring to the doc https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I understand how to add a new counter on any operator:

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on the FileSource. Can someone give me a clue on this?

Regards,
Kirti Dhar
RE: Custom Counter on Flink File Source
Hi Hang,

Thanks for the reply. I tried using the SplitEnumeratorContext passed in AbstractFileSource#createEnumerator, but it resulted in a NullPointerException, as the SplitEnumeratorContext implementation, SourceCoordinatorContext, implements metricGroup() as below:

```
@Override
public SplitEnumeratorMetricGroup metricGroup() {
    return null;
}
```

Am I making any mistake?

Regards,
Kirti Dhar

From: Hang Ruan
Sent: 06 June 2023 08:12
To: Kirti Dhar Upadhyay K
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

We could get the metric group from the context, like `SourceReaderContext` and `SplitEnumeratorContext`. These contexts can be found when creating readers and enumerators. See `AbstractFileSource#createReader` and `AbstractFileSource#createEnumerator`.

Best,
Hang

Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Community,

I am trying to add a new counter for the number of files collected by the Flink File Source. Referring to the doc https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I understand how to add a new counter on any operator:

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on the FileSource. Can someone give me a clue on this?

Regards,
Kirti Dhar
Custom Counter on Flink File Source
Hi Community,

I am trying to add a new counter for the number of files collected by the Flink File Source. Referring to the doc https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I understand how to add a new counter on any operator:

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on the FileSource. Can someone give me a clue on this?

Regards,
Kirti Dhar
File Source Limitations
Hi Community,

I am planning to use the FileSource (with S3) in my application, and have come across the limitations below:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations

1. Watermarking does not work very well for large backlogs of files. This is because watermarks eagerly advance within a file, and the next file might contain data later than the watermark.

Ques: Is there any ideal use case/settings/configuration where this problem does not come into the picture, or can be avoided?

2. For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can, in some cases, grow rather large.

Ques: As a workaround for this problem, what if I configure a state backend (say RocksDBStateBackend) with a configured TTL, which shall automatically delete the older data? Are there any repercussions of this?

Regards,
Kirti Dhar
RE: SplitEnumerator and SourceReader
Thanks a lot Martijn for the quick response.

For point 3, I might have gotten confused by the link below:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-where_run_enumerator

Anyways, thanks for clarifying all things. Just a further question on "Yes, because the enumerator needs to remember the paths of all currently processed files. Depending on the use case, that can grow to be big. This is documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations"

Is there any recommendation regarding this limitation, e.g. size of files, number of files, or checkpointing state backend?

Regards,
Kirti Dhar

From: Martijn Visser
Sent: 20 April 2023 18:14
To: Kirti Dhar Upadhyay K
Cc: user@flink.apache.org
Subject: Re: SplitEnumerator and SourceReader

Hi Kirti Dhar,

1. The SourceReader downloads the file, which is assigned to it by the SplitEnumerator.
2. This depends on the format; a BulkFormat like Parquet or ORC can be read in batches of records at a time.
3. The SplitEnumerator runs on the JobManager, not on a TaskManager. Have you read something different in the documentation?
4. Yes, because the enumerator needs to remember the paths of all currently processed files. Depending on the use case, that can grow to be big. This is documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations

Best regards,
Martijn

On Thu, Apr 20, 2023 at 2:30 PM Kirti Dhar Upadhyay K via user <user@flink.apache.org> wrote:

Hi Community,

I have started using the file source of Flink 1.17.x recently. I was going through the FLIP-27 documentation, and as far as I understand, the SplitEnumerator lists files (splits) and assigns them to SourceReaders. A single instance of the SplitEnumerator runs, whereas parallelism is possible on the SourceReader side. I have the below queries on the same:

1.
Who actually downloads the file (let's say the file is on S3)? Is it the SplitEnumerator which downloads the files and then assigns the splits to SourceReaders, or does it only list and give the path of the file in the split to the SourceReader, which downloads the file and processes it?

2. Is the complete file downloaded in one go, or is chunked downloading also possible?

3. I gather that the SplitEnumerator can run on the JobManager or on a single instance of a TaskManager. How can a user configure where it runs?

4. Is there any memory-footprint impact if the FileSource is running in streaming mode (continuous streaming)?

Thanks for any help!

Regards,
Kirti Dhar
SplitEnumerator and SourceReader
Hi Community,

I have started using the file source of Flink 1.17.x recently. I was going through the FLIP-27 documentation, and as far as I understand, the SplitEnumerator lists files (splits) and assigns them to SourceReaders. A single instance of the SplitEnumerator runs, whereas parallelism is possible on the SourceReader side. I have the below queries on the same:

1. Who actually downloads the file (let's say the file is on S3)? Is it the SplitEnumerator which downloads the files and then assigns the splits to SourceReaders, or does it only list and give the path of the file in the split to the SourceReader, which downloads the file and processes it?
2. Is the complete file downloaded in one go, or is chunked downloading also possible?
3. I gather that the SplitEnumerator can run on the JobManager or on a single instance of a TaskManager. How can a user configure where it runs?
4. Is there any memory-footprint impact if the FileSource is running in streaming mode (continuous streaming)?

Thanks for any help!

Regards,
Kirti Dhar
Support of CSV to AVRO Converter in DataStream FileSource
Hi Community,

I am reading CSV data using the data stream file source connector and need to convert the records into AVRO-generated specific objects. I am using CsvReaderFormat with a CsvSchema, but this supports only AVRO's primitive types (and even those except null and bytes). Is there any support for AVRO complex and logical types as well? I can see a few classes like CsvToRowDataConverters and RowDataToAvroConverters, but they seem specific to the Table API.

Regards,
Kirti Dhar
Queries/Help regarding limitations on File source
Hi,

I am using the data stream file source connector in one of my use cases. I was going through the documentation, where I found the limitations below:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations

1. Watermarking does not work very well for large backlogs of files. This is because watermarks eagerly advance within a file, and the next file might contain data later than the watermark.

Queries: Is there any FLIP/design document to better understand the impact of these limitations? Also, is there any ongoing work on these limitations for future Flink releases? If yes, please redirect me to any related document.

2. For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can, in some cases, grow rather large.

Query: What data per file is part of the checkpointing state kept by the file source?

Appreciate any help!

Regards,
Kirti Dhar