Hi Rafi,

There is also an ongoing effort to support bounded streams in the DataStream API [1], which might provide the backbone for the functionality that you need.
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11875

> On 25 Mar 2019, at 10:00, Kostas Kloudas <k.klou...@ververica.com> wrote:
>
> Hi Rafi,
>
> Although not the most elegant, one solution could be to write your program using the file source in PROCESS_CONTINUOUSLY mode, as described here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html
> and when you are sure that the processing of your file is done, you cancel the job.
>
> As I said, this is not the most elegant solution, but it will do the job.
>
> Cheers,
> Kostas
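
For illustration, a minimal sketch of the continuously-monitoring file source that Kostas describes. TextInputFormat and the S3 path are only placeholders for whatever the job actually reads; the point is that PROCESS_CONTINUOUSLY keeps the source (and hence the job) alive, so periodic checkpoints keep firing and the StreamingFileSink gets a chance to finalize its part files before the job is cancelled manually:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Stand-in input format and path; substitute the real source of the job here.
TextInputFormat format = new TextInputFormat(new Path("s3://my-bucket/input"));

// PROCESS_CONTINUOUSLY re-scans the path forever instead of terminating after
// one pass, so the job stays alive and checkpoints continue to be taken.
DataStream<String> lines = env.readFile(
        format,
        "s3://my-bucket/input",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        60_000L); // monitoring interval in milliseconds
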
> On Mon, Mar 25, 2019 at 9:49 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thank you.
> I'm currently testing my job against a small file, so it finishes before checkpointing starts.
> But even with a larger file where a checkpoint did happen, there would always be trailing events, arriving after the last checkpoint and before the source finishes.
> So would these events be lost?
> In that case, any flow which is (bounded stream) => (StreamingFileSink) would not give the expected results...
>
> The other alternative would be using BucketingSink, but it would not guarantee exactly-once into S3, which is not preferable.
>
> Can you suggest any workaround? Somehow making sure checkpointing is triggered at the end?
>
> Rafi
>
> On Thu, Mar 21, 2019 at 9:40 PM Kostas Kloudas <k.klou...@ververica.com> wrote:
>
> Sorry Rafi,
>
> I just read your previous response where you say that you have already activated checkpointing.
> My bad for not paying attention.
>
> Unfortunately, currently in-progress files only roll (or get finalized) on checkpoint barriers and NOT when calling close().
> This is due to the fact that, at the function level, Flink does not differentiate between failures and normal termination.
> But there are plans to fix it: https://issues.apache.org/jira/browse/FLINK-2646
>
> So, given the above, you should check whether checkpoints go through your pipeline before your source stream reaches its end. If there are no checkpoints, then your in-progress files will not be finalized and Parquet, for example, will not write the footer that is needed to be able to properly read the file.
>
> Kostas
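
To make the above concrete, a minimal sketch of enabling checkpointing; the 10-second interval is an arbitrary placeholder. Since bulk-format part files only roll and get committed on checkpoints, at least one checkpoint has to complete while the bounded source is still producing data:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// With forBulkFormat, part files are rolled and committed on checkpoints only,
// so the interval must be short enough for a checkpoint to complete before the
// bounded source finishes.
env.enableCheckpointing(10_000L); // checkpoint every 10 seconds (placeholder value)
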
> On Thu, Mar 21, 2019 at 8:03 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
> Hi Kostas,
>
> Yes I have.
>
> Rafi
>
> On Thu, Mar 21, 2019, 20:47 Kostas Kloudas <kklou...@gmail.com> wrote:
>
> Hi Rafi,
>
> Have you enabled checkpointing for your job?
>
> Cheers,
> Kostas
>
> On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
> Hi Piotr and Kostas,
>
> Thanks for your reply.
>
> The issue is that I don't see any committed files, only in-progress ones.
> I tried to debug the code for more details. I see that in BulkPartWriter I do reach the write methods and see events getting written, but I never reach closeForCommit. I go straight to the close function, where all parts are disposed.
>
> In my job I have a finite stream (the source is reading from Parquet file/s), doing some windowed aggregation and writing back to a Parquet file.
> As far as I know, it should commit files during checkpoints and when the stream has finished. I did enable checkpointing.
> I did verify that if I connect other sinks, I see the events.
>
> Let me know if I can provide any further information that could be helpful.
>
> Would appreciate your help.
>
> Thanks,
> Rafi
>
> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
> Hi Rafi,
>
> Piotr is correct. In-progress files are not necessarily readable.
> The valid files are the ones that are "committed" or finalized.
>
> Cheers,
> Kostas
>
> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
> Hi,
>
> I'm not sure, but shouldn't you be just reading committed files and ignoring in-progress ones? Maybe Kostas could add more insight to this topic.
>
> Piotr Nowojski
>
>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm trying to stream events in Protobuf format into a Parquet file.
>> I looked into both streaming-file options: BucketingSink & StreamingFileSink.
>> I first tried using the newer StreamingFileSink with the forBulkFormat API.
>> I noticed there's currently support only for the Avro format with the ParquetAvroWriters.
>> I followed the same convention as Avro and wrote a ParquetProtoWriters builder class:
>>
>> public class ParquetProtoWriters {
>>
>>     private static final int pageSize = 64 * 1024;
>>
>>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>>         return new ParquetWriterFactory<>(builder);
>>     }
>>
>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>             Class<T> type,
>>             OutputFile out) throws IOException {
>>
>>         return ProtoParquetWriter.<T>builder(out)
>>                 .withPageSize(pageSize)
>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>                 .withProtoClass(type)
>>                 .build();
>>     }
>> }
>>
>> And then I use it as follows:
>>
>> StreamingFileSink
>>     .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>>     .build();
>>
>> I ran tests on the ParquetProtoWriters itself and it writes everything properly and I'm able to read the files.
>>
>> When I use the sink as part of a job I see illegal Parquet files created:
>>
>> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)
>>
>> Can anyone suggest what I am missing here?
>>
>> When trying to use the BucketingSink, I wrote a Writer class for Protobuf and everything worked perfectly:
>>
>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>>
>>     private static final long serialVersionUID = -975302556515811398L;
>>
>>     private Path path;
>>     private Class<? extends Message> protoClass;
>>     private transient ParquetWriter<T> writer;
>>
>>     private int position;
>>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>>     private final int pageSize = 64 * 1024;
>>
>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>         this.protoClass = protoClass;
>>     }
>>
>>     @Override
>>     public void open(FileSystem fs, Path path) throws IOException {
>>         this.position = 0;
>>         this.path = path;
>>
>>         if (writer != null) {
>>             writer.close();
>>         }
>>
>>         writer = createWriter();
>>     }
>>
>>     @Override
>>     public long flush() throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         position += writer.getDataSize();
>>         writer.close();
>>         writer = createWriter();
>>
>>         return position;
>>     }
>>
>>     @Override
>>     public long getPos() {
>>         Preconditions.checkNotNull(writer);
>>         return position + writer.getDataSize();
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         if (writer != null) {
>>             writer.close();
>>             writer = null;
>>         }
>>     }
>>
>>     @Override
>>     public void write(T element) throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         writer.write(element);
>>     }
>>
>>     @Override
>>     public Writer<T> duplicate() {
>>         return new FlinkProtoParquetWriter<>(protoClass);
>>     }
>>
>>     private ParquetWriter<T> createWriter() throws IOException {
>>         return ProtoParquetWriter
>>                 .<T>builder(path)
>>                 .withPageSize(pageSize)
>>                 .withCompressionCodec(compressionCodecName)
>>                 .withProtoClass(protoClass)
>>                 .build();
>>     }
>> }
>>
>> Rafi
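
For completeness, a minimal sketch of how the Writer class above could be plugged into a BucketingSink; the output path, batch size, and the 'events' stream are placeholders, not taken from the thread:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// 'events' is assumed to be the DataStream<SomeProtoType> produced by the job.
BucketingSink<SomeProtoType> sink = new BucketingSink<>("s3://my-bucket/output");
sink.setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
sink.setBatchSize(128 * 1024 * 1024); // roll a new part file after ~128 MB

events.addSink(sink);

As noted earlier in the thread, this route does not guarantee exactly-once delivery into S3, which is the trade-off against the StreamingFileSink approach.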