Looping in @Kostas Kloudas <kklou...@apache.org> who should be able to
clarify things.

David

On Wed, Oct 7, 2020 at 7:12 PM Dan Diephouse <d...@netzooid.com> wrote:

> Thanks! Completely missed that in the docs. It's now working, however it's
> not working with compression writers. Someone else noted this issue here:
>
>
> https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming
>
> Looking at the code, I'm not sure I follow the nuances of why sync()
> doesn't just do a call to flush in RefCountedBufferingFileStream:
>
> public void sync() throws IOException {
> throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream
> cannot sync state to S3. " +
> "Use persist() to create a persistent recoverable intermediate point.");
> }
>
> If there are any pointers here on what should happen, happy to submit a
> patch.
>
>
>
>
> On Wed, Oct 7, 2020 at 1:37 AM David Anderson <da...@alpinegizmo.com>
> wrote:
>
>> Dan,
>>
>> The first point you've raised is a known issue: When a job is stopped,
>> the unfinished part files are not transitioned to the finished state. This
>> is mentioned in the docs as Important Note 2 [1], and fixing this is
>> waiting on FLIP-46 [2]. That section of the docs also includes some
>> S3-specific warnings, but nothing pertaining to managing credentials.
>> Perhaps [3] will help.
>>
>> Regards,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>>
>>
>> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse <d...@netzooid.com> wrote:
>>
>>> First, let me say, Flink is super cool - thanks everyone for making my
>>> life easier in a lot of ways! Wish I had this 10 years ago....
>>>
>>> Onto the fun stuff: I am attempting to use the StreamingFileSink with
>>> S3. Note that Flink is embedded in my app, not running as a standalone
>>> cluster.
>>>
>>> I am having a few problems, which I have illustrated in the small test
>>> case below.
>>>
>>> 1) After my job finishes, data never gets committed to S3. Looking
>>> through the code, I've noticed that data gets flushed to disk, but the
>>> multi-part upload is never finished. Even though my data doesn't hit the
>>> min part size, I would expect that if my job ends, my data should get
>>> uploaded since the job is 100% done.
>>>
>>> I am also having problems when the job is running not uploading - but I
>>> haven't been able to distill that down to a simple test case, so I thought
>>> I'd start here.
>>>
>>> 2) The S3 Filesystem does not pull credentials from the Flink
>>> Configuration when running in embedded mode. I have a workaround for this,
>>> but it is ugly. If you comment out the line in the test case which talks
>>> about this workaround, you will end up with a "Java.net.SocketException:
>>> Host is down"
>>>
>>> Can anyone shed light on these two issues? Thanks!
>>>
>>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>>> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>> import org.junit.jupiter.api.Test;
>>>
>>> public class S3Test {
>>>     @Test
>>>     public void whyDoesntThisWork() throws Exception {
>>>         Configuration configuration = new Configuration();
>>>         configuration.setString("state.backend",
>>> MemoryStateBackendFactory.class.getName());
>>>         configuration.setString("s3.access.key", "****");
>>>         configuration.setString("s3.secret.key", "****");
>>>
>>>         // If I don't do this, the S3 filesystem never gets the
>>> credentials
>>>         FileSystem.initialize(configuration, null);
>>>
>>>         LocalStreamEnvironment env =
>>> StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>>
>>>         StreamingFileSink<String> s3 = StreamingFileSink
>>>                 .forRowFormat(new Path("s3://bucket/"), new
>>> SimpleStringEncoder<String>())
>>>                 .build();
>>>
>>>         env.fromElements("string1", "string2")
>>>             .addSink(s3);
>>>
>>>         env.execute();
>>>
>>>         System.out.println("Done");
>>>     }
>>> }
>>>
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>
> --
> Dan Diephouse
> @dandiep
>

Reply via email to