Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid,
Thank you so much for your detailed reply. I think I will go with one schema
per topic, using GenericRecordAvroTypeInfo for the GenericRecords, and not do
any custom magic.

The approach of sending records as byte arrays also seems quite interesting.
Right now I deserialize the Avro records so that I can pass them to
StreamingFileSink's AvroWriters (which accept only Avro objects), which merge a
bunch of Avro records before dumping them to the sink. That step seems
unnecessary to me, since some bulk writer implementation could do the merging
at the byte level itself. Do you know of any such implementations?
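I'm not aware of one out of the box, but for illustration, here is roughly the
shape such a byte-level writer could take. This is only an untested sketch: it
assumes every record in a bucket shares a single writer schema and carries the
standard 5-byte Confluent prefix, which is simply stripped.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

// Hypothetical bulk writer that appends already-encoded Avro payloads to an
// Avro container file, without ever materializing GenericRecord objects.
public class RawAvroBulkWriter implements BulkWriter<byte[]> {

    private final DataFileWriter<GenericRecord> fileWriter;

    public RawAvroBulkWriter(Schema writerSchema, FSDataOutputStream out) throws IOException {
        this.fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(writerSchema));
        this.fileWriter.create(writerSchema, out);
    }

    @Override
    public void addElement(byte[] confluentFramedRecord) throws IOException {
        // Confluent wire format: 1 magic byte + 4-byte schema id, then the raw Avro datum.
        // This only works if every record was written with the schema passed to the constructor.
        fileWriter.appendEncoded(
                ByteBuffer.wrap(confluentFramedRecord, 5, confluentFramedRecord.length - 5));
    }

    @Override
    public void flush() throws IOException {
        fileWriter.flush();
    }

    @Override
    public void finish() throws IOException {
        // The sink closes the underlying stream itself, so only flush here.
        fileWriter.flush();
    }
}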





Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
You need to differentiate two serialization abstractions (which I guess you
already know). One comes from reading the source, where the
DeserializationSchema is used; it translates the bytes coming from Kafka into
something that Flink can handle.

The second serialization happens within Flink through the TypeSerializer, so
that Flink can pass data from one subtask to another. That's why your custom
DeserializationSchema would need to provide TypeInformation, which allows
Flink to pick the TypeSerializer.
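As a minimal sketch of that piece (class and field names are placeholders, and
the deserialize() itself is omitted), assuming GenericRecords with a known
reader schema:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

// Sketch: a deserialization schema that also tells Flink which TypeInformation
// (and therefore which TypeSerializer) to use for the records it produces.
public abstract class RegistryBackedDeserializationSchema
        implements DeserializationSchema<GenericRecord> {

    // Stored as a string so the schema travels easily with the serialized function.
    private final String readerSchemaString;

    protected RegistryBackedDeserializationSchema(Schema readerSchema) {
        this.readerSchemaString = readerSchema.toString();
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // This is what keeps Flink from falling back to Kryo for GenericRecord.
        return new GenericRecordAvroTypeInfo(new Schema.Parser().parse(readerSchemaString));
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }
}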

Now, you would probably not be able to provide a consistent TypeInformation
for arbitrary types, so Flink has to fall back to Kryo, as you said. A
solution is to also provide a custom TypeSerializer that uses the schema
registry (I wouldn't go down the route of GenericRecords with the schema again).

Note that because of the missing TypeInformation, you will never be able to
use the Table API or SQL. If you ask me, that's quite a few drawbacks coming
from that approach (no schema enforcement, no proper schema evolution
support, no schema compatibility enforcement, custom serializers, and clumsy
code using lots of string-based field accesses and casts).

---

I forgot to highlight another rather simple approach that works quite well for
very generic workflows with few operations: use byte[]. The
DeserializationSchema is then as trivial as it sounds. You pass byte[] all
along until you reach your FlatMap (assuming you are doing some
filtering/validation), and only inside this flatmap do you deserialize into
Avro, apply your custom logic, and serialize the result back into byte[]. You
can use Table API / SQL later on with UDFs that do the same thing. Using
byte[] as the internal serialization format of Flink is also blazingly fast
(there is not much to do except add a header). The only downside is that you
need to deserialize manually in each operator, but that can usually be factored out.
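A minimal sketch of such a flatmap (names are placeholders; it assumes plain
Avro binary without a registry prefix):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch of the "byte[] in, byte[] out" pattern: Avro is only touched inside the flatmap.
public class ValidateAndTransform extends RichFlatMapFunction<byte[], byte[]> {

    private final String schemaString;

    private transient GenericDatumReader<GenericRecord> reader;
    private transient GenericDatumWriter<GenericRecord> writer;
    private transient BinaryDecoder decoder;
    private transient BinaryEncoder encoder;

    public ValidateAndTransform(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public void open(Configuration parameters) {
        Schema schema = new Schema.Parser().parse(schemaString);
        reader = new GenericDatumReader<>(schema);
        writer = new GenericDatumWriter<>(schema);
    }

    @Override
    public void flatMap(byte[] value, Collector<byte[]> out) throws Exception {
        decoder = DecoderFactory.get().binaryDecoder(value, decoder);
        GenericRecord record = reader.read(null, decoder);

        // ... custom filtering / validation logic on `record` goes here ...

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(bos, encoder);
        writer.write(record, encoder);
        encoder.flush();
        out.collect(bos.toByteArray());
    }
}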

I'd still recommend looking into using only one schema that captures all
events as subschemas.

On Thu, Nov 12, 2020 at 4:15 PM ashwinkonale 
wrote:

> So in this case, Flink will fall back to the default Kryo serializer, right?
>
>
>


-- 

Arvid Heise | Senior Java Developer





Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
So in this case, Flink will fall back to the default Kryo serializer, right?





Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
If you follow best practices, then a topic should never contain different
schemas, as you can't enforce schema compatibility otherwise. You also have
very limited processing capabilities and clumsy workflows attached to it.
If you want to encode different kinds of events, then the common approach
is to use some kind of envelope schema where the different event types are
encoded as optional fields.
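For illustration only, an envelope along those lines could look like this
(record and field names are invented), parsed with the plain Avro API:

import org.apache.avro.Schema;

// Illustrative only: a single "envelope" record whose fields are the individual
// event types, each as an optional (nullable) field.
public final class EnvelopeSchemas {

    public static final Schema ENVELOPE = new Schema.Parser().parse(
        "{"
        + " \"type\": \"record\", \"name\": \"Envelope\", \"namespace\": \"com.example\","
        + " \"fields\": ["
        + "  {\"name\": \"orderCreated\", \"type\": [\"null\", {"
        + "    \"type\": \"record\", \"name\": \"OrderCreated\","
        + "    \"fields\": [{\"name\": \"orderId\", \"type\": \"string\"}]"
        + "  }], \"default\": null},"
        + "  {\"name\": \"orderCancelled\", \"type\": [\"null\", {"
        + "    \"type\": \"record\", \"name\": \"OrderCancelled\","
        + "    \"fields\": [{\"name\": \"orderId\", \"type\": \"string\"}]"
        + "  }], \"default\": null}"
        + " ]"
        + "}");

    private EnvelopeSchemas() {}
}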

If you want to stick with your custom approach, then you probably want to
implement your own AvroDeserializationSchema that reuses the existing
CachedSchemaCoderProvider. If you check the code of
RegistryAvroDeserializationSchema, you will notice that the actual
implementation is rather slim.

@Override
public T deserialize(byte[] message) throws IOException {
    checkAvroInitialized();
    getInputStream().setBuffer(message);
    Schema writerSchema = schemaCoder.readSchema(getInputStream());

    GenericDatumReader<T> datumReader = getDatumReader();

    datumReader.setSchema(writerSchema);
    datumReader.setExpected(writerSchema); // <-- the difference: read with the writer schema instead of a fixed reader schema

    return datumReader.read(null, getDecoder());
}


On Thu, Nov 12, 2020 at 1:42 PM ashwinkonale 
wrote:

> Hi Arvid,
> Thanks a lot for your reply. And yes, we do use the Confluent schema registry
> extensively. But `ConfluentRegistryAvroDeserializationSchema` expects the
> reader schema to be provided. That means it reads the message using the writer
> schema and converts it to the reader schema. But this is not always what I
> want. If I have messages of different schemas in the same topic, I cannot
> apply `ConfluentRegistryAvroDeserializationSchema`, correct? I also came
> across this question:
> https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without
> I am doing the same thing in my pipeline, providing a custom deserializer
> that uses the Confluent schema registry client. So as far as I understood, in
> this use case there is no way to tell Flink about the
> `GenericRecordAvroTypeInfo` of the GenericRecord which comes out of the source
> function. Please tell me if my understanding is correct.
>
>
>


-- 

Arvid Heise | Senior Java Developer





Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid,
Thanks a lot for your reply. And yes, we do use the Confluent schema registry
extensively. But `ConfluentRegistryAvroDeserializationSchema` expects the
reader schema to be provided. That means it reads the message using the writer
schema and converts it to the reader schema. But this is not always what I
want. If I have messages of different schemas in the same topic, I cannot apply
`ConfluentRegistryAvroDeserializationSchema`, correct? I also came across this
question:
https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without
I am doing the same thing in my pipeline, providing a custom deserializer that
uses the Confluent schema registry client. So as far as I understood, in this
use case there is no way to tell Flink about the `GenericRecordAvroTypeInfo` of
the GenericRecord which comes out of the source function. Please tell me if my
understanding is correct.





Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
The common solution is to use a schema registry, like the Confluent schema
registry [1]. All records carry a small 5-byte prefix that identifies the
schema, which the deserializer then fetches from the registry [2]. Here are
some resources on how to properly secure the communication if needed [3].

[1] https://docs.confluent.io/current/schema-registry/index.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html
[3]
https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
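For completeness, a minimal sketch of wiring [2] up for GenericRecords (the
registry URL is a placeholder):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class RegistrySourceExample {

    // The reader schema is the schema you want records converted into,
    // independent of which (compatible) writer schema produced them.
    public static DeserializationSchema<GenericRecord> forTopic(Schema readerSchema) {
        return ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081"); // placeholder URL
    }
}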

On Thu, Nov 12, 2020 at 10:11 AM ashwinkonale 
wrote:

> Hi,
> Thanks a lot for the reply. And you both are right. Serializing a
> GenericRecord without specifying the schema was indeed a HUGE bottleneck in my
> app. I got to know it through JFR analysis and then read the blog post you
> mentioned. Now I am able to pump in a lot more data per second (in my test
> setup at least). I am going to try this with Kafka.
> But now this poses a problem: my app cannot handle schema changes
> automatically, since Flink needs to know the schema at startup. If there is a
> backward-compatible change upstream, new messages will not be read
> properly. Do you know any workarounds for this?
>
>
>


-- 

Arvid Heise | Senior Java Developer





Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi,
Thanks a lot for the reply. And you both are right. Serializing a GenericRecord
without specifying the schema was indeed a HUGE bottleneck in my app. I got to
know it through JFR analysis and then read the blog post you mentioned. Now I
am able to pump in a lot more data per second (in my test setup at least). I am
going to try this with Kafka.
But now this poses a problem: my app cannot handle schema changes
automatically, since Flink needs to know the schema at startup. If there is a
backward-compatible change upstream, new messages will not be read properly. Do
you know any workarounds for this?





Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Robert Metzger
Hi,
From my experience, serialization contributes a lot to the maximum
achievable throughput. I can strongly recommend checking out this blog
post, which has a lot of detail on the topic:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

On Tue, Nov 10, 2020 at 9:46 AM ashwinkonale 
wrote:

> Hey,
> I am reading messages with a schema id and using the Confluent schema
> registry to deserialize them to GenericRecord. After this point, the pipeline
> will have these objects moving across it. Can you give me some examples of the
> `special handling of avro messages` you mentioned?
>
>
>


Re: Help needed to increase throughput of simple flink app

2020-11-10 Thread ashwinkonale
Hey,
I am reading messages with a schema id and using the Confluent schema registry
to deserialize them to GenericRecord. After this point, the pipeline will have
these objects moving across it. Can you give me some examples of the `special
handling of avro messages` you mentioned?





Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Jaffe, Julian
One thing to check is how much you're serializing to the network. If you're
using Avro GenericRecords without special handling, you can wind up serializing
the schema with every record, greatly increasing the amount of data you're
sending across the wire.
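One form that special handling can take is attaching the Avro type information
explicitly, so that Flink's Avro serializer is used for the shuffle instead of
Kryo shipping the schema with each record. A minimal sketch, with placeholder
names:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

public class AvroTypeInfoExample {

    // Attaches GenericRecordAvroTypeInfo to a stream so Flink's Avro serializer
    // is used for network transfers instead of Kryo.
    public static DataStream<GenericRecord> withAvroTypeInfo(
            DataStream<GenericRecord> records, Schema schema) {
        return records
                .map(r -> r) // identity map, only there to attach the type info
                .returns(new GenericRecordAvroTypeInfo(schema));
    }
}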

On 11/9/20, 8:14 AM, "ashwinkonale"  wrote:

Hi,
Thanks a lot for the reply. I added some more metrics to the pipeline to
understand the bottleneck. It seems Avro deserialization introduces some delay.
Using a histogram, I found that processing a single message takes ~300us (p99)
/ ~180us (p50), which means a single slot can output at most about 3000
messages per second. This essentially means that, to support a QPS of 3
million/s, I will need a parallelism of about 1000. Is my understanding
correct? Can I do anything else apart from having so many slots in my job
cluster? Also, do you have any guides or pointers on how to do such setups,
e.g. a large number of taskmanagers with a few slots each, or bigger TMs with
many slots and bigger JVMs, larger network buffers, etc.?






Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread ashwinkonale
Hi,
Thanks a lot for the reply. I added some more metrics to the pipeline to
understand the bottleneck. It seems Avro deserialization introduces some delay.
Using a histogram, I found that processing a single message takes ~300us (p99)
/ ~180us (p50), which means a single slot can output at most about 3000
messages per second. This essentially means that, to support a QPS of 3
million/s, I will need a parallelism of about 1000. Is my understanding
correct? Can I do anything else apart from having so many slots in my job
cluster? Also, do you have any guides or pointers on how to do such setups,
e.g. a large number of taskmanagers with a few slots each, or bigger TMs with
many slots and bigger JVMs, larger network buffers, etc.?





Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Kostas Kloudas
Hi Ashwin,

Do you have any filtering or aggregation (or any operation that emits
less data than it receives) in your logic? If yes, you could, for
example, put it before the rescaling operation so that it gets chained
to your source and you reduce the amount of data you ship through the
network. After that, it boils down to optimizing your code, I guess, as
Till said. You can also check whether the rescaling has any effect at
all; if not, you could remove it.
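For illustration, a minimal sketch of that chaining (the filter predicate is
just a placeholder for whatever cheap check can run on the raw bytes):

import org.apache.flink.streaming.api.datastream.DataStream;

public class ChainFilterBeforeRescale {

    // The filter stays chained to the Kafka source, so only surviving records
    // cross the network at the rescale() boundary.
    public static DataStream<byte[]> apply(DataStream<byte[]> fromKafka) {
        return fromKafka
                .filter(bytes -> bytes.length > 5) // placeholder predicate on the raw payload
                .rescale();
    }
}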

Kostas

On Mon, Nov 9, 2020 at 10:12 AM ashwinkonale  wrote:
>
> Hi Till,
> Thanks a lot for the reply. The problem I am facing is that as soon as I add
> a network exchange (remove chaining) before the discarding sink, I have a
> huge problem with throughput. Do you have any pointers on how I can go about
> debugging this?
>
> - Ashwin
>
>
>


Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread ashwinkonale
Hi Till,
Thanks a lot for the reply. The problem I am facing is that as soon as I add a
network exchange (remove chaining) before the discarding sink, I have a huge
problem with throughput. Do you have any pointers on how I can go about
debugging this?

- Ashwin





Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Till Rohrmann
Hi Ashwin,

Thanks for reaching out to the Flink community. Since you have tested that
a kafka_source -> discarding_sink can process 10 million records/s, you
might also want to test the write throughput to data_sink and dlq_sink. Maybe
these sinks are limiting your overall throughput by backpressuring the data
flow. If this is not the problem, then I believe that some profiling could
help pinpoint the bottleneck.

Cheers,
Till

On Sun, Nov 8, 2020 at 10:26 PM ashwin konale 
wrote:

> Hey guys,
> I am struggling to improve the throughput of my simple Flink application.
> The target topology is this:
>
> read_from_kafka(byte array deserializer) --rescale-->
> processFunction(confluent avro deserialization) -> split -> 1. data_sink, 2. dlq_sink
>
> Kafka traffic is pretty high:
> Partitions: 128
> Traffic: ~500k msg/s, 50 Mbps.
>
> Flink is running on k8s, writing to hdfs3. I have ~200 CPUs and 400 GB of
> memory at hand. I have tried a few configurations but I am not able to get
> the throughput above 1 million records per second (which I need for
> recovering from failures). I have tried increasing parallelism a lot (up to
> 512), but it has very little impact on the throughput. The primary metrics I
> am considering for throughput are the kafka-source numRecordsOut and the
> message backlog. I have already increased Kafka consumer defaults like
> max.poll.records, etc. Here are a few things I have tried already.
> Try0: Check raw Kafka consumer throughput (kafka_source -> discarding_sink)
> tm: 20, slots: 4, parallelism: 80
> throughput: 10 million records/s
>
> Try1: Disable chaining to introduce the network exchange.
> tm: 20, slots: 4, parallelism: 80
> throughput: 1 million records/s
> I also tried increasing floating-buffers to 100 and buffers-per-channel to
> 64. Increasing parallelism seems to have no effect.
> Observation: out/in buffers are always at 100% utilization.
>
> After this I have tried various things with different network configs,
> parallelism, JVM sizes, etc. But throughput seems to be stuck at 1 million/s.
> Can someone please help me figure out what key metrics to look for and how I
> can improve the situation? Happy to provide any details needed.
>
> Flink version: 1.11.2
>


Help needed to increase throughput of simple flink app

2020-11-08 Thread ashwin konale
Hey guys,
I am struggling to improve the throughput of my simple Flink application.
The target topology is this:

read_from_kafka(byte array deserializer) --rescale-->
processFunction(confluent avro deserialization) -> split -> 1. data_sink, 2. dlq_sink

Kafka traffic is pretty high:
Partitions: 128
Traffic: ~500k msg/s, 50 Mbps.

Flink is running on k8s, writing to hdfs3. I have ~200 CPUs and 400 GB of
memory at hand. I have tried a few configurations but I am not able to get the
throughput above 1 million records per second (which I need for recovering
from failures). I have tried increasing parallelism a lot (up to 512), but it
has very little impact on the throughput. The primary metrics I am considering
for throughput are the kafka-source numRecordsOut and the message backlog. I
have already increased Kafka consumer defaults like max.poll.records, etc.
Here are a few things I have tried already.
Try0: Check raw Kafka consumer throughput (kafka_source -> discarding_sink)
tm: 20, slots: 4, parallelism: 80
throughput: 10 million records/s

Try1: Disable chaining to introduce the network exchange.
tm: 20, slots: 4, parallelism: 80
throughput: 1 million records/s
I also tried increasing floating-buffers to 100 and buffers-per-channel to 64.
Increasing parallelism seems to have no effect.
Observation: out/in buffers are always at 100% utilization.

After this I have tried various things with different network configs,
parallelism, JVM sizes, etc. But throughput seems to be stuck at 1 million/s.
Can someone please help me figure out what key metrics to look for and how I
can improve the situation? Happy to provide any details needed.

Flink version: 1.11.2
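
For reference, a sketch of the kind of consumer tuning mentioned above
(property values and endpoints are illustrative only, not recommendations):

import java.util.Properties;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TunedKafkaSource {

    // Pass-through schema so records stay as byte[] until later in the pipeline.
    public static class PassThroughSchema extends AbstractDeserializationSchema<byte[]> {
        @Override
        public byte[] deserialize(byte[] message) {
            return message;
        }
    }

    public static FlinkKafkaConsumer<byte[]> create(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "throughput-test");     // placeholder
        props.setProperty("max.poll.records", "5000");
        props.setProperty("fetch.min.bytes", "1048576");
        props.setProperty("receive.buffer.bytes", "4194304");
        return new FlinkKafkaConsumer<>(topic, new PassThroughSchema(), props);
    }
}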