Re: Dynamic file name prefix - StreamingFileSink

2020-10-13 Thread Ravi Bhushan Ratnakar
Hi Vijayendra,

OutputFileConfig provides a builder method to create immutable objects
with a given 'prefix' and 'suffix'. The parameter which you pass to
'*withPartPrefix*' is evaluated only once, at the time '*withPartPrefix*'
is called. So if you want a dynamic 'prefix' or 'suffix', you may try
your own custom implementation of 'OutputFileConfig' which provides a way
to set a function for the 'prefix' or 'suffix'. I am attaching a sample
implementation of this. Kindly make sure that the function you pass in
is serializable.


Use it like this:

val outputFileConfig: OutputFileConfig = new DynamicOutputFileConfig()
  .withPartPrefixFunction(() =>
    ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC"))
      .format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
  .withPartSuffixFunction(() => ".ext")
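
For completeness, here is how it would plug into the sink from the snippet quoted
below (path, encoder and bucket assigner are taken from that snippet; this is a
sketch, not tested code):

val sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .withBucketAssigner(new KeyBucketAssigner())
  .withRollingPolicy(OnCheckpointRollingPolicy.build())
  .withOutputFileConfig(outputFileConfig)  // the DynamicOutputFileConfig built above
  .build()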


Regards,
Ravi

On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav 
wrote:

> Hi Team,
>
> I have tried to assign a dynamic prefix for file name, which contains
> datetime components.
> *The Problem is Job always takes initial datetime when job first starts
> and never refreshes later. *
> *How can I get dynamic current datetime in filename at sink time ?*
>
> *.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> val config = OutputFileConfig
>   .builder()
>   .withPartPrefix("prefix")
>   .withPartSuffix(".ext")
>   .build()
>
> val sink = StreamingFileSink
>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>   .withBucketAssigner(new KeyBucketAssigner())
>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>   .withOutputFileConfig(config)
>   .build()
>
>
package com.example

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig

class DynamicOutputFileConfig(partPrefix: String, partSuffix: String)
  extends OutputFileConfig(partPrefix, partSuffix) with Serializable {

  private var partPrefixFunction: () => String = _
  private var partSuffixFunction: () => String = _

  def this() {
    this("", "")
  }

  /**
   * The prefix for the part name.
   */
  override def getPartPrefix: String =
    if (partPrefixFunction == null) partPrefix else partPrefixFunction.apply()

  /**
   * The suffix for the part name.
   */
  override def getPartSuffix: String =
    if (partSuffixFunction == null) partSuffix else partSuffixFunction.apply()

  def withPartPrefixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partPrefixFunction = fun
    this
  }

  def withPartSuffixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partSuffixFunction = fun
    this
  }
}


Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-30 Thread Ravi Bhushan Ratnakar
Hi Vijayendra,

There was an issue with the CustomAvroWriters.java which I shared with you
earlier; I am sending you the fixed version. Hope this resolves the issue
of reading the files with the Avro tool.

Please use one of the following supported string values for codecName
(a rough sketch along these lines follows the list):

null - for nullCodec
deflate - for deflateCodec
snappy - for snappyCodec
bzip2 - for bzip2Codec
xz - for xzCodec
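
The fixed CustomAvroWriters attachment itself is not reproduced in this archive.
As a rough sketch of what such a factory could look like, assuming the fix switches
from wrapping the stream in a Hadoop CompressionOutputStream to Avro's own
container-level CodecFactory (only the Flink and Avro classes already used in this
thread are assumed; everything else is illustrative):

import java.io.OutputStream

import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}

object CompressedAvroWriters {

  // codecName is one of the values listed above: "null", "deflate", "snappy", "bzip2", "xz"
  def forGenericRecord(schema: Schema, codecName: String): AvroWriterFactory[GenericRecord] = {
    val schemaString = schema.toString
    val builder = new AvroBuilder[GenericRecord] {
      override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
        val parsedSchema = new Schema.Parser().parse(schemaString)
        val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](parsedSchema))
        // compress inside the Avro container so avro-tools can still read the file
        writer.setCodec(CodecFactory.fromString(codecName))
        writer.create(parsedSchema, out)
        writer
      }
    }
    new AvroWriterFactory[GenericRecord](builder)
  }
}

Usage would then mirror the snippet quoted later in this thread, e.g.
StreamingFileSink.forBulkFormat(new Path(outPath), CompressedAvroWriters.forGenericRecord(schema, "snappy")).build()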


Regards,
Ravi

On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> If it is possible, please share the sample output file.
> Regards,
> Ravi
>
> On Thu, Jul 30, 2020 at 3:17 AM Vijayendra Yadav 
> wrote:
>
>> Hi Ravi,
>>
>> With CustomAvroWriters (SNAPPY) when I run on a cluster, it does create
>> files, but files are not recognized as avro files by avro tools jar, when I
>> try to deserialize it to json.
>>
>> Flink Logs shows:
>> 2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec -
>> Successfully loaded & initialized native-lzo library [hadoop-lzo rev
>> ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
>> 2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool - *Got
>> brand-new compressor [.snappy]*
>>
>> 020-07-29 23:54:28,931 INFO
>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>> la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro
>> with MPU ID
>>
>> *Avro tools:*
>>
>> java -jar avro-tools-1.7.4.jar *tojson*
>> /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
>>
>> *Exception in thread "main" java.io.IOException: Not an Avro data file*
>>
>>
>> Am I missing something ?
>>
>> Regards,
>> Vijay
>>
>>
>>
>> On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Ravi,
>>>
>>> Thanks for details. CustomAvrowriter was working for now.  Although its
>>> failing for Snappy codec in local IDE with "java.lang.UnsatisfiedLinkError:
>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z"
>>> I think I will have to try running it in an EMR/Hadoop environment to
>>> get the SNAPPY library resolved.
>>>
>>> *About this another approach of AvroOutputFormat.*
>>>
>>> Does it fit in streamingfilesink API?
>>>
>>> StreamingFileSink.forBulkFormat(new Path(outPath),CustomAvroWriters
>>> .forGenericRecord(schema, codecName))
>>>
>>> Or its different api. Could you send one sample if you have one for
>>> another sink api.
>>>
>>> Regards,
>>> Vijay
>>>
>>> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
>>>> There is another alternative which you could try like this
>>>>
>>>> val stream:DataStream[GenericRecord] = _
>>>>
>>>> val aof:AvroOutputFormat[GenericRecord] = new AvroOutputFormat(new 
>>>> org.apache.flink.core.fs.Path(""),classOf[GenericRecord])
>>>>
>>>> aof.setSchema(schema)
>>>>
>>>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>>>
>>>> stream.writeUsingOutputFormat(aof)
>>>>
>>>> Regards,
>>>>
>>>> Ravi
>>>>
>>>>
>>>>
>>>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
>>>> ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> Hi Vijayendra,
>>>>>
>>>>> Currently AvroWriters doesn't support compression. If you want to use
>>>>> compression then you need to have a custom implementation of AvroWriter
>>>>> where you can add features of compression. Please find a sample
>>>>> customization for AvroWriters where you could use compression. You can use
>>>>> the example below.
>>>>>
>>>>> codeName = org.apache.hadoop.io.compress.SnappyCodec
>>>>>
>>>>> CustomAvroWriters.forGenericRecord(schema, codeName)
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <
>>>>> contact@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Could you please provide a sample for Enabling Compression (Snappy

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Ravi Bhushan Ratnakar
There is another alternative which you could try, like this:

val stream: DataStream[GenericRecord] = ... // your input stream

val aof: AvroOutputFormat[GenericRecord] =
  new AvroOutputFormat(new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])

aof.setSchema(schema)

aof.setCodec(AvroOutputFormat.Codec.SNAPPY)

stream.writeUsingOutputFormat(aof)

Regards,

Ravi



On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi Vijayendra,
>
> Currently AvroWriters doesn't support compression. If you want to use
> compression then you need to have a custom implementation of AvroWriter
> where you can add features of compression. Please find a sample
> customization for AvroWriters where you could use compression. You can use
> the example below.
>
> codeName = org.apache.hadoop.io.compress.SnappyCodec
>
> CustomAvroWriters.forGenericRecord(schema, codeName)
>
> Regards,
> Ravi
>
> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav 
> wrote:
>
>> Hi Team,
>>
>> Could you please provide a sample for Enabling Compression (Snappy) of
>> Avro:
>> DataStream[GenericRecord]
>>
>> AvroWriters.forGenericRecord(schema)
>>
>> Regards,
>> Vijay
>>
>


Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Ravi Bhushan Ratnakar
Hi Vijayendra,

Currently the AvroWriters don't support compression. If you want to use
compression, then you need a custom implementation of the AvroWriter where
you can add compression support. Please find below a sample customization
of the AvroWriters that supports compression (the code follows the quoted
message). You can use it as in the example below.

codecName = "org.apache.hadoop.io.compress.SnappyCodec"

CustomAvroWriters.forGenericRecord(schema, codecName)

Regards,
Ravi

On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav 
wrote:

> Hi Team,
>
> Could you please provide a sample for Enabling Compression (Snappy) of
> Avro:
> DataStream[GenericRecord]
>
> AvroWriters.forGenericRecord(schema)
>
> Regards,
> Vijay
>
package org.apache.flink.formats.avro;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class CustomAvroWriters {

    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) {
        String schemaString = SpecificData.get().getSchema(type).toString();
        AvroBuilder<T> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out);
        };
        return new AvroWriterFactory<>(builder);
    }

    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out);
        };
        return new AvroWriterFactory<>(builder);
    }

    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName);
        };
        return new AvroWriterFactory<>(builder);
    }

    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) {
        String schemaString = ReflectData.get().getSchema(type).toString();
        AvroBuilder<T> builder = (out) -> {
            return createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out);
        };
        return new AvroWriterFactory<>(builder);
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out,
            String codecName) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        CompressionCodec codec = getCompressionCodec(codecName);
        CompressionOutputStream cos = codec.createOutputStream(out);
        dataFileWriter.create(schema, cos);
        return dataFileWriter;
    }

    private CustomAvroWriters() {
    }

    private static CompressionCodec getCompressionCodec(String codecName) {
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = codecFactory.getCodecByName(codecName);
        if (codec == null) {
            try {
                codec = (CompressionCodec) Class.forName(codecName).newInstance();
            } catch (Exception ex) {
                throw new RuntimeException("Codec " + codecName + " not found.", ex);
            }
        }
        return codec;
    }
}


Re: Compression Streamingfilesink ROW-encoded format

2020-07-29 Thread Ravi Bhushan Ratnakar
Yes, the flink-compress module is supported from 1.10.0 onward.

Regards,
Ravi

On Tue 28 Jul, 2020, 23:11 Vijayendra Yadav,  wrote:

> Thank You Ravi for Quick help. One Last Question is this compression
> supported with Flink Version 1.10.0 ?
>
> Regards,
> Vijay
>
> On Tue, Jul 28, 2020 at 1:20 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi Vijayendra,
>>
>> As far as rowFormat is concerned, it doesn't support compression.
>>
>>
>> Regards,
>> Ravi
>>
>> On Tue 28 Jul, 2020, 22:08 Vijayendra Yadav, 
>> wrote:
>>
>>> Hi Ravi,
>>>
>>> Thanks for your response. But your example is for *forBulkFormat*.
>>> How about *forRowFormat*?
>>>
>>> Regards,
>>> Vijay
>>>
>>> On Tue, Jul 28, 2020 at 11:28 AM Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
>>>> Hi Vijayendra,
>>>>
>>>> You could achieve row encoded with like this as well
>>>>
>>>> codecName = "org.apache.hadoop.io.compress.GzipCodec"
>>>>
>>>> val streamingFileSink:StreamingFileSink[String] = 
>>>> StreamingFileSink.forBulkFormat(new 
>>>> Path(outputPath),CompressWriters.forExtractor(new 
>>>> DefaultExtractor[String]).withHadoopCompression(codecName)).build()
>>>>
>>>> Regards,
>>>> Ravi
>>>>
>>>> On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav 
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> Is there a way to enable compression in StreamingFileSink API for
>>>>> Row-encoded formats ?.
>>>>>
>>>>> val sink: StreamingFileSink[String] = StreamingFileSink
>>>>> .forRowFormat(new Path(outputPath), new
>>>>> SimpleStringEncoder[String]("UTF-8"))
>>>>>
>>>>>
>>>>> Regards,
>>>>> Vijay
>>>>>
>>>>


Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
Hi Vijayendra,

As far as rowFormat is concerned, it doesn't support compression.


Regards,
Ravi

On Tue 28 Jul, 2020, 22:08 Vijayendra Yadav,  wrote:

> Hi Ravi,
>
> Thanks for your response. But your example is for *forBulkFormat*. How
> about *forRowFormat*?
>
> Regards,
> Vijay
>
> On Tue, Jul 28, 2020 at 11:28 AM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi Vijayendra,
>>
>> You could achieve row encoded with like this as well
>>
>> codecName = "org.apache.hadoop.io.compress.GzipCodec"
>>
>> val streamingFileSink:StreamingFileSink[String] = 
>> StreamingFileSink.forBulkFormat(new 
>> Path(outputPath),CompressWriters.forExtractor(new 
>> DefaultExtractor[String]).withHadoopCompression(codecName)).build()
>>
>> Regards,
>> Ravi
>>
>> On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Is there a way to enable compression in StreamingFileSink API for
>>> Row-encoded formats ?.
>>>
>>> val sink: StreamingFileSink[String] = StreamingFileSink
>>> .forRowFormat(new Path(outputPath), new
>>> SimpleStringEncoder[String]("UTF-8"))
>>>
>>>
>>> Regards,
>>> Vijay
>>>
>>


Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
Hi Vijayendra,

You could achieve the row-encoded output (with compression) like this as
well, via the bulk format:

val codecName = "org.apache.hadoop.io.compress.GzipCodec"

val streamingFileSink: StreamingFileSink[String] =
  StreamingFileSink
    .forBulkFormat(
      new Path(outputPath),
      CompressWriters.forExtractor(new DefaultExtractor[String]).withHadoopCompression(codecName))
    .build()

Regards,
Ravi

On Tue, Jul 28, 2020 at 8:03 PM Vijayendra Yadav 
wrote:

> Hi Team,
>
> Is there a way to enable compression in StreamingFileSink API for
> Row-encoded formats ?.
>
> val sink: StreamingFileSink[String] = StreamingFileSink
> .forRowFormat(new Path(outputPath), new
> SimpleStringEncoder[String]("UTF-8"))
>
>
> Regards,
> Vijay
>


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Ravi Bhushan Ratnakar
Hi Congxian,

Thank you for your reply. As I shared in my previous mail, in my case the
last successful checkpoint is missing details for some of the shards.
I am not doing any up-scaling or down-scaling of Kinesis shards. I always
run with a fixed number of shards, so there is no possibility that
new-shard discovery could cause such a problem.

Thanks,
Ravi



On Fri 22 Nov, 2019, 02:53 Congxian Qiu,  wrote:

> Hi
>
> For idle shards, I think restore from the previous not consumed data is
> ok, because Flink did not consume any data before, but for not idle shards
> this is problematic. From my experience of other connectors, could you
> check whether the "error" shards are newly split? maybe the newly split
> shards were not contained in the checkpoint.
>
> Best,
> Congxian
>
>
> Steven Nelson  wrote on Thursday, October 17, 2019 at 2:19 AM:
>
>> In my situation I believe it's because we have idle shards (it's a
>> testing environment). I dug into the connector code and it looks like it
>> only updates the shard state when a record is processed or when the shard
>> hits shard_end. So, for an idle shard it would never get a checkpointed
>> state. I guess this is okay since in production we won't have idle shards,
>> but it might be better to send through a empty record that doesn't get
>> emitted, but it does trigger a state update.
>>
>> -Steve
>>
>>
>> On Wed, Oct 16, 2019 at 12:54 PM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Do you know step by step process to reproduce this problem?
>>>
>>> -Ravi
>>>
>>>
>>> On Wed 16 Oct, 2019, 17:40 Steven Nelson, 
>>> wrote:
>>>
>>>> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>>>>
>>>> About half my shards start over at trim horizon. Why would some shard
>>>> statuses appear to not exist in a savepoints? This seems like a big 
>>>> problem.
>>>>
>>>> -Steve
>>>>
>>>> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
>>>> ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am also facing the same problem. I am using Flink 1.9.0 and
>>>>> consuming from Kinesis source with retention of 1 day. I am observing that
>>>>> when the job is submitted with "latest" initial stream position, the job
>>>>> starts well and keep on processing data from all the shards for very long
>>>>> period of time without any lag. When the job fails then it also recovery
>>>>> well with last successful checkpointed state. But i am also experiencing
>>>>> that very rarely when the job fails and it recovers from the last
>>>>> successful checkpointed state, i noticed a hug lag( 1 day as per 
>>>>> retention)
>>>>> on one of the stream. For me, to reproduce this issue is still unknown to
>>>>> defined a step by step process.
>>>>>
>>>>> So far now, as per the analysis i gathered some  more information by
>>>>> customizing the FlinkKinesisConsumer to put additional log message, I
>>>>> noticed that the number of shards details which is loaded from checkpoint
>>>>> data during recovering is less than than the actual number of shards in 
>>>>> the
>>>>> stream. I have fixed number of shards in kinesis stream.
>>>>>
>>>>> i added one line of debug log at line 408 to print the size of
>>>>> variable "sequenceNumsToRestore" which was populated with shard
>>>>> details from checkpoint data.
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
>>>>>
>>>>> In this consumer class, when the "run" method is called, it does
>>>>> following
>>>>>
>>>>>-  it discover shards from kinesis stream and selects all those
>>>>>shards which a subtask can scheduled
>>>>>- then one by one it iterates over the discovers shards and checks
>>>>>that whether that shards state is available in recovered state
>>>>>"sequenceNumsToRestore"
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/Fl

Re: Issue with BulkWriter

2019-10-22 Thread Ravi Bhushan Ratnakar
Hi,

If possible, kindly share one output file to inspect. In the meanwhile, you
could also give "org.apache.hadoop.io.compress.GzipCodec" a try.

Regards,
Ravi

On Tue, Oct 22, 2019 at 7:25 PM amran dean  wrote:

>
> Hello,
> These changes result in the following error:
> $ lzop -d part-1-0
> lzop: part-1-0: not a lzop file
>
>
> public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {
>
> private final CompressionOutputStream compressedStream;
>
> public BulkRecordLZOSerializer(OutputStream stream) {
> CompressionCodecFactory factory = new CompressionCodecFactory(new 
> Configuration());
> try {
> compressedStream = 
> factory.getCodecByClassName("com.hadoop.compression.lzo.LzoCodec").createOutputStream(stream);
> } catch (IOException e) {
> throw new IllegalStateException("Unable to create LZO 
> OutputStream");
> }
> }
>
> public void addElement(KafkaRecord record) throws IOException {
> compressedStream.write(record.getValue());
> compressedStream.write('\n');
> }
>
> public void finish() throws IOException {
> compressedStream.flush();
> compressedStream.finish();
> }
>
> public void flush() throws IOException {
> compressedStream.flush();
> }
> }
>
>
> On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi,
>>
>> Seems like that you want to use "com.hadoop.compression.lzo.LzoCodec"
>> instead of "com.hadoop.compression.lzo.*LzopCodec*" in the below line.
>>
>> compressedStream = 
>> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>
>>
>> Regarding "lzop: unexpected end of file" problem, kindly add
>> "compressedStream.flush()" in the below method to flush any leftover data
>> before finishing.
>>
>> public void finish() throws IOException {
>>   compressedStream.flush();
>>   compressedStream.finish();
>> }
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>>
>> Regards,
>> Ravi
>>
>> On Tue, Oct 22, 2019 at 4:10 AM amran dean 
>> wrote:
>>
>>> Hello,
>>> I'm using BulkWriter to write newline-delimited, LZO-compressed files.
>>> The logic is very straightforward (See code below).
>>>
>>> I am experiencing an issue decompressing the created files created in
>>> this manner, consistently getting "lzop: unexpected end of file". Is this
>>> an issue with caller of BulkWriter?
>>>
>>> (As an aside), using com.hadoop.compression.lzo.LzoCodec instead results
>>> in gibberish. I'm very confused what is going on.
>>>
>>> private final CompressionOutputStream compressedStream;
>>>
>>> public BulkRecordLZOSerializer(OutputStream stream) {
>>> CompressionCodecFactory factory = new CompressionCodecFactory(new 
>>> Configuration());
>>> try {
>>> compressedStream = 
>>> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>> } catch (IOException e) {
>>> throw new IllegalStateException("Unable to create LZO 
>>> OutputStream");
>>> }
>>> }
>>>
>>> public void addElement(KafkaRecord record) throws IOException {
>>> compressedStream.write(record.getValue());
>>> compressedStream.write('\n');
>>> }
>>>
>>> public void finish() throws IOException {
>>> compressedStream.finish();
>>> }
>>>
>>> public void flush() throws IOException {
>>> compressedStream.flush();
>>> }
>>>
>>>


Re: Issue with BulkWriter

2019-10-22 Thread Ravi Bhushan Ratnakar
Hi,

It seems that you want to use "com.hadoop.compression.lzo.LzoCodec"
instead of "com.hadoop.compression.lzo.*LzopCodec*" in the line below.

compressedStream =
factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);


Regarding the "lzop: unexpected end of file" problem, kindly add
"compressedStream.flush()" in the method below, to flush any leftover data
before finishing.

public void finish() throws IOException {
  compressedStream.flush();
  compressedStream.finish();
}

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--

Regards,
Ravi

On Tue, Oct 22, 2019 at 4:10 AM amran dean  wrote:

> Hello,
> I'm using BulkWriter to write newline-delimited, LZO-compressed files. The
> logic is very straightforward (See code below).
>
> I am experiencing an issue decompressing the created files created in this
> manner, consistently getting "lzop: unexpected end of file". Is this an
> issue with caller of BulkWriter?
>
> (As an aside), using com.hadoop.compression.lzo.LzoCodec instead results
> in gibberish. I'm very confused what is going on.
>
> private final CompressionOutputStream compressedStream;
>
> public BulkRecordLZOSerializer(OutputStream stream) {
> CompressionCodecFactory factory = new CompressionCodecFactory(new 
> Configuration());
> try {
> compressedStream = 
> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
> } catch (IOException e) {
> throw new IllegalStateException("Unable to create LZO OutputStream");
> }
> }
>
> public void addElement(KafkaRecord record) throws IOException {
> compressedStream.write(record.getValue());
> compressedStream.write('\n');
> }
>
> public void finish() throws IOException {
> compressedStream.finish();
> }
>
> public void flush() throws IOException {
> compressedStream.flush();
> }
>
>


Re: Customize Part file naming (Flink 1.9.0)

2019-10-19 Thread Ravi Bhushan Ratnakar
Hi,

As an alternative, you may use BucketingSink, which provides the ability to
customize the part-file suffix/prefix; a rough sketch follows the link.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
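
A sketch of what that could look like (the base path and writer are placeholders,
and the setter names should be verified against the linked 1.9 Javadoc):

import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

val sink = new BucketingSink[String]("s3a://my-bucket/raw")  // placeholder path
sink.setWriter(new StringWriter[String]())
sink.setPartPrefix("data")  // custom part-file prefix
sink.setPartSuffix(".gz")   // extension Spark can use for compression inference

// 'stream' being your DataStream[String]
stream.addSink(sink)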

Regards,
Ravi

On Sat, Oct 19, 2019 at 3:54 AM amran dean  wrote:

> Hello,
> StreamingFileSink's part file naming convention is not adjustable. It has
> form: *part--. *
>
> My use case for StreamingFileSink is a Kafka -> S3 pipeline, and files are
> read and processed from S3 using spark. In almost all cases, I want to
> compress raw data before writing to S3 using the BulkFormat.
>
> Spark relies on filename extensions to do compression inference, so the
> current naming scheme results in gibberish. I see that 1.10 currently
> provides the ability to customize the suffix/prefix, but I really need an
> alternative solution to this as soon as possible. Can this be backported to
> 1.9, or are there alternatives?
>
>
>


Re: Flink S3 sink unable to compress data

2019-10-18 Thread Ravi Bhushan Ratnakar
Hi,

As per my understanding, the Encoder's encode method is called for each and
every message, so it is not logical to create a compressor around the given
output stream; that would lead to unpredictable, erroneous behavior. The
Encoder's responsibility is to encode the given object, not to compress it.
It seems that, at the moment, the row format does not support compression.

https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-11794

If you want to write compressed output, you could have your own
implementation of BulkFormat; a rough sketch follows.
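
Purely as an illustration of that idea, a minimal sketch of a gzip BulkWriter
(the class names here are made up for the example, not a Flink API):

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.FSDataOutputStream

// Writes each record as a gzipped, newline-delimited line.
class GzipStringBulkWriter(out: OutputStream) extends BulkWriter[String] {
  private val gzip = new GZIPOutputStream(out)

  override def addElement(element: String): Unit = {
    gzip.write(element.getBytes(StandardCharsets.UTF_8))
    gzip.write('\n')
  }

  override def flush(): Unit = gzip.flush()

  // finish() writes the gzip trailer but must not close the underlying stream;
  // the sink owns and closes it.
  override def finish(): Unit = gzip.finish()
}

class GzipStringBulkWriterFactory extends BulkWriter.Factory[String] {
  override def create(out: FSDataOutputStream): BulkWriter[String] = new GzipStringBulkWriter(out)
}

It would then be plugged in via StreamingFileSink.forBulkFormat(new Path(outputPath),
new GzipStringBulkWriterFactory()).build(), instead of the forRowFormat call in the
snippet quoted below.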

Regards,
Ravi


On Fri 18 Oct, 2019, 20:30 amran dean,  wrote:

> Hello,
> I am writing a custom S3 object encoder (code here:
> https://pastebin.com/raw/9Ag4ZVpX) used via:
>
> StreamingFileSink dataSink = StreamingFileSink
>   .forRowFormat(new Path("s3a://some_path"), new RecordDataSerializer())
>
>
> During execution, it does not produce any data in S3. This pipeline works
> if the data is not compressed beforehand, and no exceptions are thrown in
> Flink logs or in the webUI. I am confused what is happening and would
> appreciate any help.
>


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-16 Thread Ravi Bhushan Ratnakar
Do you know a step-by-step process to reproduce this problem?

-Ravi


On Wed 16 Oct, 2019, 17:40 Steven Nelson,  wrote:

> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>
> About half my shards start over at trim horizon. Why would some shard
> statuses appear to not exist in a savepoints? This seems like a big problem.
>
> -Steve
>
> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi,
>>
>> I am also facing the same problem. I am using Flink 1.9.0 and consuming
>> from Kinesis source with retention of 1 day. I am observing that when the
>> job is submitted with "latest" initial stream position, the job starts well
>> and keep on processing data from all the shards for very long period of
>> time without any lag. When the job fails then it also recovery well with
>> last successful checkpointed state. But i am also experiencing that very
>> rarely when the job fails and it recovers from the last successful
>> checkpointed state, i noticed a hug lag( 1 day as per retention) on one of
>> the stream. For me, to reproduce this issue is still unknown to defined a
>> step by step process.
>>
>> So far now, as per the analysis i gathered some  more information by
>> customizing the FlinkKinesisConsumer to put additional log message, I
>> noticed that the number of shards details which is loaded from checkpoint
>> data during recovering is less than than the actual number of shards in the
>> stream. I have fixed number of shards in kinesis stream.
>>
>> i added one line of debug log at line 408 to print the size of variable "
>> sequenceNumsToRestore" which was populated with shard details from
>> checkpoint data.
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
>>
>> In this consumer class, when the "run" method is called, it does following
>>
>>-  it discover shards from kinesis stream and selects all those
>>shards which a subtask can scheduled
>>- then one by one it iterates over the discovers shards and checks
>>that whether that shards state is available in recovered state
>>"sequenceNumsToRestore"
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>>- if it is available then it scheduled that shard with the recovered
>>state
>>- if it is not available in the state then it shcedule that shard
>>with "EARLIEST_SEQUENCE_NUMBER"
>>
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
>>
>> As in my case, the recovered number of shard details from the checkpoint
>> data is less than the actual number of shards which results into scheduling
>> those shards with earliest stream position.
>> I am suspecting that somehow the checkpoint is missing state for some of
>> the shards. But if this is the case then that checkpoint should had failed.
>>
>> Any further information to resolve this issue would be highly
>> appreciated...
>>
>> Regards,
>> Ravi
>>
>> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang  wrote:
>>
>>> Hi Steven
>>>
>>> If you restore savepoint/checkpoint successfully, I think this might due
>>> to the shard wasn't discovered in the previous run, therefore it would be
>>> consumed from the beginning. Please refer to the implementation here: [1]
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Steven Nelson 
>>> *Sent:* Wednesday, October 16, 2019 4:31
>>> *To:* user 
>>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>>
>>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>>
>>> We have extended data retention on the Kinesis stream, which gives us 7
>>> days of data.
>>>
>>> We have found that when a savepoint/checkpoint is restored that it
>>> appears to be restart

Re: Jar Uploads in High Availability (Flink 1.7.2)

2019-10-15 Thread Ravi Bhushan Ratnakar
Hi,

I was also experiencing similar behavior. I adopted the following
approach:

   -  used a distributed file system (in my case AWS EFS) and set the
   attribute "web.upload.dir"; this way both job managers share the same
   location.
   - on the load balancer side (AWS ELB), I used a "readiness probe" based on
   the ZooKeeper entry for the active jobmanager address; this way the ELB
   always points to the active jobmanager, and if the active jobmanager
   changes it automatically points to the new one. As both use the same
   location on the distributed file system, the new active jobmanager is
   able to find the same jar.


Regards,
Ravi

On Wed, Oct 16, 2019 at 1:15 AM Martin, Nick J [US] (IS) <
nick.mar...@ngc.com> wrote:

> I’m seeing that when I upload a jar through the rest API, it looks like
> only the Jobmanager that received the upload request is aware of the newly
> uploaded jar. That worked fine for me in older versions where all clients
> were redirected to connect to the leader, but now that each Jobmanager
> accepts requests, if I send a jar upload request, it could end up on any
> one (and only one) of the Jobmanagers, not necessarily the leader. Further,
> each Jobmanager responds to a GET request on the /jars endpoint with its
> own local list of jars. If I try and use one of the Jar IDs from that
> request, my next request may not go to the same Jobmanager (requests are
> going through Docker and being load-balanced), and so the Jar ID isn’t
> found on the new Jobmanager handling that request.
>
>
>
>
>
>
>
>
>


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Ravi Bhushan Ratnakar
Hi,

I am also facing the same problem. I am using Flink 1.9.0 and consuming
from a Kinesis source with a retention of 1 day. I am observing that when
the job is submitted with the "latest" initial stream position, the job
starts well and keeps processing data from all the shards for a very long
period of time without any lag. When the job fails, it also recovers well
from the last successful checkpointed state. But I am also experiencing,
very rarely, that when the job fails and recovers from the last successful
checkpointed state, I notice a huge lag (1 day, as per retention) on one of
the streams. I have not yet found a step-by-step process to reproduce this
issue.

So far, as per my analysis, I have gathered some more information by
customizing the FlinkKinesisConsumer to add additional log messages. I
noticed that the number of shard details loaded from the checkpoint data
during recovery is less than the actual number of shards in the stream. I
have a fixed number of shards in the Kinesis stream.

I added one line of debug log at line 408 to print the size of the variable
"sequenceNumsToRestore", which is populated with shard details from the
checkpoint data.
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408

In this consumer class, when the "run" method is called, it does the
following:

   -  it discovers shards from the Kinesis stream and selects all those
   shards which the subtask can schedule
   - then, one by one, it iterates over the discovered shards and checks
   whether that shard's state is available in the recovered state
   "sequenceNumsToRestore"
   https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
   - if it is available, then it schedules that shard with the recovered
   state
   - if it is not available in the state, then it schedules that shard with
   "EARLIEST_SEQUENCE_NUMBER"
   https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308

As in my case the number of shard details recovered from the checkpoint
data is less than the actual number of shards, this results in scheduling
those shards with the earliest stream position.
I am suspecting that somehow the checkpoint is missing state for some of
the shards. But if that were the case, then that checkpoint should have
failed.

Any further information to resolve this issue would be highly appreciated...

Regards,
Ravi

On Wed, Oct 16, 2019 at 5:57 AM Yun Tang  wrote:

> Hi Steven
>
> If you restore savepoint/checkpoint successfully, I think this might due
> to the shard wasn't discovered in the previous run, therefore it would be
> consumed from the beginning. Please refer to the implementation here: [1]
>
> [1]
> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>
> Best
> Yun Tang
> --
> *From:* Steven Nelson 
> *Sent:* Wednesday, October 16, 2019 4:31
> *To:* user 
> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>
> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>
> We have extended data retention on the Kinesis stream, which gives us 7
> days of data.
>
> We have found that when a savepoint/checkpoint is restored that it appears
> to be restarting the Kinesis Consumer from the start of the stream.
> The 
> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
> property reports to Prometheus that it is behind by 7 days when the process
> starts back up from a savepoint.
>
> We have some logs that say:
>
> Subtask 3 will start consuming seeded shard 
> StreamShardHandle{streamName='TheStream',
> shard='{ShardId: shardId-0083,HashKeyRange: {StartingHashKey:
> 220651847300296034902031972006537199616,EndingHashKey:
> 223310303291865866647839586127097888767},SequenceNumberRange:
> {StartingSequenceNumber:
> 49597946220601502339755334362523522663986150244033234226,}}'} from sequence
> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>
> This seems to indicate that this shard is starting from the beginning of
> the stream
>
> and some logs that say:
> Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='
> TheStream ', shard='{ShardId: shardId-0087,HashKeyRange:
> {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey:
> 233944127258145193631070042609340645375},SequenceNumberRange:
> {StartingSequenceNumber:
> 49597946220690705320549456855089665537076743690057155954,}}'} from sequence
> number 49599841594208637293623823226010128300928335129272649074 with
> ShardConsumer 21
>

Re: Checkpointing is not performing well

2019-09-11 Thread Ravi Bhushan Ratnakar
What is the upper limit on checkpoint size in Flink?

Regards,
Ravi

On Wed 11 Sep, 2019, 06:48 Vijay Bhaskar,  wrote:

> You crossed  the upper limits of the check point system of Flink a way
> high. Try to distribute events equally over time by adding some sort of
> controlled back pressure after receiving data from kinesis streams.
> Otherwise the spike coming during 5 seconds time would always create
> problems. Tomorrow it may double so best solution in your case is to
> deliver at configurable constant rate after receiving messages from kinesis
> streams. Otherwise i am sure its always the problem whatever the kind of
> streaming engine you use. Tune your configuration to get the optimal rate
> so that flink checkpoint state is healthier.
>
> Regards
> Bhaskar
>
> On Tue, Sep 10, 2019 at 11:16 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> @Rohan - I am streaming data to kafka sink after applying business logic.
>> For checkpoint, I am using s3 as a distributed file system. For local
>> recovery, I am using Optimized iops ebs volume.
>>
>> @Vijay - I forget to mention that incoming data volume is ~ 10 to 21GB
>> per minute compressed(lz4) avro message. Generally 90% correlated events
>> come within 5 seconds and 10% of the correlated events get extended to 65
>> minute. Due to this business requirement, the state size keep growing till
>> 65 minutes, after that the state size becomes more or less stable. As the
>> state size is growing and is around 350gb at peak load, checkpoint is not
>> able to complete within 1 minutes. I want to check as quick as possible
>> like every 5 second.
>>
>> Thanks,
>> Ravi
>>
>>
>> On Tue 10 Sep, 2019, 11:37 Vijay Bhaskar, 
>> wrote:
>>
>>> For me task count seems to be huge in number with the mentioned resource
>>> count. To rule out the possibility of issue with state backend can you
>>> start writing sink data as  , i.e., data ignore sink. And try
>>> whether you could run it for longer duration without any issue. You can
>>> start decreasing the task manager count until you find descent count of it
>>> without having any side effects. Use that value as task manager count and
>>> then start adding your state backend. First you can try with Rocks DB. With
>>> reduced task manager count you might get good results.
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Sun, Sep 8, 2019 at 10:15 AM Rohan Thimmappa <
>>> rohan.thimma...@gmail.com> wrote:
>>>
>>>> Ravi, have you looked at the io operation(iops) rate of the disk? You
>>>> can monitoring the iops performance and tune it accordingly with your work
>>>> load. This helped us in our project when we hit the wall tuning prototype
>>>> much all the parameters.
>>>>
>>>> Rohan
>>>>
>>>>
>>>> --
>>>> *From:* Ravi Bhushan Ratnakar 
>>>> *Sent:* Saturday, September 7, 2019 5:38 PM
>>>> *To:* Rafi Aroch
>>>> *Cc:* user
>>>> *Subject:* Re: Checkpointing is not performing well
>>>>
>>>> Hi Rafi,
>>>>
>>>> Thank you for your quick response.
>>>>
>>>> I have tested with rocksdb state backend. Rocksdb required
>>>> significantly more taskmanager to perform as compare to filesystem state
>>>> backend. The problem here is that checkpoint process is not fast enough to
>>>> complete.
>>>>
>>>> Our requirement is to do checkout as soon as possible like in 5 seconds
>>>> to flush the output to output sink. As the incoming data rate is high, it
>>>> is not able to complete quickly. If I increase the checkpoint duration, the
>>>> state size grows much faster and hence takes much longer time to complete
>>>> checkpointing. I also tried to use AT LEAST ONCE mode, but does not improve
>>>> much. Adding more taskmanager to increase parallelism also does not improve
>>>> the checkpointing performance.
>>>>
>>>> Is it possible to achieve checkpointing as short as 5 seconds with such
>>>> high input volume?
>>>>
>>>> Regards,
>>>> Ravi
>>>>
>>>> On Sat 7 Sep, 2019, 22:25 Rafi Aroch,  wrote:
>>>>
>>>>> Hi Ravi,
>>>>>
>>>>> Consider moving to RocksDB state backend, where you can enable
>>>>> incremental checkpointing. This will make you checkpoints size

Re: Checkpointing is not performing well

2019-09-10 Thread Ravi Bhushan Ratnakar
@Rohan - I am streaming data to a Kafka sink after applying business logic.
For checkpoints, I am using S3 as the distributed file system. For local
recovery, I am using optimized-IOPS EBS volumes.

@Vijay - I forgot to mention that the incoming data volume is ~10 to 21 GB
per minute of compressed (lz4) Avro messages. Generally 90% of correlated
events come within 5 seconds and 10% of the correlated events get extended
to 65 minutes. Due to this business requirement, the state size keeps
growing for 65 minutes; after that the state size becomes more or less
stable. As the state size is growing and is around 350 GB at peak load, the
checkpoint is not able to complete within 1 minute. I want to checkpoint as
quickly as possible, ideally every 5 seconds.

Thanks,
Ravi


On Tue 10 Sep, 2019, 11:37 Vijay Bhaskar,  wrote:

> For me task count seems to be huge in number with the mentioned resource
> count. To rule out the possibility of issue with state backend can you
> start writing sink data as  , i.e., data ignore sink. And try
> whether you could run it for longer duration without any issue. You can
> start decreasing the task manager count until you find descent count of it
> without having any side effects. Use that value as task manager count and
> then start adding your state backend. First you can try with Rocks DB. With
> reduced task manager count you might get good results.
>
> Regards
> Bhaskar
>
> On Sun, Sep 8, 2019 at 10:15 AM Rohan Thimmappa 
> wrote:
>
>> Ravi, have you looked at the io operation(iops) rate of the disk? You can
>> monitoring the iops performance and tune it accordingly with your work
>> load. This helped us in our project when we hit the wall tuning prototype
>> much all the parameters.
>>
>> Rohan
>>
>>
>> --
>> *From:* Ravi Bhushan Ratnakar 
>> *Sent:* Saturday, September 7, 2019 5:38 PM
>> *To:* Rafi Aroch
>> *Cc:* user
>> *Subject:* Re: Checkpointing is not performing well
>>
>> Hi Rafi,
>>
>> Thank you for your quick response.
>>
>> I have tested with rocksdb state backend. Rocksdb required significantly
>> more taskmanager to perform as compare to filesystem state backend. The
>> problem here is that checkpoint process is not fast enough to complete.
>>
>> Our requirement is to do checkout as soon as possible like in 5 seconds
>> to flush the output to output sink. As the incoming data rate is high, it
>> is not able to complete quickly. If I increase the checkpoint duration, the
>> state size grows much faster and hence takes much longer time to complete
>> checkpointing. I also tried to use AT LEAST ONCE mode, but does not improve
>> much. Adding more taskmanager to increase parallelism also does not improve
>> the checkpointing performance.
>>
>> Is it possible to achieve checkpointing as short as 5 seconds with such
>> high input volume?
>>
>> Regards,
>> Ravi
>>
>> On Sat 7 Sep, 2019, 22:25 Rafi Aroch,  wrote:
>>
>>> Hi Ravi,
>>>
>>> Consider moving to RocksDB state backend, where you can enable
>>> incremental checkpointing. This will make you checkpoints size stay pretty
>>> much constant even when your state becomes larger.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend
>>>
>>>
>>> Thanks,
>>> Rafi
>>>
>>> On Sat, Sep 7, 2019, 17:47 Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am writing a streaming application using Flink 1.9. This application
>>>> consumes data from kinesis stream which is basically avro payload.
>>>> Application is using KeyedProcessFunction to execute business logic on the
>>>> basis of correlation id using event time characteristics with below
>>>> configuration --
>>>> StateBackend - filesystem with S3 storage
>>>> registerTimeTimer duration for each key is  -  currentWatermark  + 15
>>>> seconds
>>>> checkpoint interval - 1min
>>>> minPauseBetweenCheckpointInterval - 1 min
>>>> checkpoint timeout - 10mins
>>>>
>>>> incoming data rate from kinesis -  ~10 to 21GB/min
>>>>
>>>> Number of Task manager - 200 (r4.2xlarge -> 8cpu,61GB)
>>>>
>>>> First 2-4 checkpoints get completed within 1mins where the state size
>>>> is usually 50GB. As the state size grows beyond 50GB, then checkpointing
>>>> time starts taking more than 1mins and it increased till 10 mins and then
>>>> checkpoint fails. The moment the checkpoint starts taking more than 1 mins
>>>> to complete then application starts processing slow and start lagging in
>>>> output.
>>>>
>>>> Any suggestion to fine tune checkpoint performance would be highly
>>>> appreciated.
>>>>
>>>> Regards,
>>>> Ravi
>>>>
>>>


Re: Checkpointing is not performing well

2019-09-07 Thread Ravi Bhushan Ratnakar
Hi Rafi,

Thank you for your quick response.

I have tested with the RocksDB state backend. RocksDB required significantly
more task managers to perform comparably to the filesystem state backend.
The problem here is that the checkpoint process is not fast enough to
complete.

Our requirement is to checkpoint as soon as possible, like every 5 seconds,
to flush the output to the output sink. As the incoming data rate is high,
it is not able to complete quickly. If I increase the checkpoint interval,
the state size grows much faster and hence takes much longer to checkpoint.
I also tried AT_LEAST_ONCE mode, but it does not improve much. Adding more
task managers to increase parallelism also does not improve checkpointing
performance.

Is it possible to achieve checkpointing as short as 5 seconds with such a
high input volume?

Regards,
Ravi

On Sat 7 Sep, 2019, 22:25 Rafi Aroch,  wrote:

> Hi Ravi,
>
> Consider moving to RocksDB state backend, where you can enable incremental
> checkpointing. This will make you checkpoints size stay pretty much
> constant even when your state becomes larger.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend
>
>
> Thanks,
> Rafi
>
> On Sat, Sep 7, 2019, 17:47 Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am writing a streaming application using Flink 1.9. This application
>> consumes data from kinesis stream which is basically avro payload.
>> Application is using KeyedProcessFunction to execute business logic on the
>> basis of correlation id using event time characteristics with below
>> configuration --
>> StateBackend - filesystem with S3 storage
>> registerTimeTimer duration for each key is  -  currentWatermark  + 15
>> seconds
>> checkpoint interval - 1min
>> minPauseBetweenCheckpointInterval - 1 min
>> checkpoint timeout - 10mins
>>
>> incoming data rate from kinesis -  ~10 to 21GB/min
>>
>> Number of Task manager - 200 (r4.2xlarge -> 8cpu,61GB)
>>
>> First 2-4 checkpoints get completed within 1mins where the state size is
>> usually 50GB. As the state size grows beyond 50GB, then checkpointing time
>> starts taking more than 1mins and it increased till 10 mins and then
>> checkpoint fails. The moment the checkpoint starts taking more than 1 mins
>> to complete then application starts processing slow and start lagging in
>> output.
>>
>> Any suggestion to fine tune checkpoint performance would be highly
>> appreciated.
>>
>> Regards,
>> Ravi
>>
>


Checkpointing is not performing well

2019-09-07 Thread Ravi Bhushan Ratnakar
Hi All,

I am writing a streaming application using Flink 1.9. This application
consumes data from a Kinesis stream, which is basically an Avro payload.
The application uses KeyedProcessFunction to execute business logic per
correlation id, using event-time characteristics, with the configuration
below (a rough sketch of these settings follows the list) --
StateBackend - filesystem with S3 storage
event-time timer (registerEventTimeTimer) for each key - currentWatermark
+ 15 seconds
checkpoint interval - 1 min
minPauseBetweenCheckpointInterval - 1 min
checkpoint timeout - 10 mins
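
For reference, a sketch of how these settings map onto the API (the S3
checkpoint path is a placeholder, not the actual job code):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints"))  // filesystem backend on S3
env.enableCheckpointing(60 * 1000)                                     // checkpoint interval: 1 min
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60 * 1000)       // min pause: 1 min
env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)           // timeout: 10 mins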

incoming data rate from kinesis -  ~10 to 21GB/min

Number of Task manager - 200 (r4.2xlarge -> 8cpu,61GB)

The first 2-4 checkpoints complete within 1 min, when the state size is
usually 50 GB. As the state size grows beyond 50 GB, checkpointing starts
taking more than 1 min, increases up to 10 mins, and then the checkpoint
fails. The moment a checkpoint starts taking more than 1 min to complete,
the application starts processing slowly and lagging in output.

Any suggestion to fine tune checkpoint performance would be highly
appreciated.

Regards,
Ravi


StreamingFileSink not committing file to S3

2019-08-05 Thread Ravi Bhushan Ratnakar
Thanks for your quick response. I am using a custom implementation of
BoundedOutOfOrdernessTimestampExtractor, also tweaked so that the initial
watermark it returns is not a negative value.

One more observation: when the job's parallelism is around 120, it works
well even with the idle stream, and the Flink UI shows the watermark. But
when I increase the parallelism above 180, then with the idle stream it
doesn't write any files to S3. The moment I remove the idle stream, it
works fine at any parallelism.

I have also observed that when the parallelism is above 180, the Flink UI
never shows the watermark, although everything works fine without the idle
stream.
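
For context, a sketch of the kind of assigner meant above, with the initial
watermark clamped so it never goes negative (the element type, timestamp field
and 10-second bound are illustrative, not the actual job code):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

case class Record(correlationId: String, eventTime: Long)

class ClampedBoundedOutOfOrdernessExtractor(maxOutOfOrdernessMs: Long = 10000L)
    extends AssignerWithPeriodicWatermarks[Record] {

  // start at 0 so the very first emitted watermark is never negative
  private var currentMaxTimestamp: Long = 0L

  override def extractTimestamp(element: Record, previousElementTimestamp: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, element.eventTime)
    element.eventTime
  }

  override def getCurrentWatermark(): Watermark =
    new Watermark(math.max(0L, currentMaxTimestamp - maxOutOfOrdernessMs))
}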

Regards,
Ravi

On Sun 4 Aug, 2019, 09:53 Rafi Aroch,  wrote:

> Hi Ravi,
>
> This sounds related an issue where the watermark is not advancing. This
> may happen when you have an idle source. An idle source would report a
> Long.MIN_VALUE, therefore the overall min watermark across all consumer
> subtask will never proceed.
>
> First, verify this is indeed the case by looking at the watermarks
> reported. You can try to assign a custom watermark emitter logic as seen
> here [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> Thanks,
> Rafi
>
>
> On Sat, Aug 3, 2019 at 8:23 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am designing a streaming pipeline using Flink 1.8.1, which consumes
>> messages from Kinesis and apply some business logic on per key basis using
>> KeyedProcessFunction and Checkpointing(HeapStateBackend). It is consuming
>> messages around 7GB per minutes from multiple Kinesis streams. I am using
>> only one Kinesis Source which is configured with multiple streams.
>>
>> The pipeline processes data and writes output to s3 as expected but I am
>> experiencing a very weird issue when one of the stream is completely empty
>> then it doesn't flush any file to s3 however it is consuming data from rest
>> of the streams. When i remove only this empty stream and again submit the
>> job then everything works fine and it writes output to s3.
>>
>> Regards,
>> Ravi
>>
>


Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-05 Thread Ravi Bhushan Ratnakar
o.
>
> So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that
> should hopefully be fixed *soon*. If you can wait, that is the easiest, if
> you can't, building either your own custom sink or your own flink with the
> backport isn't a terrible option.
>
> Hope that helps!
>
> Adddison
>
>
>
>
> On Sun, Nov 4, 2018 at 3:09 AM Flink Developer <
> developer...@protonmail.com> wrote:
>
>> Hi Ravi, some questions:
>>
>>1. Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop,
>>flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs,
>>hadoop-common) ? If so, could you please share your dependency versioning?
>>2. Does this use a kafka source with high flink parallelism (~400)
>>for all kafka partitions and does it run continuously for several days?
>>    3. Could you please share your checkpoint interval configuration,
>>batch file size, batch rollover interval configuration, and sink prefix
>>(s3:// ,  s3a://)
>>
>> Thank you
>> ‐‐‐ Original Message ‐‐‐
>> On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>> I have done little changes in BucketingSink and implemented as new
>> CustomBucketingSink to use in my project which works fine with s3 and s3a
>> protocol.  This implementation doesn't require xml file configuration,
>> rather than it uses configuration provided using flink configuration object
>> by calling setConfig method of BucketingSink.
>>
>> On Sat 3 Nov, 2018, 09:24 Flink Developer > wrote:
>>
>>> It seems the issue also appears when using
>>> *Flink version 1.6.2 . *
>>> ‐‐‐ Original Message ‐‐‐
>>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <
>>> developer...@protonmail.com> wrote:
>>>
>>> Hi, thanks for the info Rafi, that seems to be related.  I hope *Flink
>>> version 1.6.2* fixes this. Has anyone encountered this before?
>>>
>>> I would also like to note that my jar includes a *core-site.xml* file
>>> that uses **s3a**. Is this the recommended configuration to use with
>>> BucketingSink?   Should the sink be specified using
>>> *s3a:///* or  *s3:/// *?
>>>
>>> <configuration>
>>>   <property>
>>>     <name>fs.s3.impl</name>
>>>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.buffer.dir</name>
>>>     <value>/tmp</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.access.key</name>
>>>     <value>x</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.secret.key</name>
>>>     <value>x</value>
>>>   </property>
>>>   <property>
>>>     <name>fs.s3a.buffer.dir</name>
>>>     <value>/tmp</value>
>>>   </property>
>>> </configuration>
>>>
>>> And my pom.xml uses:
>>>
>>>- flink-s3-fs-hadoop
>>>- ...
>>>- flink-statebackend-rocksdb_2.11
>>>- ...
>>>- hadoop-hdfs
>>>- ...
>>>- hadoop-common
>>>- ...
>>>- hadoop-core
>>>- ...
>>>- hadoop-aws
>>>- ...
>>>
>>>
>>> ‐‐‐ Original Message ‐‐‐
>>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I'm also experiencing this with Flink 1.5.2. This is probably related to
>>> BucketingSink not working properly with S3 as filesystem because of the
>>> eventual-consistency of S3.
>>>
>>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be
>>> part of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and
>>> not presto).
>>>
>>> Does anyone know if this fix would solve this issue?
>>>
>>> Thanks,
>>> Rafi
>>>
>>>
>>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <
>>> developer...@protonmail.com> wrote:
>>>
>>>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17,
>>>> hadoop 2.8.4)  with flink parallelization set to 400. The source is a Kafka
>>>> topic and sinks to S3 in the format of:
>>>> s3:/. There's potentially 400 files
>>>> writing simultaneously.
>>>>
>>>> *Configuration:*
>>>> - Flink v1.5.2
>>>> - Checkpoin

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-03 Thread Ravi Bhushan Ratnakar
I have made some small changes to BucketingSink and implemented a new
CustomBucketingSink for my project, which works fine with the s3 and s3a
protocols. This implementation doesn't require XML file configuration;
instead it uses the configuration provided via the Flink configuration
object, by calling the setConfig method of the BucketingSink.
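
The CustomBucketingSink itself is not attached here. As a rough illustration of
the same idea with the stock BucketingSink, which exposes a setFSConfig method for
passing filesystem settings programmatically (the path and keys below are
placeholders mirroring the quoted core-site.xml; verify the overloads against the
BucketingSink Javadoc):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

val fsConfig = new Configuration()
fsConfig.setString("fs.s3a.access.key", "x")
fsConfig.setString("fs.s3a.secret.key", "x")
fsConfig.setString("fs.s3a.buffer.dir", "/tmp")

val sink = new BucketingSink[String]("s3a://my-bucket/path")
sink.setFSConfig(fsConfig)  // filesystem settings passed programmatically
sink.setWriter(new StringWriter[String]())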

On Sat 3 Nov, 2018, 09:24 Flink Developer  wrote:

> It seems the issue also appears when using
> *Flink version 1.6.2*.
> ‐‐‐ Original Message ‐‐‐
> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <
> developer...@protonmail.com> wrote:
>
> Hi, thanks for the info Rafi, that seems to be related.  I hope *Flink
> version 1.6.2* fixes this. Has anyone encountered this before?
>
> I would also like to note that my jar includes a *core-site.xml* file
> that uses **s3a**. Is this the recommended configuration to use with
> BucketingSink?   Should the sink be specified using
> *s3a:///* or  *s3:/// *?
>
> <configuration>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3a.access.key</name>
>     <value>x</value>
>   </property>
>   <property>
>     <name>fs.s3a.secret.key</name>
>     <value>x</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
> </configuration>
>
> And my pom.xml uses:
>
>- flink-s3-fs-hadoop
>- ...
>- flink-statebackend-rocksdb_2.11
>- ...
>- hadoop-hdfs
>- ...
>- hadoop-common
>- ...
>- hadoop-core
>- ...
>- hadoop-aws
>- ...
>
>
> ‐‐‐ Original Message ‐‐‐
> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch 
> wrote:
>
> Hi,
>
> I'm also experiencing this with Flink 1.5.2. This is probably related to
> BucketingSink not working properly with S3 as filesystem because of the
> eventual-consistency of S3.
>
> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part
> of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and
> not presto).
>
> Does anyone know if this fix would solve this issue?
>
> Thanks,
> Rafi
>
>
> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <
> developer...@protonmail.com> wrote:
>
>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop
>> 2.8.4)  with flink parallelization set to 400. The source is a Kafka topic
>> and sinks to S3 in the format of:
>> s3:/. There's potentially 400 files
>> writing simultaneously.
>>
>> *Configuration:*
>> - Flink v1.5.2
>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11,
>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause
>> between checkpoints in 2 mins. Timeout is set to 2 mins.
>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>> - Batch file size is set to 5mb.
>> - Batch rollover interval is set to 30min
>> - Writer uses GZip compression
>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1,
>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>
>> The app is able to run for hours straight, but occasionally (once or
>> twice a day), it displays the following exception. When this happens, the
>> app is able to recover from previous checkpoint, but I am concerned about
>> the exception:
>>
>> *Caused by: java.io.IOException:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>> , S3 Extended Request ID: xx*
>>
>>- *at
>>
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)*
>>- *at
>>
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)*
>>- *at
>>
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)*
>>- *at
>>com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)*
>>- *at
>>
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)*
>>
>> *Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>> , S3 Extended Request ID: xx*
>>
>>- *at
>>
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)*
>>- *at
>>
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)*
>>- *at
>>
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)*
>>- *at
>>
>> 

Need help regarding Flink Batch Application

2018-08-08 Thread Ravi Bhushan Ratnakar
Hi Everybody,

Currently I am working on a project where I need to write a Flink batch
application which has to process around 400 GB of compressed sequence files
per hour. After processing, it has to write the output in compressed Parquet
format to S3.

I have managed to write the application in Flink and can successfully
process the whole hour of data and write it in Parquet format to S3. But the
problem is that it is not able to match the performance of the existing
application, which is written using Spark batch (and running in production).

Current Spark Batch
Cluster size - AWS EMR - 1 master + 100 worker nodes of m4.4xlarge (16 vCPU,
64 GB RAM), each instance with a 160 GB disk volume
Input data - around 400 GB
Time taken to process - around 36 mins



Flink Batch
Cluster size - AWS EMR - 1 master + 100 worker nodes of r4.4xlarge (16 vCPU,
64 GB RAM), each instance with a 630 GB disk volume
Transient job - flink run -m yarn-cluster -yn 792 -ys 2 -ytm 14000 -yjm
114736
Input data - around 400 GB
Time taken to process - around 1 hour


I have given all of the node's memory to the jobmanager just to make sure
that there is a dedicated node for the jobmanager, so that it doesn't face
any resource-related issues.


We are already running the Flink batch job with double the RAM compared to
Spark batch; however, we are not able to get the same performance.

Kindly suggest how to achieve the same performance as we are getting from
Spark batch.


Thanks,
Ravi