Thanks Piotr & Kostas.

Really looking forward to this :)

Rafi


On Wed, Mar 27, 2019 at 10:58 AM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Rafi,
>
> There is also an ongoing effort to support bounded streams in the DataStream
> API [1], which might provide the backbone for the functionality that you
> need.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-11875
>
> On 25 Mar 2019, at 10:00, Kostas Kloudas <k.klou...@ververica.com> wrote:
>
> Hi Rafi,
>
> Although not the most elegant, one solution could be to write your
> program using the file source in PROCESS_CONTINUOUSLY mode, as described here
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html
>
> and, when you are sure that the processing of your file is done, cancel
> the job.
>
> As I said, this is not the most elegant solution but it will do the job.
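>
> For illustration, here is a minimal sketch of that workaround (the input
> format, paths and intervals below are placeholders, not taken from your job):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // Checkpoints are what finalize the sink's in-progress part files.
> env.enableCheckpointing(10_000L);
>
> TextInputFormat format = new TextInputFormat(new Path("s3://bucket/input"));
>
> // PROCESS_CONTINUOUSLY keeps the source (and thus the job) alive, so
> // checkpoint barriers keep flowing after the file has been read once.
> DataStream<String> lines = env.readFile(
>         format,
>         "s3://bucket/input",
>         FileProcessingMode.PROCESS_CONTINUOUSLY,
>         60_000L); // re-scan interval in milliseconds
>
> // ... rest of the pipeline; cancel the job once processing is done.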
>
> Cheers,
> Kostas
>
>
>
> On Mon, Mar 25, 2019 at 9:49 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Kostas,
>>
>> Thank you.
>> I'm currently testing my job against a small file, so it finishes
>> before the first checkpoint is triggered.
>> But even with a larger file where checkpoints did happen, there would
>> always be the trailing events arriving after the last checkpoint and
>> before the source finishes.
>> So would these events be lost?
>> In that case, any flow of the shape (bounded stream) => (StreamingFileSink)
>> would not give the expected results...
>>
>> The other alternative would be using BucketingSink, but it does not
>> guarantee exactly-once delivery into S3, which I would like to avoid.
>>
>> Can you suggest any workaround? Is there some way to make sure a
>> checkpoint is triggered at the end?
>>
>> Rafi
>>
>>
>> On Thu, Mar 21, 2019 at 9:40 PM Kostas Kloudas <k.klou...@ververica.com>
>> wrote:
>>
>>> Sorry Rafi,
>>>
>>> I just read your previous response where you say that you have already
>>> activated checkpointing.
>>> My bad for not paying attention.
>>>
>>> Unfortunately, in-progress files currently only roll (or get finalized)
>>> on checkpoint barriers and NOT when close() is called.
>>> This is due to the fact that at the function level, Flink does not
>>> differentiate between failures and normal termination.
>>> But there are plans to fix it:
>>> https://issues.apache.org/jira/browse/FLINK-2646
>>>
>>> So given the above, you should check whether checkpoints complete in
>>> your pipeline before your source stream reaches its end. If no
>>> checkpoint completes, your in-progress files will not be finalized and
>>> Parquet, for example, will not write the footer that is needed to
>>> properly read the file.
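>>>
>>> For example (a sketch; the 1-second interval is an arbitrary choice for a
>>> small test file, not a recommendation):
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> // A short interval makes it likely that at least one checkpoint barrier
>>> // travels through the whole pipeline before the bounded source finishes.
>>> env.enableCheckpointing(1_000L, CheckpointingMode.EXACTLY_ONCE);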
>>>
>>> Kostas
>>>
>>>
>>> On Thu, Mar 21, 2019 at 8:03 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>
>>>> Hi Kostas,
>>>>
>>>> Yes I have.
>>>>
>>>> Rafi
>>>>
>>>> On Thu, Mar 21, 2019, 20:47 Kostas Kloudas <kklou...@gmail.com> wrote:
>>>>
>>>>> Hi Rafi,
>>>>>
>>>>> Have you enabled checkpointing for your job?
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Piotr and Kostas,
>>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> The issue is that I don't see any committed files, only in-progress
>>>>>> ones. I tried to debug the code for more details. I see that in
>>>>>> *BulkPartWriter* I do reach the *write* methods and see events
>>>>>> getting written, but I never reach *closeForCommit*. Execution goes
>>>>>> straight to the *close* function, where all parts are disposed.
>>>>>>
>>>>>> In my job I have a finite stream (the source reads from parquet
>>>>>> file(s)), then I do some windowed aggregation and write back to a
>>>>>> parquet file.
>>>>>> As far as I know, it should commit files during checkpoints and when
>>>>>> the stream has finished. I did enable checkpointing.
>>>>>> I did verify that if I connect other sinks, I see the events.
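>>>>>>
>>>>>> For context, the job has roughly this shape (a simplified sketch; the
>>>>>> source, key extractor, window and path are placeholders, not my real
>>>>>> code):
>>>>>>
>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> env.enableCheckpointing(10_000L); // checkpointing is enabled
>>>>>>
>>>>>> // myParquetSource: a bounded source reading the input parquet file(s)
>>>>>> DataStream<SomeProtoType> events = env.addSource(myParquetSource);
>>>>>>
>>>>>> events
>>>>>>         .keyBy(e -> e.getSomeKey())                                // placeholder key
>>>>>>         .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // placeholder window
>>>>>>         .reduce((a, b) -> b)                                       // placeholder aggregation
>>>>>>         .addSink(StreamingFileSink
>>>>>>                 .forBulkFormat(new Path("s3://bucket/out"),
>>>>>>                         ParquetProtoWriters.forType(SomeProtoType.class))
>>>>>>                 .build());
>>>>>>
>>>>>> env.execute();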
>>>>>>
>>>>>> Let me know if I can provide any further information that could be
>>>>>> helpful.
>>>>>>
>>>>>> Would appreciate your help.
>>>>>>
>>>>>> Thanks,
>>>>>> Rafi
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Rafi,
>>>>>>>
>>>>>>> Piotr is correct. In-progress files are not necessarily readable.
>>>>>>> The valid files are the ones that are "committed" or finalized.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I’m not sure, but shouldn’t you be reading just the committed files
>>>>>>>> and ignoring the in-progress ones? Maybe Kostas could add more
>>>>>>>> insight on this topic.
>>>>>>>>
>>>>>>>> Piotr Nowojski
>>>>>>>>
>>>>>>>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to stream events in Protobuf format into a parquet file.
>>>>>>>> I looked into both streaming-file options: BucketingSink &
>>>>>>>> StreamingFileSink.
>>>>>>>> I first tried using the newer *StreamingFileSink* with the
>>>>>>>> *forBulkFormat* API. I noticed there's currently support only for the
>>>>>>>> Avro format with *ParquetAvroWriters*.
>>>>>>>> I followed the same convention as Avro and wrote a
>>>>>>>> *ParquetProtoWriters* builder class:
>>>>>>>>
>>>>>>>> import com.google.protobuf.Message;
>>>>>>>> import org.apache.flink.formats.parquet.ParquetBuilder;
>>>>>>>> import org.apache.flink.formats.parquet.ParquetWriterFactory;
>>>>>>>> import org.apache.parquet.hadoop.ParquetFileWriter;
>>>>>>>> import org.apache.parquet.hadoop.ParquetWriter;
>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>>>>>>>> import org.apache.parquet.io.OutputFile;
>>>>>>>> import org.apache.parquet.proto.ProtoParquetWriter;
>>>>>>>>
>>>>>>>> import java.io.IOException;
>>>>>>>>
>>>>>>>> /** Factory methods for Parquet writers that write Protobuf messages. */
>>>>>>>> public class ParquetProtoWriters {
>>>>>>>>
>>>>>>>>     private static final int pageSize = 64 * 1024;
>>>>>>>>
>>>>>>>>     /** Creates a writer factory for the given Protobuf class. */
>>>>>>>>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>>>>>>>>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>>>>>>>>         return new ParquetWriterFactory<>(builder);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>>>>>>>             Class<T> type,
>>>>>>>>             OutputFile out) throws IOException {
>>>>>>>>
>>>>>>>>         return ProtoParquetWriter.<T>builder(out)
>>>>>>>>                 .withPageSize(pageSize)
>>>>>>>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>>>>>>>                 .withProtoClass(type)
>>>>>>>>                 .build();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> And then I use it as follows:
>>>>>>>>
>>>>>>>> StreamingFileSink
>>>>>>>>         .forBulkFormat(new Path("some-path"),
>>>>>>>>                 ParquetProtoWriters.forType(SomeProtoType.class))
>>>>>>>>         .build();
>>>>>>>>
>>>>>>>> I ran tests on *ParquetProtoWriters* itself: it writes everything
>>>>>>>> properly and I'm able to read the files.
>>>>>>>>
>>>>>>>> When I use the sink as part of a job, *I see invalid Parquet files
>>>>>>>> created*:
>>>>>>>>
>>>>>>>> # parquet-tools cat 
>>>>>>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>>>>>>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a 
>>>>>>>> Parquet file (too small length: 4)
>>>>>>>>
>>>>>>>>
>>>>>>>> Can anyone suggest what I am missing here?
>>>>>>>>
>>>>>>>> When trying to use the *BucketingSink*, I wrote a Writer class for
>>>>>>>> Protobuf and everything worked perfectly:
>>>>>>>>
>>>>>>>> import com.google.protobuf.Message;
>>>>>>>> import com.google.protobuf.MessageOrBuilder;
>>>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>>>> import org.apache.flink.util.Preconditions;
>>>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>>> import org.apache.parquet.hadoop.ParquetWriter;
>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>>>>>>>> import org.apache.parquet.proto.ProtoParquetWriter;
>>>>>>>>
>>>>>>>> import java.io.IOException;
>>>>>>>>
>>>>>>>> /** BucketingSink Writer that writes Protobuf messages as Parquet. */
>>>>>>>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
>>>>>>>>
>>>>>>>>     private static final long serialVersionUID = -975302556515811398L;
>>>>>>>>
>>>>>>>>     private Path path;
>>>>>>>>     private Class<? extends Message> protoClass;
>>>>>>>>     private transient ParquetWriter<T> writer;
>>>>>>>>
>>>>>>>>     // Bytes already written by previously closed writer instances (see flush()).
>>>>>>>>     private int position;
>>>>>>>>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>>>>>>>>     private final int pageSize = 64 * 1024;
>>>>>>>>
>>>>>>>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>>>>>>>         this.protoClass = protoClass;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>>>>>         this.position = 0;
>>>>>>>>         this.path = path;
>>>>>>>>
>>>>>>>>         if (writer != null) {
>>>>>>>>             writer.close();
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         writer = createWriter();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long flush() throws IOException {
>>>>>>>>         Preconditions.checkNotNull(writer);
>>>>>>>>         // ParquetWriter has no flush(); close the file to materialize the
>>>>>>>>         // data, remember the bytes written so far, and start a new writer.
>>>>>>>>         position += writer.getDataSize();
>>>>>>>>         writer.close();
>>>>>>>>         writer = createWriter();
>>>>>>>>
>>>>>>>>         return position;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long getPos() {
>>>>>>>>         Preconditions.checkNotNull(writer);
>>>>>>>>         return position + writer.getDataSize();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void close() throws IOException {
>>>>>>>>         if (writer != null) {
>>>>>>>>             writer.close();
>>>>>>>>             writer = null;
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void write(T element) throws IOException {
>>>>>>>>         Preconditions.checkNotNull(writer);
>>>>>>>>         writer.write(element);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public Writer<T> duplicate() {
>>>>>>>>         return new FlinkProtoParquetWriter<>(protoClass);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private ParquetWriter<T> createWriter() throws IOException {
>>>>>>>>         return ProtoParquetWriter
>>>>>>>>                 .<T>builder(path)
>>>>>>>>                 .withPageSize(pageSize)
>>>>>>>>                 .withCompressionCodec(compressionCodecName)
>>>>>>>>                 .withProtoClass(protoClass)
>>>>>>>>                 .build();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Rafi
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>
>>
>
