Looping in @Kostas Kloudas <kklou...@apache.org> who should be able to clarify things.
David

On Wed, Oct 7, 2020 at 7:12 PM Dan Diephouse <d...@netzooid.com> wrote:

> Thanks! Completely missed that in the docs. It's now working, however it's
> not working with compression writers. Someone else noted this issue here:
>
> https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming
>
> Looking at the code, I'm not sure I follow the nuances of why sync()
> doesn't just do a call to flush in RefCountedBufferingFileStream:
>
> public void sync() throws IOException {
>     throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream cannot sync state to S3. " +
>         "Use persist() to create a persistent recoverable intermediate point.");
> }
>
> If there are any pointers here on what should happen, happy to submit a
> patch.
>
> On Wed, Oct 7, 2020 at 1:37 AM David Anderson <da...@alpinegizmo.com>
> wrote:
>
>> Dan,
>>
>> The first point you've raised is a known issue: When a job is stopped,
>> the unfinished part files are not transitioned to the finished state. This
>> is mentioned in the docs as Important Note 2 [1], and fixing this is
>> waiting on FLIP-46 [2]. That section of the docs also includes some
>> S3-specific warnings, but nothing pertaining to managing credentials.
>> Perhaps [3] will help.
>>
>> Regards,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>>
>> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse <d...@netzooid.com> wrote:
>>
>>> First, let me say, Flink is super cool - thanks everyone for making my
>>> life easier in a lot of ways! Wish I had this 10 years ago....
>>>
>>> Onto the fun stuff: I am attempting to use the StreamingFileSink with
>>> S3.
>>> Note that Flink is embedded in my app, not running as a standalone
>>> cluster.
>>>
>>> I am having a few problems, which I have illustrated in the small test
>>> case below.
>>>
>>> 1) After my job finishes, data never gets committed to S3. Looking
>>> through the code, I've noticed that data gets flushed to disk, but the
>>> multi-part upload is never finished. Even though my data doesn't hit the
>>> min part size, I would expect that if my job ends, my data should get
>>> uploaded since the job is 100% done.
>>>
>>> I am also having problems with data not being uploaded while the job is
>>> running, but I haven't been able to distill that down to a simple test
>>> case, so I thought I'd start here.
>>>
>>> 2) The S3 Filesystem does not pull credentials from the Flink
>>> Configuration when running in embedded mode. I have a workaround for this,
>>> but it is ugly. If you comment out the line in the test case which talks
>>> about this workaround, you will end up with a "java.net.SocketException:
>>> Host is down"
>>>
>>> Can anyone shed light on these two issues? Thanks!
>>>
>>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>>> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>> import org.junit.jupiter.api.Test;
>>>
>>> public class S3Test {
>>>     @Test
>>>     public void whyDoesntThisWork() throws Exception {
>>>         Configuration configuration = new Configuration();
>>>         configuration.setString("state.backend",
>>>                 MemoryStateBackendFactory.class.getName());
>>>         configuration.setString("s3.access.key", "****");
>>>         configuration.setString("s3.secret.key", "****");
>>>
>>>         // If I don't do this, the S3 filesystem never gets the credentials
>>>         FileSystem.initialize(configuration, null);
>>>
>>>         LocalStreamEnvironment env =
>>>                 StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>>
>>>         StreamingFileSink<String> s3 = StreamingFileSink
>>>                 .forRowFormat(new Path("s3://bucket/"), new SimpleStringEncoder<String>())
>>>                 .build();
>>>
>>>         env.fromElements("string1", "string2")
>>>                 .addSink(s3);
>>>
>>>         env.execute();
>>>
>>>         System.out.println("Done");
>>>     }
>>> }
>>>
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>
> --
> Dan Diephouse
> @dandiep
>
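
For anyone following the sync() question in this thread, here is a minimal sketch in plain Java (no Flink dependency) of the distinction between flush(), sync() and persist() as described above. Everything in it is hypothetical: BufferingStream and flushViaSync are illustrative names, not Flink classes, and the assumption that a compression writer reaches sync() on its flush path is inferred from the exception quoted above rather than confirmed from the Flink source.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class SyncVsPersistSketch {

    /** Hypothetical stand-in for a stream that buffers locally before uploading. */
    static class BufferingStream {
        private final ByteArrayOutputStream local = new ByteArrayOutputStream();
        private int persistedBytes = 0;

        void write(String text) {
            byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
            local.write(bytes, 0, bytes.length);
        }

        // flush(): bytes are only guaranteed to be in the local buffer.
        void flush() {
            // Nothing to do for an in-memory buffer.
        }

        // sync(): refused, mirroring the exception quoted in the thread.
        void sync() {
            throw new UnsupportedOperationException(
                    "cannot sync state; use persist() to create a recoverable point");
        }

        // persist(): flushes, then records how many bytes a recovery could rely on.
        int persist() {
            flush();
            persistedBytes = local.size();
            return persistedBytes;
        }

        int persistedBytes() {
            return persistedBytes;
        }
    }

    /** Hypothetical writer whose flush path calls sync(); returns false if refused. */
    static boolean flushViaSync(BufferingStream out) {
        try {
            out.sync();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        BufferingStream out = new BufferingStream();
        out.write("string1");
        System.out.println("sync accepted: " + flushViaSync(out));
        System.out.println("persisted bytes: " + out.persist());
    }
}
```

In this sketch a writer that relies on sync() is refused, while persist() is the only call that records a recoverable point. Whether sync() could safely delegate to flush() in the real stream depends on what guarantee callers expect from it, which is the open question for Kostas.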