Hey Kostas,

We're a little bit off from a 1.10 update, but I can certainly see if that CompressWriterFactory might solve my use case when we do.
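From skimming the FLINK-13634 changes, I'd expect the 1.10 version to look roughly like the sketch below -- untested on my end, and since it appears to write one compressed stream per part file rather than a tar archive, I'd still need to confirm it covers our .tgz case:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch only: CompressWriterFactory is a BulkWriter.Factory, so it should plug
// straight into forBulkFormat. DefaultExtractor writes each record's toString()
// plus a newline; the codec name is resolved via Hadoop's CompressionCodecFactory,
// so flink-compress plus a Hadoop dependency would be needed on the classpath.
StreamingFileSink<Record> sink = StreamingFileSink
    .forBulkFormat(
        new Path("file:///home/austin/Downloads/text-output"),
        CompressWriters.forExtractor(new DefaultExtractor<Record>())
            .withHadoopCompression("Gzip"))
    .build();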
If there is anything I can do to help document that feature, please let me know.

Thanks!
Austin

On Wed, Mar 4, 2020 at 4:58 AM Kostas Kloudas <kklou...@apache.org> wrote:

Hi Austin,

I will have a look at your repo. In the meantime, given that [1] is already merged in 1.10, would upgrading to 1.10 and using the newly introduced CompressWriterFactory be an option for you?

It is unfortunate that this feature was not documented.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13634

On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

Hi all,

Thanks for the docs pointer / FLIP, Rafi, and the workaround strategy, Kostas -- strange, though, as I wasn't using a bounded source when I first ran into this issue. I have updated the example repo to use an unbounded source [1], and the same file corruption problems remain.

Anything else I could be doing wrong with the compression stream?

Thanks again,
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded

On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas <kklou...@apache.org> wrote:

Hi Austin and Rafi,

@Rafi Thanks for providing the pointers! Unfortunately there is no progress on the FLIP (or the issue).

@Austin In the meantime, what you could do -- assuming that your input is bounded -- is simply not stop the job after the whole input is processed, then wait until the output is committed, and then cancel the job. I know and agree that this is not an elegant solution, but it is a temporary workaround.

Hopefully the FLIP and the related issue are going to be prioritised soon.

Cheers,
Kostas

On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi,

This happens because StreamingFileSink does not support a finite input stream. In the docs it's mentioned under "Important Considerations":

[inline screenshot of the "Important Considerations" section of the StreamingFileSink docs]

This behaviour often surprises users...

There's a FLIP <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs> and an issue <https://issues.apache.org/jira/browse/FLINK-13103> about fixing this. I'm not sure what the status is, though -- maybe Kostas can share.

Thanks,
Rafi

On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

Hi Dawid and Kostas,

Sorry for the late reply + thank you for the troubleshooting. I put together an example repo that reproduces the issue [1], because I did have checkpointing enabled in my previous case -- so I still must be doing something wrong with that config.

Thanks!
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression

On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org> wrote:

Hi Austin,

Dawid is correct in that you need to enable checkpointing for the StreamingFileSink to work.

I hope this solves the problem,
Kostas

On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Austin,

If I am not mistaken, the StreamingFileSink by default flushes on checkpoints. If you don't have checkpoints enabled, it might happen that not all data is flushed.
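Enabling them is a one-liner on the execution environment (a minimal sketch; the 10-second interval is arbitrary):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With a bulk format, in-progress part files are rolled and committed
// on each checkpoint, so without this nothing is ever finalized.
env.enableCheckpointing(10_000L);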
I think you can also adjust that behavior with:

forBulkFormat(...)
    .withRollingPolicy(/* your custom logic */)
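For example (again a sketch, assuming the bulk builder exposes it in your version -- for bulk formats I believe the policy must stay checkpoint-based, so this only spells out the default):

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

StreamingFileSink<Record> sink = StreamingFileSink
    .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"), bulkWriterFactory)
    // Roll (and later commit) the in-progress part file on every checkpoint.
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

(Here bulkWriterFactory stands for whatever BulkWriter.Factory you are using.)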
I also cc Kostas, who should be able to correct me if I am wrong.

Best,
Dawid

On 22/02/2020 01:59, Austin Cawley-Edwards wrote:

Hi there,

Using Flink 1.9.1, I'm trying to write .tgz files with the StreamingFileSink#BulkWriter. It seems like flushing the output stream doesn't flush all the data written. I've verified I can create valid files using the same APIs and data on their own, so I'm thinking it must be something I'm doing wrong with the bulk format. I'm writing to the local filesystem, with the `file://` protocol.

For tar/gzipping, I'm using the Apache Commons Compress library, version 1.20.

Here's a runnable example of the issue:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class Scratch {
  public static class Record implements Serializable {
    private static final long serialVersionUID = 1L;

    String id;

    public Record() {}

    public Record(String id) {
      this.id = id;
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }
  }

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Sanity check: writing an entry straight to a local file with the
    // same APIs produces a valid archive.
    TarArchiveOutputStream taos = new TarArchiveOutputStream(
        new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
    String fullText = "hey\nyou\nwork";
    byte[] fullTextData = fullText.getBytes();
    fileEntry.setSize(fullTextData.length);
    taos.putArchiveEntry(fileEntry);
    taos.write(fullTextData, 0, fullTextData.length);
    taos.closeArchiveEntry();
    taos.flush();
    taos.close();

    // The same logic as a BulkWriter, wrapping the stream Flink hands us.
    StreamingFileSink<Record> textSink = StreamingFileSink
        .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
            new BulkWriter.Factory<Record>() {
              @Override
              public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
                final TarArchiveOutputStream compressedOutputStream =
                    new TarArchiveOutputStream(new GzipCompressorOutputStream(out));

                return new BulkWriter<Record>() {
                  @Override
                  public void addElement(Record record) throws IOException {
                    // One tar entry per record.
                    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
                    byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
                    fileEntry.setSize(fullTextData.length);
                    compressedOutputStream.putArchiveEntry(fileEntry);
                    compressedOutputStream.write(fullTextData, 0, fullTextData.length);
                    compressedOutputStream.closeArchiveEntry();
                  }

                  @Override
                  public void flush() throws IOException {
                    compressedOutputStream.flush();
                  }

                  @Override
                  public void finish() throws IOException {
                    this.flush();
                  }
                };
              }
            })
        .withBucketCheckInterval(1000)
        .build();

    env
        .fromElements(new Record("1"), new Record("2"))
        .addSink(textSink)
        .name("Streaming File Sink")
        .uid("streaming-file-sink");
    env.execute("streaming file sink test");
  }
}

From the stat/hex dumps, you can see that the first bytes are there, but the rest is cut off:

~/Downloads » stat test.tgz
  File: test.tgz
  Size: 114           Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d    Inode: 30041077    Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:30:06.009028283 -0500
Modify: 2020-02-21 19:30:44.509424406 -0500
Change: 2020-02-21 19:30:44.509424406 -0500
 Birth: -

~/Downloads » tar -tvf test.tgz
-rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt

~/Downloads » hd test.tgz
00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
00000070  00 00                                             |..|
00000072

text-output/37 » tar -xzf part-0-0

gzip: stdin: unexpected end of file
tar: Child returned status 1
tar: Error is not recoverable: exiting now

text-output/37 » stat part-0-0
  File: part-0-0
  Size: 10            Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d    Inode: 4590487     Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:33:06.258888702 -0500
Modify: 2020-02-21 19:33:04.466870139 -0500
Change: 2020-02-21 19:33:05.294878716 -0500
 Birth: -

text-output/37 » hd part-0-0
00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
0000000a

Is there anything simple I'm missing?
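One thing I've wondered while staring at this: whether finish() needs to finalize the archive and the gzip trailer rather than just flush, while still leaving Flink's underlying stream open (as the BulkWriter contract requires). Keeping separate references to both streams, that would look something like this sketch:

// Inside create(FSDataOutputStream out): keep handles to both layers
// so finish() can finalize each one.
final GzipCompressorOutputStream gzipStream = new GzipCompressorOutputStream(out);
final TarArchiveOutputStream tarStream = new TarArchiveOutputStream(gzipStream);
// ... addElement/flush as above ...

@Override
public void finish() throws IOException {
  tarStream.finish();   // write the tar end-of-archive records
  gzipStream.finish();  // write the gzip trailer, but leave Flink's `out` open
}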
Best,
Austin