Btw, I ran the exact same code on a local Flink cluster started with `./bin/start-cluster.sh` on my machine. With `s3a` it did not work (the part files do not roll over), but with the local filesystem it works perfectly. Should I be looking at the S3Committer in Flink to see if there is something odd going on?
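As a sanity check on the plugin side, here is a small probe I can call from the job's main() on the cluster (where the S3 filesystem plugins have already been initialized) to print which FileSystem class Flink resolves for a given scheme. This is just a sketch; the `S3SchemeProbe` class name is mine and `<bucket>` is a placeholder:

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public final class S3SchemeProbe {
    // Call this from the job's main() on the cluster, where the S3
    // filesystem plugins have already been initialized. It prints the
    // concrete FileSystem class Flink resolved for the given scheme;
    // if nothing claims the scheme, Path#getFileSystem() throws an
    // UnsupportedFileSystemSchemeException instead.
    public static void printFileSystemFor(String uri) throws java.io.IOException {
        FileSystem fs = new Path(uri).getFileSystem();
        System.out.println(uri + " -> " + fs.getClass().getName());
    }
}
```

If both plugins were picked up from the image, `printFileSystemFor("s3a://<bucket>/")` should show the Hadoop-based implementation and `printFileSystemFor("s3p://<bucket>/")` the Presto one.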
On Thu, Apr 9, 2020 at 7:49 AM Roshan Punnoose <[email protected]> wrote:

> Nope, just the s3a. I'll keep looking around to see if there is anything
> else I can see. If you think of anything else to try, let me know.
>
> On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas <[email protected]> wrote:
>
>> It should not be a problem because, from what you posted, you are using
>> "s3a" as the scheme for S3.
>> Are you using "s3p" for Presto? This should also be done in order for
>> Flink to understand where to use the one or the other.
>>
>> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <[email protected]> wrote:
>>
>>> Lastly, could it be the way I built the Flink image for Kubernetes? I
>>> added both the Presto and Hadoop plugins.
>>>
>>> On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <[email protected]> wrote:
>>>
>>>> Sorry, I realized this came off the user list by mistake. Adding the
>>>> thread back in.
>>>>
>>>> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <[email protected]> wrote:
>>>>
>>>>> Yes, sorry, no errors on the task manager. However, I am new to
>>>>> Flink, so I don't know all the places to look for the logs. I have
>>>>> been looking at the task manager logs and don't see any exceptions
>>>>> there. I am not sure where to look for S3 exceptions in particular.
>>>>>
>>>>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <[email protected]> wrote:
>>>>>
>>>>>> Yes, this is why I reached out for further information.
>>>>>>
>>>>>> Incrementing the part counter is the responsibility of the
>>>>>> StreamingFileSink, whose code is FS-agnostic, so it should also
>>>>>> fail on the local FS.
>>>>>> Now, if it is on the S3 side, it would help if you have any more
>>>>>> info, for example any logs from S3, to see if anything went wrong
>>>>>> on their end.
>>>>>>
>>>>>> So your logs refer to normal execution, i.e. no failures and no
>>>>>> restarting, right?
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <[email protected]> wrote:
>>>>>>
>>>>>>> Surprisingly, the same code running against the local filesystem
>>>>>>> works perfectly. The part counter increments correctly.
>>>>>>>
>>>>>>> On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Roshan,
>>>>>>>>
>>>>>>>> Your logs refer to a simple run without any failures or
>>>>>>>> re-running from a savepoint, right?
>>>>>>>>
>>>>>>>> I am asking because I am trying to reproduce it by running a
>>>>>>>> modified ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>>>>>>> The ITCase runs against the local filesystem, and not S3, but I
>>>>>>>> added the OutputFileConfig and it seems that the part counter
>>>>>>>> increases as expected.
>>>>>>>>
>>>>>>>> Is there any other information that would help us reproduce the
>>>>>>>> issue?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>>>>>>>
>>>>>>>> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am trying to get the Parquet writer to write to S3; however,
>>>>>>>>> the files do not seem to be rolling over. The same file
>>>>>>>>> "part-0-0.parquet" is being created each time, as if the
>>>>>>>>> "partCounter" is not being updated. Maybe the Bucket is being
>>>>>>>>> recreated each time? I don't really know... Here are some logs:
>>>>>>>>>
>>>>>>>>> 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>>>>>>>> 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
>>>>>>>>> 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>>>>>>>> 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>>>>>>>>> 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
>>>>>>>>> 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>>>>>>>>
>>>>>>>>> And a part of my code:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>
>>>>>>>>> // env.setParallelism(2);
>>>>>>>>> env.enableCheckpointing(60000L);
>>>>>>>>>
>>>>>>>>> // PROPERTIES added
>>>>>>>>> Schema schema = bro_conn.getClassSchema();
>>>>>>>>>
>>>>>>>>> OutputFileConfig config = OutputFileConfig
>>>>>>>>>         .builder()
>>>>>>>>>         .withPartSuffix(".parquet")
>>>>>>>>>         .build();
>>>>>>>>>
>>>>>>>>> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>>>>>>>         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), ParquetAvroWriters.forGenericRecord(schema))
>>>>>>>>>         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>>>>>         .withOutputFileConfig(config)
>>>>>>>>>         // .withBucketAssigner(new PartitioningBucketAssigner())
>>>>>>>>>         .build();
>>>>>>>>>
>>>>>>>>> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>>>>>>>>>         "kinesis", new SimpleStringSchema(), consumerConfig));
>>>>>>>>>
>>>>>>>>> kinesis.flatMap(new JsonAvroParser())
>>>>>>>>>         .addSink(sink);
>>>>>>>>>
>>>>>>>>> env.execute("Bro Conn");
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> I'm using Flink 1.10.0, running in Kubernetes. I also created a
>>>>>>>>> custom image to add the Presto/Hadoop plugin.
>>>>>>>>>
>>>>>>>>> Thanks again!
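PS: to rule the sink logic out, here is the self-contained variant of the job above that I run purely against the local filesystem. It is just a sketch under my own assumptions, not the real pipeline: the one-field `Event` schema, the throttled in-memory source, and the `file:///tmp/rollover-check/` output path all stand in for the Kinesis/Avro parts. Since bulk formats always roll on checkpoint, each checkpoint should commit a new `part-0-<n>.parquet` with an incremented counter:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LocalRolloverCheck {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5_000L); // short interval so the rollover is quick to observe

        // Illustrative one-field schema standing in for the real bro_conn records.
        Schema schema = SchemaBuilder.record("Event").fields()
                .requiredString("msg")
                .endRecord();
        final String schemaString = schema.toString(); // Avro 1.8's Schema is not Serializable

        OutputFileConfig config = OutputFileConfig.builder()
                .withPartSuffix(".parquet")
                .build();

        // Bulk formats roll on every checkpoint (OnCheckpointRollingPolicy is
        // the only option for them), so each checkpoint should commit a new
        // part-<subtask>-<counter>.parquet with an incremented counter.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/rollover-check/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .withOutputFileConfig(config)
                .build();

        env.addSource(new SourceFunction<String>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<String> ctx) throws Exception {
                        long i = 0;
                        while (running) {
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect("event-" + i++);
                            }
                            Thread.sleep(100L); // keep the job running across checkpoints
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .map(new RichMapFunction<String, GenericRecord>() {
                    private transient Schema runtimeSchema;

                    @Override
                    public void open(Configuration parameters) {
                        // Re-parse the schema on the task side instead of
                        // shipping the non-serializable Schema object.
                        runtimeSchema = new Schema.Parser().parse(schemaString);
                    }

                    @Override
                    public GenericRecord map(String msg) {
                        GenericRecord record = new GenericData.Record(runtimeSchema);
                        record.put("msg", msg);
                        return record;
                    }
                })
                // GenericRecord streams need an explicit Avro type info (flink-avro).
                .returns(new GenericRecordAvroTypeInfo(schema))
                .addSink(sink);

        env.execute("Local rollover check");
    }
}
```

Pointing the same job at the `s3a://<bucket>/...` path (with the checkpoint interval back at 60s) is how I reproduce the non-rolling behavior.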
