Lastly, could it be the way I built the Flink image for Kubernetes? I added both the Presto and Hadoop S3 plugins.
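In case it matters, here is a minimal sketch of the kind of Dockerfile I mean (the base tag and jar paths are assumptions based on the standard Flink 1.10.0 distribution, which ships both S3 filesystem jars under opt/):

```
FROM flink:1.10.0-scala_2.11

# Flink loads filesystems placed under plugins/<name>/ in isolated classloaders.
# With both plugins installed, s3a:// resolves to the Hadoop plugin and
# s3p:// to the Presto plugin, so the two can coexist in one image.
RUN mkdir -p /opt/flink/plugins/s3-fs-presto /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3-fs-presto/ && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/
```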
On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <[email protected]> wrote:

> Sorry, realized this came off the user list by mistake. Adding the thread
> back in.
>
> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <[email protected]> wrote:
>
>> Yes, sorry, no errors on the task manager. However, I am new to Flink, so
>> I don't know all the places to look for logs. I have been looking at the
>> task manager logs and don't see any exceptions there. I'm not sure where
>> to look for S3 exceptions in particular.
>>
>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <[email protected]> wrote:
>>
>>> Yes, this is why I reached out for further information.
>>>
>>> Incrementing the part counter is the responsibility of the
>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>> on the local FS.
>>> Now, if it is on the S3 side, it would help if you have any more info,
>>> for example any logs from S3, to see if anything went wrong on their
>>> end.
>>>
>>> So your logs refer to normal execution, i.e. no failures and no
>>> restarting, right?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <[email protected]> wrote:
>>>
>>>> Surprisingly, the same code running against the local filesystem works
>>>> perfectly. The part counter increments correctly.
>>>>
>>>> On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <[email protected]> wrote:
>>>>
>>>>> Hi Roshan,
>>>>>
>>>>> Your logs refer to a simple run without any failures or re-running
>>>>> from a savepoint, right?
>>>>>
>>>>> I am asking because I am trying to reproduce it by running a modified
>>>>> ParquetStreamingFileSinkITCase [1], and so far I cannot.
>>>>> The ITCase runs against the local filesystem, not S3, but I added the
>>>>> OutputFileConfig and the part counter increases as expected.
>>>>>
>>>>> Is there any other information that would help us reproduce the issue?
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>>>>
>>>>> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to get the Parquet writer to write to S3; however, the
>>>>>> files do not seem to be rolling over. The same file "part-0-0.parquet"
>>>>>> is being created each time, as if the 'partCounter' is not being
>>>>>> updated. Maybe the bucket is being recreated each time? I don't
>>>>>> really know... Here are some logs:
>>>>>>
>>>>>> 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>>>>> 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
>>>>>> 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>>>>> 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>>>>>> 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
>>>>>> 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>>>>>
>>>>>> And a part of my code:
>>>>>>
>>>>>> ```
>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>> // env.setParallelism(2);
>>>>>> env.enableCheckpointing(60000L);
>>>>>> // PROPERTIES added
>>>>>> Schema schema = bro_conn.getClassSchema();
>>>>>>
>>>>>> OutputFileConfig config = OutputFileConfig
>>>>>>         .builder()
>>>>>>         .withPartSuffix(".parquet")
>>>>>>         .build();
>>>>>>
>>>>>> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>>>>         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
>>>>>>                 ParquetAvroWriters.forGenericRecord(schema))
>>>>>>         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>>>         .withOutputFileConfig(config)
>>>>>>         // .withBucketAssigner(new PartitioningBucketAssigner())
>>>>>>         .build();
>>>>>>
>>>>>> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>>>>>>         "kinesis", new SimpleStringSchema(), consumerConfig));
>>>>>>
>>>>>> kinesis.flatMap(new JsonAvroParser())
>>>>>>         .addSink(sink);
>>>>>>
>>>>>> env.execute("Bro Conn");
>>>>>> ```
>>>>>>
>>>>>> I'm using Flink 1.10.0, running in Kubernetes. I also created a custom
>>>>>> image to add the Presto/Hadoop plugins.
>>>>>>
>>>>>> Thanks again!
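For anyone trying to reproduce this: below is a minimal, self-contained sketch of the same job wired to the local filesystem instead of Kinesis and S3. The endless string source, the one-field schema, and the ToRecord mapper are stand-ins I invented, since JsonAvroParser and bro_conn aren't shown above. Because Parquet is a bulk format, the sink rolls on every checkpoint, so after a few checkpoints /tmp/bro_conn/ should contain part-0-0.parquet, part-0-1.parquet, and so on.

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LocalParquetRollingSketch {

    // One-field schema standing in for bro_conn.getClassSchema().
    private static final String SCHEMA =
            "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
                    + "[{\"name\":\"line\",\"type\":\"string\"}]}";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10_000L); // bulk formats roll on every checkpoint

        OutputFileConfig config = OutputFileConfig.builder()
                .withPartSuffix(".parquet")
                .build();

        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/bro_conn/"),
                        ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(SCHEMA)))
                .withOutputFileConfig(config)
                .build();

        env.addSource(new EndlessStrings())
                .map(new ToRecord(SCHEMA))
                .addSink(sink);

        env.execute("local parquet rolling sketch");
    }

    /** Unbounded source so the job survives several checkpoints. */
    private static final class EndlessStrings implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long i = 0;
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("event-" + i++);
                }
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Avro's Schema is not Serializable in Avro 1.8, so ship it as a string. */
    private static final class ToRecord implements MapFunction<String, GenericRecord> {
        private final String schemaString;
        private transient Schema schema;

        ToRecord(String schemaString) {
            this.schemaString = schemaString;
        }

        @Override
        public GenericRecord map(String value) {
            if (schema == null) {
                schema = new Schema.Parser().parse(schemaString);
            }
            GenericData.Record rec = new GenericData.Record(schema);
            rec.put("line", value);
            return rec;
        }
    }
}
```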
