Lastly, could it be the way I built the Flink image for Kubernetes? I added
both the Presto and Hadoop plugins.
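
As far as I understand the Flink S3 documentation, the URI scheme decides which of the two plugins serves a path: "s3a://" goes to flink-s3-fs-hadoop, "s3p://" goes to flink-s3-fs-presto, and plain "s3://" is registered by both. The docs also say the StreamingFileSink only supports the Hadoop-based one. Here is a rough sketch of that mapping so it is clear what I mean. The class, the method, and the bucket name are made up for illustration and are not Flink APIs:

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative helper only (NOT part of Flink): mirrors the scheme-to-plugin
// mapping as described in the Flink S3 filesystem documentation.
class S3SchemeSketch {

    // Returns the names of the S3 plugins that register the scheme of the given path.
    static List<String> pluginsFor(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            return Collections.emptyList();
        }
        switch (scheme) {
            case "s3a":
                return Collections.singletonList("flink-s3-fs-hadoop");
            case "s3p":
                return Collections.singletonList("flink-s3-fs-presto");
            case "s3":
                // Both plugins register plain "s3", so it is ambiguous when both are installed.
                return Arrays.asList("flink-s3-fs-hadoop", "flink-s3-fs-presto");
            default:
                return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        // "my-bucket" is a placeholder bucket name.
        System.out.println(pluginsFor("s3a://my-bucket/bro_conn/"));
    }
}
```

Since my sink path uses "s3a://", I would expect it to go through the Hadoop plugin even with both installed, but I have not verified that on my image.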

On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <[email protected]> wrote:

> Sorry, I just realized this came off the user list by mistake. Adding the
> thread back in.
>
> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <[email protected]> wrote:
>
>> Yes, sorry, no errors on the task manager. However, I am new to Flink, so I
>> don't know all the places to look for logs. I have been looking at the task
>> manager logs and don't see any exceptions there. I'm not sure where to look
>> for S3 exceptions in particular.
>>
>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <[email protected]> wrote:
>>
>>> Yes, this is why I reached out for further information.
>>>
>>> Incrementing the part counter is the responsibility of the
>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>> in the local FS.
>>> Now if it is on the S3 side, it would help if you have any more info,
>>> for example any logs from S3, to see if anything went wrong on their
>>> end.
>>>
>>> So your logs refer to normal execution, i.e. no failures and no
>>> restarting, right?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <[email protected]>
>>> wrote:
>>> >
>>> > Surprisingly, the same code running against the local filesystem works
>>> > perfectly. The part counter increments correctly.
>>> >
>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <[email protected]>
>>> wrote:
>>> >>
>>> >> Hi Roshan,
>>> >>
>>> >> Your logs refer to a simple run without any failures or re-running
>>> >> from a savepoint, right?
>>> >>
>>> >> I am asking because I am trying to reproduce it by running a modified
>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>> >> The ITCase runs against the local filesystem, and not S3, but I added
>>> >> the OutputFileConfig and it seems that the part counter increases
>>> >> as expected.
>>> >>
>>> >> Is there any other information that would help us reproduce the issue?
>>> >>
>>> >> Cheers,
>>> >> Kostas
>>> >>
>>> >> [1]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>> >>
>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <[email protected]>
>>> wrote:
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > I am trying to get the Parquet writer to write to S3; however, the
>>> >> > files do not seem to be rolling over. The same file, "part-0-0.parquet",
>>> >> > is created each time, as if the "partCounter" is not being updated.
>>> >> > Maybe the Bucket is being recreated each time? I don't really know.
>>> >> > Here are some logs:
>>> >> >
>>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >> > 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>>> >> > 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
>>> >> > 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >> >
>>> >> > And a part of my code:
>>> >> >
>>> >> > ```
>>> >> >
>>> >> > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> >> >
>>> >> > // env.setParallelism(2);
>>> >> > env.enableCheckpointing(60000L);
>>> >> > ///PROPERTIES Added
>>> >> > Schema schema = bro_conn.getClassSchema();
>>> >> >
>>> >> > OutputFileConfig config = OutputFileConfig
>>> >> >         .builder()
>>> >> >         .withPartSuffix(".parquet")
>>> >> >         .build();
>>> >> >
>>> >> > final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>> >> >         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), ParquetAvroWriters.forGenericRecord(schema))
>>> >> >         // .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>> >> >         .withOutputFileConfig(config)
>>> >> >         // .withBucketAssigner(new PartitioningBucketAssigner())
>>> >> >         .build();
>>> >> >
>>> >> > DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>>> >> >         "kinesis", new SimpleStringSchema(), consumerConfig));
>>> >> >
>>> >> > kinesis.flatMap(new JsonAvroParser())
>>> >> >         .addSink(sink);
>>> >> >
>>> >> > env.execute("Bro Conn");
>>> >> >
>>> >> > ```
>>> >> >
>>> >> > I'm using Flink 1.10.0, and running in Kubernetes. I also created a
>>> >> > custom image to add the presto/hadoop plugin.
>>> >> >
>>> >> > Thanks again!
>>>
>>
