Hey Kostas,

We’re still a little way off from a 1.10 upgrade, but when we do it I can
certainly check whether that CompressWriterFactory solves my use case.
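
For reference, here’s roughly how I’d picture wiring it in once we’re on
1.10 -- this is just a sketch based on the FLINK-13634 ticket, so the
CompressWriters / DefaultExtractor names and the "Gzip" codec name are my
assumptions about the flink-compress module, not something I’ve run yet:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class CompressedSinkSketch {
  public static StreamingFileSink<String> buildSink() {
    // Assumption: DefaultExtractor turns each record into bytes, and
    // withHadoopCompression picks a Hadoop codec by name (needs hadoop-common
    // on the classpath).
    CompressWriterFactory<String> factory =
        CompressWriters.forExtractor(new DefaultExtractor<String>())
            .withHadoopCompression("Gzip");

    return StreamingFileSink
        .forBulkFormat(new Path("file:///tmp/compressed-output"), factory)
        .build();
  }
}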

If there is anything I can do to help document that feature, please let me
know.

Thanks!

Austin

On Wed, Mar 4, 2020 at 4:58 AM Kostas Kloudas <kklou...@apache.org> wrote:

> Hi Austin,
>
> I will have a look at your repo. In the meantime, given that [1] is
> already merged in 1.10,
> would upgrading to 1.10 and using the newly introduced
> CompressWriterFactory be an option for you?
>
> It is unfortunate that this feature was not documented.
>
> Cheers,
> Kostas
>
> [1] https://issues.apache.org/jira/browse/FLINK-13634
>
>
> On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi all,
>>
>> Thanks for the docs pointer/FLIP, Rafi, and for the workaround strategy,
>> Kostas -- strange though, as I wasn't using a bounded source when I first
>> ran into this issue. I have updated the example repo to use an unbounded
>> source [1], and the same file corruption problems remain.
>>
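>> By an unbounded source I mean something along these lines -- a from-memory
>> sketch, not the exact code in the repo:
>>
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class UnboundedRecordSource implements SourceFunction<Record> {
>>   private volatile boolean running = true;
>>
>>   @Override
>>   public void run(SourceContext<Record> ctx) throws Exception {
>>     long i = 0;
>>     while (running) {                      // never finishes on its own
>>       synchronized (ctx.getCheckpointLock()) {
>>         ctx.collect(new Record(String.valueOf(i++)));
>>       }
>>       Thread.sleep(100L);                  // throttle a bit
>>     }
>>   }
>>
>>   @Override
>>   public void cancel() {
>>     running = false;
>>   }
>> }
>>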
>> Anything else I could be doing wrong with the compression stream?
>>
>> Thanks again,
>> Austin
>>
>> [1]:
>> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>>
>> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas <kklou...@apache.org>
>> wrote:
>>
>>> Hi Austin and Rafi,
>>>
>>> @Rafi Thanks for providing the pointers!
>>> Unfortunately there is no progress on the FLIP (or the issue).
>>>
>>> @Austin In the meantime, what you could do -- assuming that your input
>>> is bounded -- is simply not stop the job after the whole input is
>>> processed, wait until the output is committed, and then cancel the
>>> job. I know and agree that this is not an elegant solution, but it is a
>>> temporary workaround.
>>>
>>> Hopefully the FLIP and the related issue are going to be prioritised soon.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> This happens because StreamingFileSink does not support a finite input
>>>> stream.
>>>> In the docs it's mentioned under "Important Considerations":
>>>>
>>>> [inline screenshot of the "Important Considerations" note from the
>>>> StreamingFileSink docs]
>>>>
>>>> This behaviour often surprises users...
>>>>
>>>> There's a FLIP
>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
>>>>  and
>>>> an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
>>>> fixing this. I'm not sure what the status is, though -- maybe Kostas can
>>>> share.
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>>
>>>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hi Dawid and Kostas,
>>>>>
>>>>> Sorry for the late reply, and thank you for the troubleshooting. I put
>>>>> together an example repo that reproduces the issue [1], because I did have
>>>>> checkpointing enabled in my previous case -- I still must be doing something
>>>>> wrong with that config, though.
>>>>>
>>>>> Thanks!
>>>>> Austin
>>>>>
>>>>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>>>>
>>>>>
>>>>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Austin,
>>>>>>
>>>>>> Dawid is correct in that you need to enable checkpointing for the
>>>>>> StreamingFileSink to work.
>>>>>>
>>>>>> I hope this solves the problem,
>>>>>> Kostas
>>>>>>
>>>>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>>>>> <dwysakow...@apache.org> wrote:
>>>>>> >
>>>>>> > Hi Austin,
>>>>>> >
>>>>>> > If I am not mistaken, the StreamingFileSink by default flushes on
>>>>>> > checkpoints. If you don't have checkpoints enabled, it might happen
>>>>>> > that not all data is flushed.
>>>>>> >
>>>>>> > I think you can also adjust that behavior with:
>>>>>> >
>>>>>> > forBulkFormat(...)
>>>>>> >     .withRollingPolicy(/* your custom logic */)
>>>>>> >
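>>>>>> > Something like this is what I have in mind -- just an untested sketch,
>>>>>> > reusing your BulkWriter.Factory from the message below (here called
>>>>>> > yourBulkWriterFactory) and a made-up checkpoint interval:
>>>>>> >
>>>>>> > // Untested sketch: with checkpointing enabled, the bulk-encoded part
>>>>>> > // files get rolled/finalized on each checkpoint.
>>>>>> > final StreamExecutionEnvironment env =
>>>>>> >     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> > env.enableCheckpointing(10_000L); // e.g. every 10s -- pick what fits
>>>>>> >
>>>>>> > StreamingFileSink<Record> textSink = StreamingFileSink
>>>>>> >     .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>>>>>> >         yourBulkWriterFactory) // hypothetical name for your factory
>>>>>> >     .withBucketCheckInterval(1000)
>>>>>> >     .build();
>>>>>> >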
>>>>>> > I also cc Kostas who should be able to correct me if I am wrong.
>>>>>> >
>>>>>> > Best,
>>>>>> >
>>>>>> > Dawid
>>>>>> >
>>>>>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>>>>>> >
>>>>>> > Hi there,
>>>>>> >
>>>>>> > Using Flink 1.9.1, I'm trying to write .tgz files with the
>>>>>> > StreamingFileSink#BulkWriter. It seems like flushing the output stream
>>>>>> > doesn't flush all the data written. I've verified I can create valid
>>>>>> > files using the same APIs and data on their own, so I'm thinking it
>>>>>> > must be something I'm doing wrong with the bulk format. I'm writing to
>>>>>> > the local filesystem, with the `file://` protocol.
>>>>>> >
>>>>>> > For tar/gzipping, I'm using the Apache Commons Compress library,
>>>>>> > version 1.20.
>>>>>> >
>>>>>> > Here's a runnable example of the issue:
>>>>>> >
>>>>>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>>>>>> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>>>>>> > import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>>>>>> > import org.apache.flink.api.common.serialization.BulkWriter;
>>>>>> > import org.apache.flink.core.fs.FSDataOutputStream;
>>>>>> > import org.apache.flink.core.fs.Path;
>>>>>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> > import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>>>>> >
>>>>>> > import java.io.FileOutputStream;
>>>>>> > import java.io.IOException;
>>>>>> > import java.io.Serializable;
>>>>>> > import java.nio.charset.StandardCharsets;
>>>>>> >
>>>>>> > class Scratch {
>>>>>> >   public static class Record implements Serializable {
>>>>>> >     private static final long serialVersionUID = 1L;
>>>>>> >
>>>>>> >     String id;
>>>>>> >
>>>>>> >     public Record() {}
>>>>>> >
>>>>>> >     public Record(String id) {
>>>>>> >       this.id = id;
>>>>>> >     }
>>>>>> >
>>>>>> >     public String getId() {
>>>>>> >       return id;
>>>>>> >     }
>>>>>> >
>>>>>> >     public void setId(String id) {
>>>>>> >       this.id = id;
>>>>>> >     }
>>>>>> >   }
>>>>>> >
>>>>>> >   public static void main(String[] args) throws Exception {
>>>>>> >     final StreamExecutionEnvironment env =
>>>>>> >         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> >
>>>>>> >     // Writing the same archive straight to a local file produces a valid .tgz.
>>>>>> >     TarArchiveOutputStream taos = new TarArchiveOutputStream(
>>>>>> >         new GzipCompressorOutputStream(
>>>>>> >             new FileOutputStream("/home/austin/Downloads/test.tgz")));
>>>>>> >     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>>>>>> >     String fullText = "hey\nyou\nwork";
>>>>>> >     byte[] fullTextData = fullText.getBytes();
>>>>>> >     fileEntry.setSize(fullTextData.length);
>>>>>> >     taos.putArchiveEntry(fileEntry);
>>>>>> >     taos.write(fullTextData, 0, fullTextData.length);
>>>>>> >     taos.closeArchiveEntry();
>>>>>> >     taos.flush();
>>>>>> >     taos.close();
>>>>>> >
>>>>>> >     // The same tar/gzip writing, but wrapped in a BulkWriter for the StreamingFileSink.
>>>>>> >     StreamingFileSink<Record> textSink = StreamingFileSink
>>>>>> >         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>>>>>> >             new BulkWriter.Factory<Record>() {
>>>>>> >               @Override
>>>>>> >               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>>>>>> >                 final TarArchiveOutputStream compressedOutputStream =
>>>>>> >                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>>>>>> >
>>>>>> >                 return new BulkWriter<Record>() {
>>>>>> >                   @Override
>>>>>> >                   public void addElement(Record record) throws IOException {
>>>>>> >                     TarArchiveEntry fileEntry =
>>>>>> >                         new TarArchiveEntry(String.format("%s.txt", record.id));
>>>>>> >                     byte[] fullTextData =
>>>>>> >                         "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>>>>>> >                     fileEntry.setSize(fullTextData.length);
>>>>>> >                     compressedOutputStream.putArchiveEntry(fileEntry);
>>>>>> >                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>>>>>> >                     compressedOutputStream.closeArchiveEntry();
>>>>>> >                   }
>>>>>> >
>>>>>> >                   @Override
>>>>>> >                   public void flush() throws IOException {
>>>>>> >                     compressedOutputStream.flush();
>>>>>> >                   }
>>>>>> >
>>>>>> >                   @Override
>>>>>> >                   public void finish() throws IOException {
>>>>>> >                     this.flush();
>>>>>> >                   }
>>>>>> >                 };
>>>>>> >               }
>>>>>> >             })
>>>>>> >         .withBucketCheckInterval(1000)
>>>>>> >         .build();
>>>>>> >
>>>>>> >     env
>>>>>> >         .fromElements(new Record("1"), new Record("2"))
>>>>>> >         .addSink(textSink)
>>>>>> >         .name("Streaming File Sink")
>>>>>> >         .uid("streaming-file-sink");
>>>>>> >     env.execute("streaming file sink test");
>>>>>> >   }
>>>>>> > }
>>>>>> >
>>>>>> >
>>>>>> > From the stat/hex dumps, you can see that the first bits are there,
>>>>>> > but are then cut off:
>>>>>> >
>>>>>> > ~/Downloads » stat test.tgz
>>>>>> >   File: test.tgz
>>>>>> >   Size: 114       Blocks: 8          IO Block: 4096   regular file
>>>>>> > Device: 801h/2049d Inode: 30041077    Links: 1
>>>>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>>>>> > Access: 2020-02-21 19:30:06.009028283 -0500
>>>>>> > Modify: 2020-02-21 19:30:44.509424406 -0500
>>>>>> > Change: 2020-02-21 19:30:44.509424406 -0500
>>>>>> >  Birth: -
>>>>>> >
>>>>>> > ~/Downloads » tar -tvf test.tgz
>>>>>> > -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>>>>>> >
>>>>>> > ~/Downloads » hd test.tgz
>>>>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
>>>>>> > 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
>>>>>> > 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
>>>>>> > 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
>>>>>> > 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
>>>>>> > 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
>>>>>> > 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
>>>>>> > 00000070  00 00                                             |..|
>>>>>> > 00000072
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > text-output/37 » tar -xzf part-0-0
>>>>>> >
>>>>>> > gzip: stdin: unexpected end of file
>>>>>> > tar: Child returned status 1
>>>>>> > tar: Error is not recoverable: exiting now
>>>>>> >
>>>>>> > text-output/37 » stat part-0-0
>>>>>> >   File: part-0-0
>>>>>> >   Size: 10              Blocks: 8          IO Block: 4096   regular file
>>>>>> > Device: 801h/2049d      Inode: 4590487     Links: 1
>>>>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>>>>> > Access: 2020-02-21 19:33:06.258888702 -0500
>>>>>> > Modify: 2020-02-21 19:33:04.466870139 -0500
>>>>>> > Change: 2020-02-21 19:33:05.294878716 -0500
>>>>>> >  Birth: -
>>>>>> >
>>>>>> > text-output/37 » hd part-0-0
>>>>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
>>>>>> > 0000000a
>>>>>> >
>>>>>> > Is there anything simple I'm missing?
>>>>>> >
>>>>>> > Best,
>>>>>> > Austin
>>>>>>
>>>>>
