Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-19 Thread via GitHub


LadyForest merged PR #24512:
URL: https://github.com/apache/flink/pull/24512





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-19 Thread via GitHub


LadyForest merged PR #24511:
URL: https://github.com/apache/flink/pull/24511





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-17 Thread via GitHub


flinkbot commented on PR #24512:
URL: https://github.com/apache/flink/pull/24512#issuecomment-2002894151

   
   ## CI report:
   
   * 79ca29c5dd966fe4af885c9e5b18a737c5e5b6e9 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-17 Thread via GitHub


LadyForest opened a new pull request, #24512:
URL: https://github.com/apache/flink/pull/24512

   ## What is the purpose of the change
   
   This PR is cherry-picked from https://github.com/apache/flink/pull/24390
   
   ## Brief change log
   
   - Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
   - Move the staging dir configuration into the builder for easier testing
   
   ## Verifying this change
   
   FileSystemOutputFormatTest#testGetUniqueStagingDirectory
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-17 Thread via GitHub


flinkbot commented on PR #24511:
URL: https://github.com/apache/flink/pull/24511#issuecomment-2002875245

   
   ## CI report:
   
   * 09a83bf383c13f3262a61279871cc0e9882f6e47 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-17 Thread via GitHub


LadyForest opened a new pull request, #24511:
URL: https://github.com/apache/flink/pull/24511

   ## What is the purpose of the change
   
   This PR is cherry-picked from https://github.com/apache/flink/pull/24390
   
   
   ## Brief change log
   
   - Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
   - Move the staging dir configuration into the builder for easier testing
   
   
   ## Verifying this change
   
   FileSystemOutputFormatTest#testGetUniqueStagingDirectory
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-14 Thread via GitHub


LadyForest commented on code in PR #24492:
URL: https://github.com/apache/flink/pull/24492#discussion_r1524697588


##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##
@@ -679,17 +679,18 @@ [quoted diff of recorded ArchUnit violations; the method signatures were stripped in the archive. Recoverable changes: five FileSystemOutputFormat.java violation entries shift from lines 288-292 to lines 324-328, one FileSystemTableSink.java:566 entry is removed, and two FileSystemOutputFormat.java entries are added (an annotation at line 291 and a method call at line 109).]

Review Comment:
   I think I can create a Jira ticket to track this issue.






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-14 Thread via GitHub


LadyForest commented on code in PR #24492:
URL: https://github.com/apache/flink/pull/24492#discussion_r1524719948


##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##
@@ -679,17 +679,18 @@ [quoted diff of recorded ArchUnit violations; the method signatures were stripped in the archive. Recoverable changes: five FileSystemOutputFormat.java violation entries shift from lines 288-292 to lines 324-328, one FileSystemTableSink.java:566 entry is removed, and two FileSystemOutputFormat.java entries are added (an annotation at line 291 and a method call at line 109).]

Review Comment:
   https://issues.apache.org/jira/browse/FLINK-34669






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-14 Thread via GitHub


LadyForest merged PR #24492:
URL: https://github.com/apache/flink/pull/24492





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-14 Thread via GitHub


LadyForest commented on code in PR #24492:
URL: https://github.com/apache/flink/pull/24492#discussion_r1524698054


##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##
@@ -679,17 +679,18 @@ [quoted diff of recorded ArchUnit violations; the method signatures were stripped in the archive. Recoverable changes: five FileSystemOutputFormat.java violation entries shift from lines 288-292 to lines 324-328, one FileSystemTableSink.java:566 entry is removed, and two FileSystemOutputFormat.java entries are added (an annotation at line 291 and a method call at line 109).]

Review Comment:
   Let me create a Jira ticket to track this issue.









Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-14 Thread via GitHub


LadyForest commented on code in PR #24492:
URL: https://github.com/apache/flink/pull/24492#discussion_r1524695213


##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##
@@ -679,17 +679,18 @@ [quoted diff of recorded ArchUnit violations; the method signatures were stripped in the archive. Recoverable changes: five FileSystemOutputFormat.java violation entries shift from lines 288-292 to lines 324-328, one FileSystemTableSink.java:566 entry is removed, and two FileSystemOutputFormat.java entries are added (an annotation at line 291 and a method call at line 109).]

Review Comment:
   > I don't know why my comment didn't make it into the previous approval. :thinking:
   > 
   > > I'm wondering why these rule updates are necessary. The 
`FileSystemOutputFormat` class is annotated as `@Internal` API. Do you 
understand why we need to add these exclusions? Or is it an indication that the 
ArchUnit rules applied here are too strict? :thinking:
   
   This is exactly the point I wish to discuss with you. I think there are some 
areas where the arch rules could be optimized.
   
   Firstly, the constraint for connectors stems from the fact that connector 
codebases such as Kafka and Elasticsearch have been removed from Flink and 
turned into external repositories. To prevent internal code changes in Flink 
from breaking the compilation of connectors in those repositories, the rule 
`CONNECTOR_CLASSES_ONLY_DEPEND_ON_PUBLIC_API` was added to the arch rules. 
However, since the filesystem connector lives in the same repository as Flink 
itself, perhaps we could exclude it from this rule. On the other hand, most of 
the violations arise from `Preconditions#checkXXX`, which leads me to consider 
whether we should promote `Preconditions` to be a `Public` or `PublicEvolving` 
interface.
   
   Secondly, I find updating the violation file quite challenging. According to 
the README, turning on `freeze.refreeze=true` in the `archunit.properties` file 
and re-running the tests should update the violation file. However, the recorded 
violations include line numbers, which means that if I submit the refreshed file 
all at once, it would be very painful for the reviewer. To spare the reviewer, I 
can only compare the diffs line by line and revert the parts where only the line 
numbers have changed. Therefore, I suggest that line numbers should not be 
printed in this file.
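
   For reference, the refreeze switch mentioned above is a single entry in 
`archunit.properties` (property name as quoted in this thread; other entries in 
the file are unrelated and stay untouched):
   
   ```
   freeze.refreeze=true
   ```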






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-14 Thread via GitHub


XComp commented on code in PR #24492:
URL: https://github.com/apache/flink/pull/24492#discussion_r1524607876


##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##
@@ -679,17 +679,18 @@ [quoted diff of recorded ArchUnit violations; the method signatures were stripped in the archive. Recoverable changes: five FileSystemOutputFormat.java violation entries shift from lines 288-292 to lines 324-328, one FileSystemTableSink.java:566 entry is removed, and two FileSystemOutputFormat.java entries are added (an annotation at line 291 and a method call at line 109).]

Review Comment:
   I don't know why my comment didn't make it into the previous approval. 
:thinking: 
   
   > I'm wondering why these rule updates are necessary. The 
`FileSystemOutputFormat` class is annotated as `@Internal` API. Do you 
understand why we need to add these exclusions? Or is it an indication that the 
ArchUnit rules applied here are too strict? :thinking: 






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-14 Thread via GitHub


LadyForest commented on PR #24492:
URL: https://github.com/apache/flink/pull/24492#issuecomment-1997117461

   > LGTM. But this fix could even go into 1.18 and 1.17, couldn't it?
   
   Yes, indeed. I'll create another two PRs shortly.





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-13 Thread via GitHub


flinkbot commented on PR #24492:
URL: https://github.com/apache/flink/pull/24492#issuecomment-1996302414

   
   ## CI report:
   
   * 63db84c3fb1d069c48e0efe8733e63d4dd912777 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-13 Thread via GitHub


LadyForest opened a new pull request, #24492:
URL: https://github.com/apache/flink/pull/24492

   ## What is the purpose of the change
   
   This PR is cherry-picked from #24390
   
   
   ## Brief change log
   
   * Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
   * Move the staging dir configuration into the builder for easier testing
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - FileSystemOutputFormatTest#testGetUniqueStagingDirectory
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-13 Thread via GitHub


LadyForest merged PR #24390:
URL: https://github.com/apache/flink/pull/24390





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-13 Thread via GitHub


XComp commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1993735441

   No, go ahead. It would be good to provide backports as well.





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-13 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1993628470

   Hi @XComp, I plan to merge the PR since CI passed, unless you have any other 
concerns.





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1991461563

   Hi [XComp](https://github.com/XComp), thanks so much for the time and effort 
you've put into the review and discussion :)





Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521248255


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -282,7 +278,7 @@ private FileSystemOutputFormat createSinkFormat(
 TableMetaStoreFactory msFactory = new 
FileSystemCommitterTest.TestMetaStoreFactory(path);
 return new FileSystemOutputFormat.Builder()
 .setMetaStoreFactory(msFactory)
-.setPath(path)
+.setStagingPath(new Path(stagingPath.toString()))

Review Comment:
   You can still stick to `setPath(stagingBasePath)` here and remove the 
`stagingPath` field. The only thing you have to change is that you have to 
assert for an empty directory rather than a deleted directory in 
`checkWriteAndCommit`.
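
   For illustration, a sketch of the suggested assertion change in 
`checkWriteAndCommit` (`stagingBasePath` is the path handed to `setPath`; the 
exact helper shapes are assumptions):
   
   ```java
   // After the commit, the sink-specific staging sub-directory is gone, but
   // the base path passed via setPath(...) still exists and should be empty.
   final File stagingBaseDir = new File(stagingBasePath.toUri());
   assertThat(stagingBaseDir).exists();
   assertThat(stagingBaseDir.listFiles()).isEmpty();
   ```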






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521195360


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -37,24 +39,44 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link FileSystemOutputFormat}. */
 class FileSystemOutputFormatTest {
 
-@TempDir private java.nio.file.Path tmpPath;
 @TempDir private java.nio.file.Path outputPath;
 
+@TempDir private java.nio.file.Path stagingBaseDir;
+
+private java.nio.file.Path stagingPath;

Review Comment:
   The reason to introduce both `stagingBaseDir` and `stagingPath` is that
   
   1. We cannot directly annotate `stagingPath` with `@TempDir`. JUnit 5 would 
automatically create the temp directory, and thus the test would fail at 
`Preconditions.checkState`.
   
   2. The reason to introduce `@TempDir stagingBaseDir` (rather than reuse 
`outputPath`) is to decouple the test logic.
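
   A minimal sketch of the resulting fixture, assuming the field names from the 
diff (the directory-name scheme is illustrative, not the merged code):
   
   ```java
   @TempDir private java.nio.file.Path stagingBaseDir; // pre-created by JUnit 5
   
   private java.nio.file.Path stagingPath; // must NOT exist before the test runs
   
   @BeforeEach
   void before() {
       // Derive (but do not create) the staging path, so that
       // FileSystemOutputFormat's "staging dir must not exist" check passes.
       stagingPath = stagingBaseDir.resolve(".staging_" + UUID.randomUUID());
   }
   ```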






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521155291


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java:
##
@@ -96,6 +99,22 @@ private FileSystemOutputFormat(
 this.outputFileConfig = outputFileConfig;
 this.identifier = identifier;
 this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+
+createStagingDirectory(this.stagingPath);
+}
+
+private static void createStagingDirectory(Path stagingPath) {
+try {
+final FileSystem stagingFileSystem = stagingPath.getFileSystem();
+Preconditions.checkState(
+!stagingFileSystem.exists(stagingPath),
+"Staging dir %s already exists",
+stagingFileSystem);

Review Comment:
   Nit: just noticed a typo. It should be `stagingPath` instead of 
`stagingFileSystem` at L#112
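
   Spelled out, the corrected precondition would read:
   
   ```java
   Preconditions.checkState(
           !stagingFileSystem.exists(stagingPath),
           "Staging dir %s already exists",
           stagingPath); // was: stagingFileSystem
   ```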









Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521128496


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -70,6 +85,11 @@ private static Map<File, String> getFileContentByPath(java.nio.file.Path directory)
 return contents;
 }
 
+private static Map<File, String> getStagingFileContent(

Review Comment:
   > The reason I am stressing that part is that I'd like to remove the 
FileSystemOutputFormat#getStagingFolder method.
   
   Good point! Let me remove it.






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1521036729


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -83,157 +110,193 @@ [quoted diff; generic type parameters were stripped in the archive. Recoverable changes: testClosingWithoutInput, testNonPartition, testOverrideNonPartition, and testStaticPartition drop their per-test AtomicReference/test-harness boilerplate in favor of a shared checkWriteAndCommit(...) helper driven by input/output suppliers. The remainder of this message is truncated in the archive.]

Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-12 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507274806


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ [quoted diff; generic type parameters were stripped in the archive. Recoverable changes: adds a parameterized testStagingDirBehavior(boolean shareStagingDir) that writes rows through two sinks via a writeRowsToSink helper; assertSinkBehavior then checks that with a shared staging dir the first sink's files-to-commit no longer exist (the second sink overwrote them), while with separate staging dirs both survive, before finalizing both sinks and comparing the committed output. The review comment below anchors on the final `if (shareStagingDir)` branch.]

Review Comment:
   Your understanding is correct. It's just that this test is actually testing 
`FileSystemOutputFormat` for a scenario where some other class (in our case 
`FileSystemTableSink`) has a bug.
   
   This test is fine if you think that `FileSystemOutputFormat` should be able 
to handle this case. But I'm wondering whether it would be more appropriate to 
fail in such a case, where multiple instances are accessing the same folder 
(though there's the question of how easily a `FileSystemOutputFormat` instance 
can detect something like that).
   
   Anyway, the fact that this test scenario is so easy to produce indicates that 
the scopes of the different classes are not well-defined.






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-11 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1520768668


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java:
##
@@ -286,10 +302,21 @@ public Builder setIsToLocal(boolean isToLocal) {
 }
 
 public Builder setPath(Path path) {
+return this.setStagingPath(toStagingPath(path));
+}

Review Comment:
   Nit: to avoid violating the arch rule
   ```suggestion
   this.path = toStagingPath(path);
   return this;
   }
   ```
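
   For context, a sketch of how the two builder methods would look with the 
suggestion applied (only `setPath` appears in the quoted diff; the 
`setStagingPath` body and the `path` field are assumptions):
   
   ```java
   public Builder setPath(Path path) {
       // Inline the staging-path derivation instead of delegating to
       // setStagingPath, so no public-to-public call is recorded by ArchUnit.
       this.path = toStagingPath(path);
       return this;
   }
   
   public Builder setStagingPath(Path stagingPath) {
       this.path = stagingPath;
       return this;
   }
   ```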






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-11 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1520757091


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -70,6 +85,11 @@ private static Map<File, String> getFileContentByPath(java.nio.file.Path directory)
 return contents;
 }
 
+private static Map<File, String> getStagingFileContent(

Review Comment:
   > What we want to make sure is that the class actually creates its own 
staging folder
   
   I believe this is the point we want to verify after moving the generation of 
the staging folder to `FileSystemOutputFormat`.
   
   I understand your concern. In fact, `PartitionWriterTest` and 
`FileSystemCommitterTest` have already covered the checks for written content. 
Therefore, the tests for `FileSystemOutputFormat` should indeed focus more on 
the creation/deletion of directories. If we were to check file content as well, 
it would resemble an IT case.
   
   I guess the reason content is checked is that the test author believes 
`FileSystemOutputFormat` has the capability to write records (implemented 
through the `writeRecord` method) and commit files from the staging folder to 
the outputPath (achieved with `finalizeGlobal`). Thus, it tested the written 
content even before our modifications were made.
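
   To make that shift concrete, a hedged sketch of a directory-focused check 
(AssertJ path assertions; the names are assumptions):
   
   ```java
   // Verify the staging-dir lifecycle rather than file contents, which
   // PartitionWriterTest and FileSystemCommitterTest already cover.
   assertThat(stagingPath).exists();        // created when the format opens
   // ... write records, then commit via finalizeGlobal ...
   assertThat(stagingPath).doesNotExist();  // removed after a successful commit
   ```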






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-11 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1520696809


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -83,153 +103,175 @@ [quoted diff; generic type parameters were stripped in the archive. Recoverable changes: the same refactoring of testClosingWithoutInput, testNonPartition, testOverrideNonPartition, and testStaticPartition onto the shared checkWriteAndCommit(...) helper. The remainder of this message is truncated in the archive.]

Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-11 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1520688625


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java:
##
@@ -197,13 +200,37 @@ public void close() throws IOException {
 }
 }
 
+private Path toStagingPath(Path path) {
+// Add a random UUID to prevent multiple sinks from sharing the same 
staging dir.
+// Please see FLINK-29114 for more details

Review Comment:
   +1. Let me remove this comment.
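
   As a rough sketch, the shape of `toStagingPath` being discussed here (the 
directory-name scheme is an assumption, not the merged code):
   
   ```java
   // Derive a per-instance staging dir: the random UUID keeps two sinks that
   // write to the same table from ever sharing a staging dir (FLINK-29114).
   private Path toStagingPath(Path path) {
       return new Path(path, ".staging_" + UUID.randomUUID());
   }
   ```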






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-11 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1519417563


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -83,153 +103,175 @@ [quoted diff; generic type parameters were stripped in the archive. Recoverable changes: the same refactoring of testClosingWithoutInput, testNonPartition, testOverrideNonPartition, and testStaticPartition onto the shared checkWriteAndCommit(...) helper. The remainder of this message is truncated in the archive.]

Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-05 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1512605257


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+public static Path getStagingPath(Path path) {

Review Comment:
   I'm opting for option one as well. I didn't choose it in the first place 
because
   
   <1> I wasn't sure whether it is tolerable to have duplicate logic scattered 
throughout the codebase.
   
   <2> I was hesitant about whether altering 
`FileSystemOutputFormat$Builder#setTempPath` to 
`FileSystemOutputFormat$Builder#setPath` might lead to any issues with 
dependencies. While I've manually reviewed the connector repositories and found 
no instances of the method being called, the impact on custom connectors 
remains unclear. Perhaps my concerns are a bit excessive.
   
   It appears that we're on the same page. I'll address the comment as we've 
discussed.






Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-05 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1512374782


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+[Apache license header, package declaration, and imports identical to the quote above]
+/** Path utils for file system. */
+public class PathUtils {
+
+public static Path getStagingPath(Path path) {

Review Comment:
   1. Generate duplicate code for temporary file creation in 
`FileSystemOutputFormat` and `HiveTableSink` (similar to the base code version 
of this PR), which should be ok because it's not really complex code that we're 
dealing with here.
   2. Move the folder generation into a dedicated method that needs to be 
annotated as `@Public` to honor the ArchUnit tests. That would be a viable 
solution if we think that the temporary folder is in some way relevant to the 
user (e.g. is the naming scheme for the staging directory relevant so that it 
should not be changed between releases?). In that case, we should, indeed, 
annotate it as `@Public`.
   
   To me, it sounds like the staging directory generation is an internal 
matter. How important is it that the naming scheme matches between the 
`FileSystemTableSink` and the `HiveTableSink`? I would lean towards making the 
temp file generation internal functionality that is independent between 
`FileSystemTableSink` and `HiveTableSink` (option 1). But I'm not familiar 
enough with this code to judge here.
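   
   For reference, a minimal sketch of option 1: each sink keeps its own private 
copy of the staging-dir creation instead of depending on a shared `@Public` 
utility. It mirrors the `PathUtils` draft quoted in this thread, not the exact 
merged code:
   
   ```java
   import org.apache.flink.core.fs.FileSystem;
   import org.apache.flink.core.fs.Path;
   import org.apache.flink.util.Preconditions;
   
   import java.io.IOException;
   import java.util.UUID;
   
   class StagingDirSketch {
   
       // Duplicated privately per sink (option 1); intentionally not shared.
       static Path createStagingDir(Path parent) {
           Path stagingDir =
                   new Path(
                           parent,
                           String.format(
                                   ".staging_%d_%s",
                                   System.currentTimeMillis(), UUID.randomUUID()));
           try {
               FileSystem fs = stagingDir.getFileSystem();
               Preconditions.checkState(
                       !fs.exists(stagingDir), "Staging dir %s already exists", stagingDir);
               fs.mkdirs(stagingDir);
               return stagingDir;
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
       }
   }
   ```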



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-04 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1512103738


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+public static Path getStagingPath(Path path) {

Review Comment:
   Hi @XComp, I'm afraid that this way also violates the arch unit test.
   
   Here's what I tested in the local.
   
   ```java
   public class PathUtils {
   
       public static Path getStagingPath(Path path) {
           // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
           // Please see FLINK-29114 for more details
           return getStagingPath(
                   path,
                   () ->
                           String.format(
                                   ".staging_%d_%s",
                                   System.currentTimeMillis(), UUID.randomUUID()));
       }
   
       @VisibleForTesting
       static Path getStagingPath(Path path, Supplier<String> suffixSupplier) {
           Path stagingDir = path.suffix(suffixSupplier.get());
           try {
               FileSystem fs = stagingDir.getFileSystem();
               Preconditions.checkState(
                       !fs.exists(stagingDir), "Staging dir %s already exists", stagingDir);
               fs.mkdirs(stagingDir);
               return stagingDir;
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
       }
   }
   ```
   
   
   Run `ArchitectureTest` to get the following stacktrace. (The first rule has 
been violated before, so let's leave that.)
   
   ```
   java.lang.AssertionError: Architecture Violation [Priority: MEDIUM] - Rule 
'Connector production code must depend only on public API when outside of 
connector packages' was violated (2 times):
   Method <PathUtils.getStagingPath(Path, Supplier)> calls method 
<Preconditions.checkState(...)> in (PathUtils.java:48)
   Method <PathUtils.getStagingPath(Path, Supplier)> is annotated with 
<VisibleForTesting> in (PathUtils.java:0)
   ```
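   
   For what it's worth, the first violation could be sidestepped by inlining 
the check with plain JDK constructs instead of `org.apache.flink.util.Preconditions`. 
A sketch only; the second violation, the `@VisibleForTesting` annotation 
itself, would still need a different answer:
   
   ```java
   static Path getStagingPath(Path path, Supplier<String> suffixSupplier) {
       Path stagingDir = path.suffix(suffixSupplier.get());
       try {
           FileSystem fs = stagingDir.getFileSystem();
           // Plain JDK check instead of Preconditions.checkState, which the
           // ArchUnit rule flags as a non-public dependency.
           if (fs.exists(stagingDir)) {
               throw new IllegalStateException("Staging dir " + stagingDir + " already exists");
           }
           fs.mkdirs(stagingDir);
           return stagingDir;
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }
   ```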



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-04 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1512083846


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/utils/PathUtilsTest.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Test for {@link PathUtils}. */
+public class PathUtilsTest {
+
+@Test
+void testUniqueStagingDirectory(@TempDir File tmpDir) {
+final Configuration config = new Configuration();
+config.set(FileSystemConnectorOptions.PATH, tmpDir.getAbsolutePath());
+assertThat(tmpDir.listFiles()).isEmpty();

Review Comment:
   You're right. We don't need this anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-04 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1512043134


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+public static Path getStagingPath(Path path) {
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir =
+new Path(
+path,
+String.join(

Review Comment:
   Sorry, that's a typo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-04 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1511414346


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+public static Path getStagingPath(Path path) {
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir =
+new Path(
+path,
+String.join(

Review Comment:
   I'm not sure whether that's actually what you want. The resulting String 
would be something like 
`.staging__1709568597254_2cf73aab-39a4-440a-b81c-216be9635bb8` (i.e. double 
underscore).
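   
   A quick way to see it, since `".staging_"` already ends with an underscore 
and `String.join` inserts another one between elements:
   
   ```java
   public class JoinDemo {
       public static void main(String[] args) {
           // Same joining as in the PR's toStagingPath draft, with fixed values.
           String suffix = String.join("_", ".staging_", "1709568597254", "2cf73aab");
           System.out.println(suffix); // .staging__1709568597254_2cf73aab
       }
   }
   ```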



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {
+
+public static Path getStagingPath(Path path) {

Review Comment:
   ```suggestion
   
       public static Path getStagingPath(Path path) {
           return getStagingPath(
                   path,
                   () ->
                           String.format(
                                   ".staging_%d_%s",
                                   System.currentTimeMillis(), UUID.randomUUID()));
       }
   
       @VisibleForTesting
       static Path getStagingPath(Path path, Supplier<String> suffixSupplier) {
   ```
   The current test is "theoretically" flaky. We can change that by using a 
callback and checking for the Precondition. That would enable us to create the 
following test:
   
   ```java
   @Test
   void testReusingStagingDirFails(@TempDir Path tmpDir) throws IOException {
       final String subfolderName = "directory-name";
       Files.createDirectory(tmpDir.resolve(subfolderName));
       assertThatThrownBy(
                       () ->
                               PathUtils.getStagingPath(
                                       org.apache.flink.core.fs.Path.fromLocalFile(
                                               tmpDir.toFile()),
                                       () -> subfolderName))
               .isInstanceOf(IllegalStateException.class);
   }
   ```
   And that would reveal that our assumption that `mkdirs` returns `false` if 
the directory already exists was actually wrong. :-/
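   
   The `mkdirs` point is easy to verify locally with plain `java.nio` (shown 
here as an illustration; Flink's `FileSystem#mkdirs` is likewise idempotent on 
the implementations discussed in this thread, which is why the explicit 
existence check is needed to fail fast):
   
   ```java
   import java.nio.file.Files;
   import java.nio.file.Path;
   
   public class MkdirsDemo {
       public static void main(String[] args) throws Exception {
           Path dir = Files.createTempDirectory("staging-demo");
           // Idempotent: returns normally even though the directory exists.
           Files.createDirectories(dir);
           // Fails fast: throws FileAlreadyExistsException for an existing dir.
           Files.createDirectory(dir);
       }
   }
   ```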



##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/utils/PathUtilsTest.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional 

Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-03-04 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1976845985

   Hi @XComp, would you mind taking a look when you're available?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1508526295


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/utils/PathUtils.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.utils;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** Path utils for file system. */
+public class PathUtils {

Review Comment:
   I originally planned to move the logic for generating the staging directory 
into `FileSystemOutputFormat`. However, when modifying the `HiveTableSink`, I 
found that `createBatchCompactSink` relies on tmpPath.
   
   I didn't want to introduce changes to other modules, so I ultimately opted 
for a compromise and extracted this utility method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507274806


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
 }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef1,
+                fileToCommitRef1,
+                getStagingDir(shareStagingDir),
+                Row.of("a1", 1, "x1"),
+                Row.of("a2", 2, "x2"));
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef2,
+                fileToCommitRef2,
+                getStagingDir(shareStagingDir),
+                Row.of("b1", 1, "y1"),
+                Row.of("b2", 2, "y2"));
+
+        assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir);
+    }
+
+    private void writeRowsToSink(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef,
+            AtomicReference<Map<File, String>> contentRef,
+            Path stagingDir,
+            Row... records)
+            throws Exception {
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) {
+            writeUnorderedRecords(testHarness, Arrays.asList(records));
+            contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath())));
+        }
+    }
+
+    private Path getStagingDir(boolean shareStagingDir) {
+        String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir);
+        return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile());
+    }
+
+    private void assertSinkBehavior(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef1,
+            AtomicReference<Map<File, String>> fileToCommitRef1,
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef2,
+            AtomicReference<Map<File, String>> fileToCommitRef2,
+            boolean shareStagingDir)
+            throws Exception {
+        Map<File, String> fileToCommit1 = fileToCommitRef1.get();
+        Map<File, String> fileToCommit2 = fileToCommitRef2.get();
+        assertThat(fileToCommit2.keySet()).allMatch(File::exists);
+        if (shareStagingDir) {
+            assertThat(fileToCommit1.keySet()).noneMatch(File::exists);
+        } else {
+            assertThat(fileToCommit1.keySet()).allMatch(File::exists);
+        }
+        sinkRef1.get().finalizeGlobal(finalizationContext);
+        sinkRef2.get().finalizeGlobal(finalizationContext);
+        Map<File, String> committedFiles = getFileContentByPath(outputPath);
+        if (shareStagingDir) {
Review Comment:
   Your understanding is correct. It's just that this test is actually testing 
`FileSystemOutputFormat` for a scenario where some other class (in our case 
`FileSystemTableSink`) has a bug.
   
   This test is fine if you think that `FileSystemOutputFormat` should be able 
to handle this case. But I'm wondering whether it would be more appropriate to 
fail in such a case where multiple instances are accessing the same folder 
(though there's the question of how easily a `FileSystemOutputFormat` instance 
can detect something like that).
   
   Anyway, the ease of constructing this test scenario indicates that the scope 
of the different classes is not well-defined.
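   
   One conceivable detection mechanism, sketched under the assumption that the 
`OutputFormat` creates its staging dir itself: claim the directory with a 
marker file written in `NO_OVERWRITE` mode, so a second instance fails instead 
of silently overwriting files. The marker name is illustrative, and 
`NO_OVERWRITE` is not guaranteed to be atomic on every `FileSystem` 
implementation:
   
   ```java
   private void claimStagingDir(Path stagingDir) throws IOException {
       FileSystem fs = stagingDir.getFileSystem();
       fs.mkdirs(stagingDir);
       // Fails with an exception if another instance already claimed the dir,
       // turning a silent overwrite into a visible error.
       Path marker = new Path(stagingDir, "_OWNER");
       fs.create(marker, FileSystem.WriteMode.NO_OVERWRITE).close();
   }
   ```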



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507288792


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testStagingDirBehavior(boolean shareStagingDir) throws Exception {

Review Comment:
   I guess we're getting closer here. :-) I'm not that familiar with this part 
of the code. But as said earlier, a complicated, verbose unit test is an 
indication of non-optimal code structure. Also, checking the interfaces' 
JavaDocs backs your proposal:
   
   For `DynamicTableSink` (which is what `FileSystemTableSink` implements):
   > Sink of a dynamic table to an external storage system.
   
   For `OutputFormat` (which is what `FileSystemOutputFormat` implements):
   > The base interface for outputs that consumes records. The output format 
describes how to store the final records, for example in a file.
   
   The key sentence here is `The output format describes how to store the final 
records, for example in a file.`. The "how" should include the "where", i.e. 
the `OutputFormat` implementation should be in charge of creating the staging 
folder and, therefore, would also be in charge of creating a folder that is not 
used by any other `OutputFormat` instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1970712539

   Actually, after looking at the code for `HiveTableSink` 
https://github.com/apache/flink/blob/9e81177e44d63501b360b1a246a16ea3faecb548/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java#L791,
 I am even more convinced that we should move the logic for generating the 
staging directory into the `OutputFormat` itself.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1970685919

   What do you think if we move `toStagingPath` to `FileSystemOutputFormat`?
   Sth. like
   ```java
   private FileSystemOutputFormat(
           FileSystemFactory fsFactory,
           TableMetaStoreFactory msFactory,
           boolean overwrite,
           boolean isToLocal,
   +       Path path,
   -       Path tmpPath,
           String[] partitionColumns,
           boolean dynamicGrouped,
           LinkedHashMap<String, String> staticPartitions,
           OutputFormatFactory<T> formatFactory,
           PartitionComputer<T> computer,
           OutputFileConfig outputFileConfig,
           ObjectIdentifier identifier,
           PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
       this.fsFactory = fsFactory;
       this.msFactory = msFactory;
       this.overwrite = overwrite;
       this.isToLocal = isToLocal;
   +   this.path = path;
   +   this.tmpPath = toStagingPath();
       this.partitionColumns = partitionColumns;
       this.dynamicGrouped = dynamicGrouped;
       this.staticPartitions = staticPartitions;
       this.formatFactory = formatFactory;
       this.computer = computer;
       this.outputFileConfig = outputFileConfig;
       this.identifier = identifier;
       this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
   }
   
   private Path toStagingPath() {
       // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
       // Please see FLINK-29114 for more details
       Path stagingDir =
               new Path(
                       path,
                       String.join(
                               "_",
                               ".staging_",
                               String.valueOf(System.currentTimeMillis()),
                               UUID.randomUUID().toString()));
       try {
           FileSystem fs = stagingDir.getFileSystem();
           Preconditions.checkState(
                   fs.mkdirs(stagingDir), "Failed to create staging dir " + stagingDir);
           return stagingDir;
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
   }
   
   @VisibleForTesting
   public Path getTmpPath() {
       return tmpPath;
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507204553


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -386,6 +389,11 @@ private Path toStagingPath() {
 }
 }
 
+@VisibleForTesting

Review Comment:
   Yes, I just noticed that rule... Let me think about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507195791


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testStagingDirBehavior(boolean shareStagingDir) throws Exception {

Review Comment:
   > Generally speaking, unit tests should be as small as possible to focus on 
the specific aspect of the class contract (in this case 
`FileSystemTableSink#toStagingPath` not returning the same folder twice. This 
test adds quite a bit of extra context here which might make it harder to grasp 
the intention for code readers. What's your opinion on that?
   
   +1 on your perspective on unit testing. As I previously mentioned, in this 
case, constructing a UT is quite challenging, which subtly suggests to me that 
there may be some design issues with the current code. Take the staging dir, 
for example: if it needs to be passed from the `FileSystemTableSink` to the 
`OutputFormat` at compile time, then a proper validation should be done here. 
Conversely, perhaps the `OutputFormat` could decide on the stagingDir itself, 
which would make the testing more targeted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507187446


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
 }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef1,
+                fileToCommitRef1,
+                getStagingDir(shareStagingDir),
+                Row.of("a1", 1, "x1"),
+                Row.of("a2", 2, "x2"));
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<Map<File, String>> fileToCommitRef2 = new AtomicReference<>();
+        writeRowsToSink(
+                sinkRef2,
+                fileToCommitRef2,
+                getStagingDir(shareStagingDir),
+                Row.of("b1", 1, "y1"),
+                Row.of("b2", 2, "y2"));
+
+        assertSinkBehavior(sinkRef1, fileToCommitRef1, sinkRef2, fileToCommitRef2, shareStagingDir);
+    }
+
+    private void writeRowsToSink(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef,
+            AtomicReference<Map<File, String>> contentRef,
+            Path stagingDir,
+            Row... records)
+            throws Exception {
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, stagingDir, new LinkedHashMap<>(), sinkRef)) {
+            writeUnorderedRecords(testHarness, Arrays.asList(records));
+            contentRef.set(getFileContentByPath(Paths.get(stagingDir.getPath())));
+        }
+    }
+
+    private Path getStagingDir(boolean shareStagingDir) {
+        String pathPostfix = FileSystemTableSink.getStagingPathPostfix(shareStagingDir);
+        return Path.fromLocalFile(tmpPath.resolve(pathPostfix).toFile());
+    }
+
+    private void assertSinkBehavior(
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef1,
+            AtomicReference<Map<File, String>> fileToCommitRef1,
+            AtomicReference<FileSystemOutputFormat<Row>> sinkRef2,
+            AtomicReference<Map<File, String>> fileToCommitRef2,
+            boolean shareStagingDir)
+            throws Exception {
+        Map<File, String> fileToCommit1 = fileToCommitRef1.get();
+        Map<File, String> fileToCommit2 = fileToCommitRef2.get();
+        assertThat(fileToCommit2.keySet()).allMatch(File::exists);
+        if (shareStagingDir) {
+            assertThat(fileToCommit1.keySet()).noneMatch(File::exists);
+        } else {
+            assertThat(fileToCommit1.keySet()).allMatch(File::exists);
+        }
+        sinkRef1.get().finalizeGlobal(finalizationContext);
+        sinkRef2.get().finalizeGlobal(finalizationContext);
+        Map<File, String> committedFiles = getFileContentByPath(outputPath);
+        if (shareStagingDir) {

Review Comment:
   I understand it this way, and please correct me if I'm wrong: when verifying 
the correctness of a fix, it's common to build some test cases that reliably 
reproduce the issue, then propose a solution to address the problem, and rerun 
the tests to ensure that the fix is appropriate and has not introduced any new 
issues. Coming back to this case, since `System.currentTimeMillis()` is a 
non-deterministic function in the original method, reproducing the issue stably 
requires making it return a constant value, which is why an additional boolean 
variable was introduced. However, after noticing your comments in another pull 
request, I think your approach is more reasonable, as it fails fast with an 
exception instead of delaying the problem to runtime, where it could cause 
unexpected consequences.
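   
   A minimal sketch of that direction, assuming the supplier-based 
`getStagingPath(Path, Supplier<String>)` overload sketched earlier in this 
thread: the test pins the non-deterministic suffix to a constant, so the 
collision is reproduced deterministically and rejected up front instead of via 
a production-code boolean flag.
   
   ```java
   @Test
   void testReusedSuffixIsRejected(@TempDir File tmpDir) {
       org.apache.flink.core.fs.Path parent =
               org.apache.flink.core.fs.Path.fromLocalFile(tmpDir);
       // The first call creates the fixed staging path; the second must fail
       // fast because the existence precondition trips.
       PathUtils.getStagingPath(parent, () -> ".staging_fixed");
       assertThatThrownBy(() -> PathUtils.getStagingPath(parent, () -> ".staging_fixed"))
               .isInstanceOf(IllegalStateException.class);
   }
   ```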



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507172607


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +375,9 @@ public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
 }
 
 private Path toStagingPath() {
-Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir = new Path(path, ".staging_" + getStagingPathPostfix(false));
 try {
 FileSystem fs = stagingDir.getFileSystem();
 Preconditions.checkState(

Review Comment:
   Sorry, I just noticed that message. Agreed with you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-29 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1507150180


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -386,6 +389,11 @@ private Path toStagingPath() {
 }
 }
 
+@VisibleForTesting

Review Comment:
   [Looks 
like](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57965=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=23032)
 the architecture tests don't allow this. :thinking: 



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +375,9 @@ public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
 }
 
 private Path toStagingPath() {
-Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir = new Path(path, ".staging_" + getStagingPathPostfix(false));
 try {
 FileSystem fs = stagingDir.getFileSystem();
 Preconditions.checkState(

Review Comment:
   The precondition should be updated. Or did I miss something and you don't 
agree with [what we discussed in the other 
PR](https://github.com/LadyForest/flink/pull/1#discussion_r1505972482)?



##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + 
"a3,3,p1\n");
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testStagingDirBehavior(boolean shareStagingDir) throws Exception {

Review Comment:
   Generally speaking, unit tests should be as small as possible to focus on 
the specific aspect of the class contract (in this case 
`FileSystemTableSink#toStagingPath` not returning the same folder twice. This 
test adds quite a bit of extra context here which might make it harder to grasp 
the intention for code readers. What's your opinion on that?



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -386,6 +389,11 @@ private Path toStagingPath() {
 }
 }
 
+@VisibleForTesting
+static String getStagingPathPostfix(boolean constant) {

Review Comment:
   You introduce the parameter `constant` to make the test parameterizable. But 
ideally, you don't want to change production code just "to please test code". I 
feel like this `constant` parameter is not necessary (because the production 
code only uses `false`). The decision whether you return a fixed suffix (or, if 
you follow the comment above to return the whole path instead, a fixed path) or 
the random suffix should live in the test code. WDYT?



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +375,9 @@ public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
 }
 
 private Path toStagingPath() {
-Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir = new Path(path, ".staging_" + getStagingPathPostfix(false));

Review Comment:
   Why did you decide to create the `getStagingPathPostfix` method? This 
creates unnecessary code in the test because we still have to generate the Path 
there. Alternatively, you could move the entire content of `toStagingPath()` 
into a static method `generateStagingPath(Path parent, boolean constant)`. That 
way you just need to call the static method in the test without any path 
resolution.



##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +113,101 @@ void testNonPartition() throws Exception {
 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+void testStagingDirBehavior(boolean shareStagingDir) throws Exception {
+// sink1
+AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+AtomicReference<Map<File, String>> fileToCommitRef1 = new AtomicReference<>();
+writeRowsToSink(
+sinkRef1,
+fileToCommitRef1,
+getStagingDir(shareStagingDir),
+Row.of("a1", 1, "x1"),
+Row.of("a2", 2, "x2"));
+
+// sink2

Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on PR #24390:
URL: https://github.com/apache/flink/pull/24390#issuecomment-1970441742

   Hi @snuyanzin @XComp, thanks for your review comments. I updated the test 
(it's actually not perfect either..) and tried my best to verify the case. 
Would you mind taking another look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505976015


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   Thanks for clarifying things. I tried to come up with a proposal for a test. 
But to be fair, even that one is not perfect (you want to avoid weakening the 
accessibility of production code for tests). It's hard to achieve in other ways 
because of the way the classes are structured (as you already pointed out). 
Doing it properly would require a larger refactoring.
   
   Have a look at LadyForest/flink#1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505887655


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +374,10 @@ public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
 }
 
 private Path toStagingPath() {
-Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir =
+new Path(path, ".staging_" + UUID.randomUUID() + System.currentTimeMillis());

Review Comment:
   Sorry for the confusion. I meant to say "my original intention."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505879003


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   How can we effectively reproduce, in a unit test, a sporadic runtime issue 
caused by parameters that were improperly configured at compile time? Do you 
have any good suggestions? 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505871123


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   > As said, reverting your fix in `FileSystemTableSink` doesn't make any of 
the tests fail.
   
   This is because the test does not call any method defined in 
`FileSystemTableSink`. However, it simulates the condition where two sinks 
share the same stagingDir, which is denoted by the variable `tmpPath`.
   
   Do you think it would be a good idea if I made 
`FileSystemTableSink#toStagingPath` a static method, added the 
`@VisibleForTesting` annotation, and called it in `FileSystemOutputFormatTest`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505864009


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   > > [...] the issue is triggered when the 
FileSystemOutputFormat#finalizeGlobal method is called (triggered by the final 
block).
   > 
   > I cannot confirm that. The change code segment isn't covered by the newly 
added tests. As said, reverting your fix in `FileSystemTableSink` doesn't make 
any of the tests fail.
   
   You can verify the root cause by the following steps.
   
   1. Temporarily make `toStagingPath` a static method and modify the test case 
like the following snippet.
   ```java
   @Test
   void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws Exception {
       // sink1
       AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
       AtomicReference<File> fileToCommitRef1 = new AtomicReference<>();
       AtomicReference<String> contentRef1 = new AtomicReference<>();
       try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
               createSink(
                       false,
                       false,
                       false,
                       FileSystemTableSink.toStagingPath(Path.fromLocalFile(tmpPath.toFile())),
                       new LinkedHashMap<>(),
                       sinkRef1)) {
           writeUnorderedRecords(testHarness);
           Map<File, String> content = getFileContentByPath(tmpPath);
           assertThat(content).hasSize(1);
           fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0));
           contentRef1.set(new ArrayList<>(content.values()).get(0));
       }
   
       // sink2
       AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
       AtomicReference<File> fileToCommitRef2 = new AtomicReference<>();
       AtomicReference<String> contentRef2 = new AtomicReference<>();
       try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
               createSink(
                       false,
                       false,
                       false,
                       FileSystemTableSink.toStagingPath(Path.fromLocalFile(tmpPath.toFile())),
                       new LinkedHashMap<>(),
                       sinkRef2)) {
           // write different content
           List<Row> records =
                   Arrays.asList(
                           Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), Row.of("b3", 3, "x3"));
           writeUnorderedRecords(testHarness, records);
           Map<File, String> content = getFileContentByPath(tmpPath);
           assertThat(content).hasSize(1);
           fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0));
           contentRef2.set(new ArrayList<>(content.values()).get(0));
       }
   
       File fileToCommit1 = fileToCommitRef1.get();
       File fileToCommit2 = fileToCommitRef2.get();
       // because sink1 and sink2 are writing to the same staging dir,
       // the file generated by sink1 has been overwritten by sink2
       assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent());
       assertThat(fileToCommit1).doesNotExist();
   
       // lost records
       assertThat(contentRef1.get())
               .isEqualTo("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
   
       assertThat(fileToCommit2).exists();
       assertThat(contentRef2.get()).isEqualTo("b1,1,x1\n" + "b2,2,x2\n" + "b3,3,x3\n");
   
       // let sink1 commit, and it actually commits the file generated by sink2
       sinkRef1.get().finalizeGlobal(finalizationContext);
       assertThat(getFileContentByPath(outputPath).values()).containsExactly(contentRef2.get());
   
       // sink2 will commit nothing
       sinkRef2.get().finalizeGlobal(finalizationContext);
       assertThat(getFileContentByPath(outputPath).values()).containsExactly(contentRef2.get());
   }
   ```
   If you run this case, it will fail, because 
`FileSystemTableSink.toStagingPath` never returns the same staging dir twice, 
so no data loss happens.
   
   2. Then change the code to return a constant path and rerun the previous 
case; it will succeed, which indicates that one file is overwritten by the 
other.
   ```java
   @VisibleForTesting
   public static Path toStagingPath(Path path) {
       // return a constant path to reproduce the issue
       Path stagingDir = new Path(path, ".staging_foo");
       ...
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505838606


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   Personally, I believe the crux of the problem is that 
`FileSystemTableSink#toStagingPath` acts like a remote time bomb, which only 
detonates within `FileSystemOutputFormat` when multiple sinks write to the same 
physical table and create the staging directory at the exact same time. Because 
it spans multiple components, constructing a unit test is quite challenging. 
Another approach would be to add an ITCase (in fact, that unstable case could, 
to some extent, be considered an ITCase for this issue).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505832996


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   > [...] the issue is triggered when the 
FileSystemOutputFormat#finalizeGlobal method is called (triggered by the final 
block).
   
   I cannot confirm that. The change code segment isn't covered by the newly 
added tests. As said, reverting your fix in `FileSystemTableSink` doesn't make 
any of the tests fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505822872


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   The reason I added tests in `FileSystemOutputFormatTest` is that the issue 
is triggered when the `FileSystemOutputFormat#finalizeGlobal` method is called 
(triggered by the final block).
   
   If we add tests only for `FileSystemTableSink`, we won't be able to 
reproduce the issue and verify the fix. Do you think it would be a good idea if 
I made `FileSystemTableSink#toStagingPath` a static method, added the 
`VisibleForTesting` annotation, and called it in `FileSystemOutputFormatTest`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505818193


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +374,10 @@ public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
 }
 
 private Path toStagingPath() {
-Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir =
+new Path(path, ".staging_" + UUID.randomUUID() + System.currentTimeMillis());

Review Comment:
   The last two sentences contradict each other. ;-) But I'm fine if you think 
that the timestamp is helpful. But you might want to add a separator between 
the time and the UUID. And you might want to put the timestamp before the UUID 
if you think it's helpful for troubleshooting (for the sake of ordering by file 
name).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505814697


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +374,10 @@ public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
 }
 
 private Path toStagingPath() {
-Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir =
+new Path(path, ".staging_" + UUID.randomUUID() + System.currentTimeMillis());

Review Comment:
   Sorry, I missed the comment.  I intend to keep it to ease troubleshooting. 
I'm okay with removing it since it's redundant anyway.
   
   



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +374,10 @@ public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
 }
 
 private Path toStagingPath() {
-Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+// Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+// Please see FLINK-29114 for more details
+Path stagingDir =
+new Path(path, ".staging_" + UUID.randomUUID() + System.currentTimeMillis());

Review Comment:
   I intend to keep it to ease troubleshooting. I'm okay with removing it since 
it's redundant anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


LadyForest commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505814186


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +374,10 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
     }
 
     private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir =
+                new Path(path, ".staging_" + UUID.randomUUID() + System.currentTimeMillis());

Review Comment:
   I intended to keep it to ease troubleshooting, but I'm okay with removing it
since it's redundant anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


snuyanzin commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505790506


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   Yes, I can confirm: it seems the current tests do not invoke
`FileSystemTableSink#toStagingPath` either way.
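
   A hypothetical test along those lines (assuming `toStagingPath` were relaxed from `private` to package-private and a `createSinkForPath` helper existed — neither is in the PR, so this is only a sketch):

   ```java
   // Sketch: two sinks writing to the same output path must not share a
   // staging dir. Before the fix, both calls could land in the same
   // millisecond and return the identical ".staging_<timestamp>" path.
   @Test
   void testStagingPathsAreUniquePerSink() {
       FileSystemTableSink sink1 = createSinkForPath(outputPath); // hypothetical helper
       FileSystemTableSink sink2 = createSinkForPath(outputPath); // hypothetical helper
       assertThat(sink1.toStagingPath()).isNotEqualTo(sink2.toStagingPath());
   }
   ```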



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


XComp commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505770714


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##


Review Comment:
   I guess I'm not the right person to judge here, because the connector
codebase is not really an area where I have much knowledge. But the test you
added doesn't cover the fix: reverting the fix of this PR doesn't make any of
the newly added tests fail, nor does any of the tests call the method in
question, `FileSystemTableSink#toStagingPath`.
   
   That's why I'm wondering what the value of the added tests is. Or am I
missing something here? :thinking:
   
   



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java:
##
@@ -374,7 +374,10 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
     }
 
     private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
+        // Add a random UUID to prevent multiple sinks from sharing the same staging dir.
+        // Please see FLINK-29114 for more details
+        Path stagingDir =
+                new Path(path, ".staging_" + UUID.randomUUID() + System.currentTimeMillis());

Review Comment:
   ```suggestion
   new Path(path, ".staging_" + UUID.randomUUID());
   ```
   
   Maybe you missed [my previous
comment](https://github.com/apache/flink/pull/24390/files#r1503917055), but
shouldn't we be able to use only the UUID here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory [flink]

2024-02-28 Thread via GitHub


snuyanzin commented on code in PR #24390:
URL: https://github.com/apache/flink/pull/24390#discussion_r1505763661


##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +110,139 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @Test
+    void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<File> fileToCommitRef1 = new AtomicReference<>();
+        AtomicReference<String> contentRef1 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), sinkRef1)) {
+            writeUnorderedRecords(testHarness);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef1.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<File> fileToCommitRef2 = new AtomicReference<>();
+        AtomicReference<String> contentRef2 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), sinkRef2)) {
+            // write different content
+            List<Row> records =
+                    Arrays.asList(
+                            Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), Row.of("b3", 3, "x3"));
+            writeUnorderedRecords(testHarness, records);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef2.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        File fileToCommit1 = fileToCommitRef1.get();
+        File fileToCommit2 = fileToCommitRef2.get();
+        // because sink1 and sink2 are writing to the same staging dir
+        // the file generated by sink1 has been overwritten by sink2
+        assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent());
+        assertThat(fileToCommit1).doesNotExist();

Review Comment:
   ```suggestion
   assertThat(fileToCommit1).hasParent(fileToCommit2.getParent()).doesNotExist();
   ```
   could be merged and simplified a bit
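
   (For context: AssertJ file assertions return `this`, so `hasParent(...)` can
be chained with `.doesNotExist()` in a single statement.)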



##
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java:
##
@@ -104,15 +110,139 @@ void testNonPartition() throws Exception {
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
     }
 
+    @Test
+    void testMultiSinkWriteToSameOutputPathUsingSameStagingDir() throws Exception {
+        // sink1
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef1 = new AtomicReference<>();
+        AtomicReference<File> fileToCommitRef1 = new AtomicReference<>();
+        AtomicReference<String> contentRef1 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), sinkRef1)) {
+            writeUnorderedRecords(testHarness);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef1.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef1.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        // sink2
+        AtomicReference<FileSystemOutputFormat<Row>> sinkRef2 = new AtomicReference<>();
+        AtomicReference<File> fileToCommitRef2 = new AtomicReference<>();
+        AtomicReference<String> contentRef2 = new AtomicReference<>();
+        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
+                createSink(false, false, false, new LinkedHashMap<>(), sinkRef2)) {
+            // write different content
+            List<Row> records =
+                    Arrays.asList(
+                            Row.of("b1", 1, "x1"), Row.of("b2", 2, "x2"), Row.of("b3", 3, "x3"));
+            writeUnorderedRecords(testHarness, records);
+            Map<File, String> content = getFileContentByPath(tmpPath);
+            assertThat(content).hasSize(1);
+            fileToCommitRef2.set(new ArrayList<>(content.keySet()).get(0));
+            contentRef2.set(new ArrayList<>(content.values()).get(0));
+        }
+
+        File fileToCommit1 = fileToCommitRef1.get();
+        File fileToCommit2 = fileToCommitRef2.get();
+        // because sink1 and sink2 are writing to the same staging dir
+        // the file generated by sink1 has been overwritten by sink2
+        assertThat(fileToCommit1.getParent()).isEqualTo(fileToCommit2.getParent());
+