Hi Rafi,

There is also an ongoing effort to support bounded streams in the DataStream API 
[1], which might provide the backbone for the functionality that you need. 

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11875

> On 25 Mar 2019, at 10:00, Kostas Kloudas <k.klou...@ververica.com> wrote:
> 
> Hi Rafi,
> 
> Although not the most elegant, one solution could be to write your program 
> using the file source in PROCESS_CONTINUOUSLY mode, as described here: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html
> and, when you are sure that the processing of your file is done, cancel the 
> job.
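> 
> Roughly, that workaround could look like the sketch below (the paths and the 
> text-file input format are just placeholders for illustration; the important 
> part is the PROCESS_CONTINUOUSLY watch mode, which keeps the job, and 
> therefore checkpointing, alive until you cancel it):
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // Checkpoints are what trigger the StreamingFileSink to finalize its part files.
> env.enableCheckpointing(10_000L);
> 
> TextInputFormat format = new TextInputFormat(new Path("file:///path/to/input"));
> 
> // PROCESS_CONTINUOUSLY keeps monitoring the path, so the job never reaches the
> // "finished" state on its own; cancel it once the data has been processed.
> DataStream<String> lines = env.readFile(
>         format,
>         "file:///path/to/input",
>         FileProcessingMode.PROCESS_CONTINUOUSLY,
>         1_000L); // directory re-scan interval in milliseconds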
> 
> As I said, this is not the most elegant solution but it will do the job.
> 
> Cheers,
> Kostas
> 
> 
> 
> On Mon, Mar 25, 2019 at 9:49 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
> Hi Kostas,
> 
> Thank you.
> I'm currently testing my job against a small file, so it's finishing before 
> the checkpointing starts.
> But even with a larger file where checkpoints did happen, there would always 
> be the trailing events between the last checkpoint and the point where the 
> source finished.
> Would these events be lost?
> In that case, any flow of the form (bounded stream) => (StreamingFileSink) 
> would not give the expected results...
> 
> The other alternative would be using BucketingSink, but it would not guarantee 
> exactly-once semantics when writing to S3, which is not preferable. 
> 
> Can you suggest any workaround? Somehow making sure checkpointing is 
> triggered at the end?
> 
> Rafi
> 
> 
> On Thu, Mar 21, 2019 at 9:40 PM Kostas Kloudas <k.klou...@ververica.com> wrote:
> Sorry Rafi,
> 
> I just read your previous response where you say that you have already 
> activated checkpointing.
> My bad for not paying attention.
> 
> Unfortunately, in-progress files currently only roll (or get finalized) on 
> checkpoint barriers and NOT when close() is called. 
> This is because, at the function level, Flink does not differentiate between 
> failures and normal termination.
> But there are plans to fix this: 
> https://issues.apache.org/jira/browse/FLINK-2646
> 
> So given the above, you should check whether any checkpoints complete in your 
> pipeline before your source stream reaches its end. If no checkpoint 
> completes, then your in-progress files will not be finalized, and Parquet, 
> for example, will not write the footer that is needed to properly read the 
> file.
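> 
> A quick way to verify this is to enable checkpointing with an interval that 
> is short relative to how long the bounded job runs (the value below is just 
> illustrative) and watch the Checkpoints tab in the web UI:
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // Only data covered by a completed checkpoint ends up in finalized part files,
> // so the interval must be shorter than the lifetime of the bounded job.
> env.enableCheckpointing(5_000L);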
> 
> Kostas
> 
> 
> On Thu, Mar 21, 2019 at 8:03 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
> Hi Kostas, 
> 
> Yes I have. 
> 
> Rafi 
> 
> On Thu, Mar 21, 2019, 20:47 Kostas Kloudas <kklou...@gmail.com> wrote:
> Hi Rafi, 
> 
> Have you enabled checkpointing for your job?
> 
> Cheers,
> Kostas
> 
> On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
> Hi Piotr and Kostas,
> 
> Thanks for your reply.
> 
> The issue is that I don't see any committed files, only in-progress.
> I tried to debug the code for more details. I see that in BulkPartWriter I do 
> reach the write methods and see events getting written, but I never reach 
> closeForCommit; it goes straight to the close function, where all parts are 
> disposed.
> 
> In my job I have a finite stream (the source is reading from Parquet file/s), 
> doing some windowed aggregation and writing back to a Parquet file. 
> As far as I know, it should commit files during checkpoints and when the 
> stream has finished. I did enable checkpointing.
> I did verify that if I connect other sinks, I see the events.
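> 
> For reference, the job is roughly shaped like the sketch below (all names are 
> placeholders; the real source, key selector and aggregate are specific to my 
> data):
> 
> DataStream<Event> events = env.addSource(boundedParquetSource); // placeholder bounded source
> 
> events
>         .keyBy(Event::getKey)                                   // placeholder key selector
>         .window(TumblingEventTimeWindows.of(Time.minutes(1)))   // placeholder window size
>         .aggregate(new MyAggregate())                           // placeholder AggregateFunction
>         .addSink(parquetSink); // the StreamingFileSink built with ParquetProtoWriters below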
> 
> Let me know if I can provide any further information that could be helpful.
> 
> Would appreciate your help.
> 
> Thanks,
> Rafi
> 
> 
> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:
> Hi Rafi,
> 
> Piotr is correct. In-progress files are not necessarily readable.
> The valid files are the ones that are "committed" or finalized.
> 
> Cheers,
> Kostas
> 
> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> I’m not sure, but shouldn’t you be reading only the committed files and 
> ignoring the in-progress ones? Maybe Kostas could add more insight to this topic.
> 
> Piotr Nowojski
> 
>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I'm trying to stream events in Protobuf format into a Parquet file.
>> I looked into both streaming-file options: BucketingSink & StreamingFileSink.
>> I first tried using the newer StreamingFileSink with the forBulkFormat API. 
>> I noticed there's currently support only for the Avro format, via 
>> ParquetAvroWriters.
>> I followed the same convention as the Avro one and wrote a ParquetProtoWriters 
>> builder class:
>> 
>> public class ParquetProtoWriters {
>> 
>>     private static final int pageSize = 64 * 1024;
>> 
>>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>>         return new ParquetWriterFactory<>(builder);
>>     }
>> 
>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>             Class<T> type,
>>             OutputFile out) throws IOException {
>> 
>>         return ProtoParquetWriter.<T>builder(out)
>>                 .withPageSize(pageSize)
>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>                 .withProtoClass(type)
>>                 .build();
>>     }
>> }
>> And then I use it as follows:
>> 
>> StreamingFileSink
>>         .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>>         .build();
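>> 
>> The sink is attached to the job roughly like this (the stream variable and 
>> the checkpoint interval are placeholders):
>> 
>> env.enableCheckpointing(60_000L); // checkpointing is enabled on the environment
>> someProtoStream.addSink(sink);    // someProtoStream is a DataStream<SomeProtoType>,
>>                                   // sink is the StreamingFileSink built above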
>> I ran tests on the ParquetProtoWriters itself: it writes everything 
>> properly and I'm able to read the files.
>> 
>> When I use the sink as part of a job, I see invalid Parquet files created:
>> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
>> file (too small length: 4)
>> 
>> Can anyone suggest what I am missing here?
>> 
>> When I tried the BucketingSink instead, I wrote a Writer class for Protobuf 
>> and everything worked perfectly:
>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>> 
>>     private static final long serialVersionUID = -975302556515811398L;
>> 
>>     private Path path;
>>     private Class<? extends Message> protoClass;
>>     private transient ParquetWriter<T> writer;
>> 
>>     private int position;
>>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>>     private final int pageSize = 64 * 1024;
>> 
>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>         this.protoClass = protoClass;
>>     }
>> 
>>     @Override
>>     public void open(FileSystem fs, Path path) throws IOException {
>>         this.position = 0;
>>         this.path = path;
>> 
>>         if (writer != null) {
>>             writer.close();
>>         }
>> 
>>         writer = createWriter();
>>     }
>> 
>>     @Override
>>     public long flush() throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         position += writer.getDataSize();
>>         writer.close();
>>         writer = createWriter();
>> 
>>         return position;
>>     }
>> 
>>     @Override
>>     public long getPos() {
>>         Preconditions.checkNotNull(writer);
>>         return position + writer.getDataSize();
>>     }
>> 
>>     @Override
>>     public void close() throws IOException {
>>         if (writer != null) {
>>             writer.close();
>>             writer = null;
>>         }
>>     }
>> 
>>     @Override
>>     public void write(T element) throws IOException {
>>         Preconditions.checkNotNull(writer);
>>         writer.write(element);
>>     }
>> 
>>     @Override
>>     public Writer<T> duplicate() {
>>         return new FlinkProtoParquetWriter<>(protoClass);
>>     }
>> 
>>     private ParquetWriter<T> createWriter() throws IOException {
>>         return ProtoParquetWriter
>>                 .<T>builder(path)
>>                 .withPageSize(pageSize)
>>                 .withCompressionCodec(compressionCodecName)
>>                 .withProtoClass(protoClass)
>>                 .build();
>>     }
>> }
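>> 
>> I attach it to the BucketingSink roughly like this (the base path and batch 
>> size are placeholders):
>> 
>> BucketingSink<SomeProtoType> bucketingSink =
>>         new BucketingSink<SomeProtoType>("some-path")                    // placeholder base path
>>                 .setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class))
>>                 .setBatchSize(128 * 1024 * 1024);                        // roll at ~128 MB
>> 
>> someProtoStream.addSink(bucketingSink);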
>> 
>> Rafi
> 
> 
> 