Re: Dynamic StreamingFileSink

2021-02-06 Thread Sidney Feiner
If anybody is interested, I've implemented a StreamingFileSink with dynamic 
paths:
https://github.com/sidfeiner/DynamicPathFileSink



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: Rafi Aroch 
Sent: Sunday, December 27, 2020 8:25 AM
To: Sidney Feiner 
Cc: flink-u...@apache.org 
Subject: Re: Dynamic StreamingFileSink

Hi Sidney,

Have a look at implementing a BucketAssigner for StreamingFileSink: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment
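
For reference, a minimal sketch of that approach for the "first 2 characters" case described below. It assumes a String stream, row format, and Flink 1.9+; the class name, the "other" fallback bucket and the paths are illustrative only:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Routes each record to a sub-directory named after its first two characters,
// e.g. "XX..." -> /path/to/dir/XX, "YY..." -> /path/to/dir/YY.
public class PrefixBucketAssigner implements BucketAssigner<String, String> {

    @Override
    public String getBucketId(String element, Context context) {
        return element.length() >= 2 ? element.substring(0, 2) : "other";
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

// Wiring it into the sink (sketch):
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("/path/to/dir"), new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new PrefixBucketAssigner())
        .build();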

Rafi


On Sat, Dec 26, 2020 at 11:48 PM Sidney Feiner <sidney.fei...@startapp.com> wrote:
Hey,
I would like to create a dynamic StreamingFileSink for my Streaming pipeline.
By dynamic, I mean that it will write to a different directory based on the 
input.
For example, redirect the row to a different directory based on the first 2 
characters of the input, so if the content I'm writing starts with "XX" then 
write it to a target /path/to/dir/XX, but if the content starts with "YY" then 
write it to target /path/to/dir/YY.

I've tried implementing a DynamicFileSink that internally holds a map of every 
2-letter combination it encounters; the first time it encounters a new combination, 
it creates a StreamingFileSink and invokes its invoke method.

Obviously, that didn't work, because a StreamingFileSink needs to be initialized 
completely differently.

I'm guessing I could implement this completely by myself, but I feel it'd be a 
waste if there were some way to utilize the existing StreamingFileSink.

BTW, this is part of an existing architecture where every pipeline needs an 
actual Sink, so it isn't possible for me to manipulate the datastream directly, 
use keyBy on the first 2 letters and then write its output to a file per key.

Any help would be much appreciated :)


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Dynamic StreamingFileSink

2020-12-26 Thread Sidney Feiner
Hey,
I would like to create a dynamic StreamingFileSink for my Streaming pipeline.
By dynamic, I mean that it will write to a different directory based on the 
input.
For example, redirect the row to a different directory based on the first 2 
characters of the input, so if the content I'm writing starts with "XX" then 
write it to a target /path/to/dir/XX, but if the content starts with "YY" then 
write it to target /path/to/dir/YY.

I've tried implementing a DynamicFileSink that internally holds a map of every 
2-letter combination it encounters; the first time it encounters a new combination, 
it creates a StreamingFileSink and invokes its invoke method.

Obviously, that didn't work, because a StreamingFileSink needs to be initialized 
completely differently.

I'm guessing I could implement this completely by myself, but I feel it'd be a 
waste if there were some way to utilize the existing StreamingFileSink.

BTW, this is part of an existing architecture where every pipeline needs an 
actual Sink, so it isn't possible for me to manipulate the datastream directly, 
use keyBy on the first 2 letters and then write its output to a file per key.

Any help would be much appreciated :)


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Re: Flink logs with extra pipeline property

2020-12-07 Thread Sidney Feiner
I'm using a dockerized HA cluster that I submit pipelines to through the CLI.
So where exactly can I configure the PIPELINE env variable? Seems like it needs 
to be set per container. But many different pipelines run on the same 
TaskManager (so also the same container).

And your example mentions log4j2 twice: once without using the java dynamic 
options, and the second time saying it requires setting the java dynamic 
options, so I'm a bit confused here.

I appreciate the support btw 


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: Yang Wang 
Sent: Monday, December 7, 2020 4:53 AM
To: Sidney Feiner 
Cc: flink-u...@apache.org 
Subject: Re: Flink logs with extra pipeline property

I think you could use the following config options to set the environments for 
JobManager and TaskManager.
And then you could use the envs in the log4j configuration file. 
"${env:PIPELINE}" could be used in log4j2.

containerized.master.env.PIPELINE: my-flink-pipeline
containerized.taskmanager.env.PIPELINE: my-flink-pipeline
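
For example (a sketch only, assuming a log4j2 properties-style configuration and that the PIPELINE variable above is exported into both containers; the appender name is illustrative):

appender.console.layout.type = PatternLayout
appender.console.layout.pattern = {"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${env:PIPELINE}", "message": "%m"}%n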


For log4j2, I am afraid you need to set the java dynamic option[1] to get a 
similar effect.

[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#env-java-opts
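
A sketch of that system-property route, under the assumption that the property name pipeline.name is arbitrary and that the logging configuration resolves ${...} from JVM system properties at load time (log4j 1.x does this for ${pipeline.name} in the properties file; log4j2 would use ${sys:pipeline.name}); note this is still set per process, not per job:

# flink-conf.yaml
env.java.opts: -Dpipeline.name=my-flink-pipeline

# log4j.properties
log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${pipeline.name}", "message": "%m"}%n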


Best,
Yang

Sidney Feiner <sidney.fei...@startapp.com> wrote on Sunday, December 6, 2020 at 10:13 PM:

Hi,

We're using Apache Flink 1.9.2 and we've started logging everything as JSON 
with log4j (the standard log4j1 that comes with Flink). When I say JSON logging, I 
just mean that I've formatted it according to:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": 
"%d{ISO8601}", "class": "%c", "line": "%L", "message": "%m"}%n


Now I would like to somehow add a field to this JSON to indicate which pipeline 
generated the log. At first I thought I'd add another field that logs some 
environment variable, like so:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": 
"%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${PIPELINE}", 
"message": "%m"}%n

But that doesn't seem to be working (is it because the TM is initialized before the 
pipeline, and that's when the placeholders are resolved?).

Do you know of a way I could add a field with the name of the currently running 
pipeline? In my "Main" I have access to the pipeline name, and I also have access 
to this variable in the tasks themselves. I would prefer not to reference this 
variable explicitly when I log, but to have it added automatically during logging.

If anybody has an idea, I'd love to hear it (we can use logback or anything 
else if necessary),

Thanks :)



Flink logs with extra pipeline property

2020-12-06 Thread Sidney Feiner
Hi,

We're using Apache Flink 1.9.2 and we've started logging everything as JSON 
with log4j (the standard log4j1 that comes with Flink). When I say JSON logging, I 
just mean that I've formatted it according to:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": 
"%d{ISO8601}", "class": "%c", "line": "%L", "message": "%m"}%n


Now I would like to somehow add a field to this JSON to indicate which pipeline 
generated the log. At first I thought I'd add another field that logs some 
environment variable, like so:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": 
"%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${PIPELINE}", 
"message": "%m"}%n

But that doesn't seem to be working (is it because the TM is initialized before the 
pipeline, and that's when the placeholders are resolved?).

Do you know of a way I could add a field with the name of the currently running 
pipeline? In my "Main" I have access to the pipeline name, and I also have access 
to this variable in the tasks themselves. I would prefer not to reference this 
variable explicitly when I log, but to have it added automatically during logging.

If anybody has an idea, I'd love to hear it (we can use logback or anything 
else if necessary),

Thanks :)



Re: Increase in parallelism has very bad impact on performance

2020-11-04 Thread Sidney Feiner
You're right, this is a scaling problem (for me that's performance).

As for what you were saying about the data skew, that could be it, so I tried 
running the job without keyBy: I wrote an aggregator that accumulates the events 
per country, and then a FlatMap that takes that map and returns a stream of events 
per country. I was hoping that this way I wouldn't have skewing problems, as all 
the data is actually handled in the same tasks (and I don't mind that).

But even after this change, I'm experiencing the same scaling limit.

And I actually found something inefficient in my code and now that I've fixed 
it, the app seems to scale a bit better. I also decreased the time window which 
increased the scaling some more.

So now I still hit a scaling limit but it seems a bit better already:
Parallelism | Throughput/sec | Throughput/slot/sec | Increase in parallelism (%) | Increase in events (%) | % of expected increase
1           | 2,630          | 2,630               | -                           | -                      | -
15          | 16,340         | 1,180               | 1500%                       | 621%                   | 41.4%
30          | 22,100         | 736                 | 200%                        | 135%                   | 67.5%
50          | 16,600         | 332                 | 166%                        | 75%                    | 45%

The last column checks how "linearly" the app actually scales. The best-case 
scenario is 100%, e.g. when the increase in parallelism is 200% and the number of 
handled events increases by 200% as well.

It is pretty clear to see that my app is far from scaling linearly, and its 
throughput even decreases from parallelism 30 to parallelism 50.

What could cause these erratic percentages of the expected increase even though 
I'm not using a KeyedWindow anymore?





Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: Arvid Heise 
Sent: Tuesday, November 3, 2020 8:54 PM
To: Sidney Feiner 
Cc: Yangze Guo ; user@flink.apache.org 

Subject: Re: Increase in parallelism has very bad impact on performance

Hi Sidney,

you might recheck your first message. Either it's incorrectly written or you 
are a victim of a fallacy.

With 1 slot, you have 1.6K events per slot = 1.6K overall.
With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a 
decent speedup.
With 10, you still have 6K overall.

So you haven't experienced any performance degradation (what your title 
suggests). It's rather that you hit a practical scale-up/out boundary.

Now of course, you'd like to see your system to scale beyond that 6K into the 
realm of 45k per second and I can assure you that it's well possible in your 
setup. However, we need to figure out why it's not doing it.

The most likely reason that would explain the behavior is indeed data skew. 
Your observation also matches it: even though you distribute your job, some 
slots are doing much more work than other slots.

So the first thing that you should do is to plot a histogram over country 
codes. What you will likely see is that 20% of all records belong to the same 
country (US?). That's where your practical scale-up boundary comes from. Since 
you group by country, there is no way to calculate it in a distributed manner. 
Also check in the Flink Web UI which tasks are the bottleneck. I'm assuming it's the 
window operator (or rather everything after HASH) for now.

Btw, concerning hash collisions: even though a 2-letter ASCII string has in theory 
some 26^2 = 676 combinations, you have <200 countries = unique values. Moreover, 
two values ending up with the same hash is very common, as the hash is remapped to 
your parallelism. So if your parallelism is 5, you have only 5 hash buckets into 
which you need to put 40 countries on average. Let's assume you have US, CN, UK as 
your countries with the most entries and a good hash function remapped to 5 
buckets: then you have a 4% probability of having all three assigned to the same 
bucket, but about 52% of at least two of them sharing a bucket.
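
A quick way to see where the country keys actually land is Flink's own key-group assignment; a small sketch (assuming flink-runtime is on the classpath and the default maxParallelism of 128 — adjust if the job sets it explicitly):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.Map;
import java.util.TreeMap;

public class KeySkewCheck {
    public static void main(String[] args) {
        // A few example country codes; replace with the real key set.
        String[] countries = {"US", "CN", "GB", "DE", "FR", "IN", "BR", "RU", "JP", "IL"};
        int parallelism = 5;
        int maxParallelism = 128;

        // Count how many distinct keys each subtask would receive.
        Map<Integer, Integer> keysPerSubtask = new TreeMap<>();
        for (String country : countries) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    country, maxParallelism, parallelism);
            keysPerSubtask.merge(subtask, 1, Integer::sum);
        }
        // Shows only key placement, not record volume; combine with a per-country
        // record histogram to see the real skew.
        System.out.println(keysPerSubtask);
    }
}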

Nevertheless, even without collisions your scalability is limited by the 
largest country. That's independent of the used system and inherent to your 
query. So if you indeed see this data skew, then the best way is to modify the 
query. Possible options:
- You use a more fine-grain key (country + state). That may not be possible due 
to semantics.
- You use multiple aggregation steps ((country + state), then country). 
Preaggregations are always good to have (see the sketch after this list).
- You can reduce data volume by filtering before HASH. (You already have a 
filter, so I'm guessing it's not a valid option)
- You preaggregate per Kafka partition key before HASH.
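
A rough sketch of the multi-step aggregation option above, assuming a hypothetical Event POJO with getCountry()/getState(), an upstream DataStream<Event> called events, and 1-minute processing-time windows — all placeholders for the real job:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Stage 1: partial counts per (country, state) -- the finer key spreads the load.
DataStream<Tuple2<String, Long>> perState = events
        .map(e -> Tuple2.of(e.getCountry() + "|" + e.getState(), 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(0)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

// Stage 2: roll the partials up to the country level (much less data by now).
DataStream<Tuple2<String, Long>> perCountry = perState
        .map(t -> Tuple2.of(t.f0.split("\\|")[0], t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(0)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));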

If you absolutely cannot make the aggregations more fine-grain, you need to use 
machines that have strong CPU slots. (it's also no use to go beyond parallelism 
of 10)

I also noticed that you have several forward channels. There is usually no need 
for them. Task chaining is much faster. Especially if you enableObjectReuse [1].
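
For reference, object reuse is a one-liner on the execution config (with the caveat that user functions must not cache or mutate input records):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Reuse objects between chained operators instead of copying them for each forward.
env.getConfig().enableObjectReuse();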

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html


On Tue, Nov 3, 2020 at 3

Re: Increase in parallelism has very bad impact on performance

2020-11-03 Thread Sidney Feiner
Hey 


  1.  I have 150 partitions in the Kafka topic
  2.  I'll check that soon, but why doesn't the same happen when I use a smaller 
parallelism? If that were the reason, I'd expect the same behavior with a 
parallelism of 5 as well. How does the increase in parallelism decrease the 
throughput per slot?
  3.  When I don't use a window function, it handles around 3K+ events per 
second per slot, so that shouldn't be the problem
  4.  Tried this one right now, and the throughput remains 600 events per second 
per slot when parallelism is set to 15

Out of all those options, seems like I have to investigate the 2nd one. The key 
is a 2-character string representing a country so I don't think it's very 
probable for 2 different countries to have the same hash, but I know for a fact 
that the number of events is not evenly distributed between countries.

But still, why does the impact on performance appear only at higher parallelism?



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: Arvid Heise 
Sent: Tuesday, November 3, 2020 12:09 PM
To: Yangze Guo 
Cc: Sidney Feiner ; user@flink.apache.org 

Subject: Re: Increase in parallelism has very bad impact on performance

Hi Sidney,

there could be a couple of reasons where scaling actually hurts. Let's include 
them one by one.

First, you need to make sure that your source actually supports scaling; thus, 
your Kafka topic needs at least as many partitions as you want to scale to. So if 
you want to scale at some point to 66 parallel instances, your Kafka topic must 
have at least 66 partitions. Of course, you can also read from fewer partitions, but 
then some source subtasks are idling. That's valid if your downstream pipeline 
is much more resource intensive. Also note that it's really hard to increase 
the number of Kafka partitions later, so please plan accordingly.

Second, you have a Windowing operation that uses hashes. It's really important 
to check if the hashes are evenly distributed. So you first could have an issue 
that most records share the same key, but you could on top have the issue that 
different keys share the same hash. In these cases, most records are processed 
by the same subtask resulting in poor overall performance. (You can check for 
data skew incl. hash skew in Web UI).

Third, make sure that there is actually enough data to be processed. Does your 
topic contain enough data? If you want to process historic data, did you choose 
the correct consumer setting? Can your Kafka cluster provide enough data to the 
Flink job? If your max data rate is 6k records from Kafka, then ofc the per 
slot throughput decreases on scaling up.

Fourth, if you suspect that the clumping of used slots to one task manager may 
be an issue, try out the configuration cluster-evenly-spread-out-slots [1]. The 
basic idea is to use a TM fully first to allow easier scale-in. However, if for 
some reason your TM is more quickly saturated than it has slots, you may try to 
spread evenly. However, you may also want to check if you declare too many 
slots for each TM (in most cases slots = cores).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#cluster-evenly-spread-out-slots.
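
For completeness, the corresponding flink-conf.yaml entry would look like this (note the option only exists from Flink 1.10 on, so it would require upgrading from 1.9.2):

cluster.evenly-spread-out-slots: true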


On Tue, Nov 3, 2020 at 4:01 AM Yangze Guo <karma...@gmail.com> wrote:
Hi, Sidney,

What is the data generation rate of your Kafka topic? Is it a lot
bigger than 6000?

Best,
Yangze Guo

Best,
Yangze Guo


On Tue, Nov 3, 2020 at 8:45 AM Sidney Feiner <sidney.fei...@startapp.com> wrote:
>
> Hey,
> I'm writing a Flink app that does some transformation on an event consumed 
> from Kafka, then creates time windows keyed by some field, and applies an 
> aggregation on all those events.
> When I run it with parallelism 1, I get a throughput of around 1.6K events 
> per second (so also 1.6K events per slot). With parallelism 5, that goes down 
> to 1.2K events per slot, and when I increase the parallelism to 10, it drops 
> to 600 events per slot.
> Which means that parallelism 5 and parallelism 10 give me the same total 
> throughput (1.2K x 5 = 600 x 10 = 6K).
>
> I noticed that although I have 3 Task Managers, all the tasks run on the 
> same machine, causing its CPU to spike, and this is probably the reason the 
> throughput decreases so dramatically. Even after increasing the parallelism 
> to 15, so that tasks now run on 2 of the 3 machines, the average throughput 
> per slot is still around 600.
>
> What could cause this dramatic decrease in performance?
>
> Extra info:
>
> Flink version 1.9.2
> Flink High Availability mode
> 3 task managers, 66 slots total
>
>
> Execution plan:
>
>
> Any help would be much appreciated
>
>
> Sidney Feiner / Data Platform Developer
> M: +972.528197

Re: Increase in parallelism has very bad impact on performance

2020-11-03 Thread Sidney Feiner
Hey, I just ran a simple consumer that does nothing but consume each event 
(without aggregating), and every slot handles above 3K per second; with 
parallelism set to 15, it successfully handles 45K events per second.


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: Yangze Guo 
Sent: Tuesday, November 3, 2020 5:00 AM
To: Sidney Feiner 
Cc: user@flink.apache.org 
Subject: Re: Increase in parallelism has very bad impact on performance

Hi, Sidney,

What is the data generation rate of your Kafka topic? Is it a lot
bigger than 6000?

Best,
Yangze Guo

Best,
Yangze Guo


On Tue, Nov 3, 2020 at 8:45 AM Sidney Feiner  wrote:
>
> Hey,
> I'm writing a Flink app that does some transformation on an event consumed 
> from Kafka, then creates time windows keyed by some field, and applies an 
> aggregation on all those events.
> When I run it with parallelism 1, I get a throughput of around 1.6K events 
> per second (so also 1.6K events per slot). With parallelism 5, that goes down 
> to 1.2K events per slot, and when I increase the parallelism to 10, it drops 
> to 600 events per slot.
> Which means that parallelism 5 and parallelism 10 give me the same total 
> throughput (1.2K x 5 = 600 x 10 = 6K).
>
> I noticed that although I have 3 Task Managers, all the tasks run on the 
> same machine, causing its CPU to spike, and this is probably the reason the 
> throughput decreases so dramatically. Even after increasing the parallelism 
> to 15, so that tasks now run on 2 of the 3 machines, the average throughput 
> per slot is still around 600.
>
> What could cause this dramatic decrease in performance?
>
> Extra info:
>
> Flink version 1.9.2
> Flink High Availability mode
> 3 task managers, 66 slots total
>
>
> Execution plan:
>
>
> Any help would be much appreciated
>
>
> Sidney Feiner / Data Platform Developer
> M: +972.528197720 / Skype: sidney.feiner.startapp
>
>


Increase in parallelism has very bad impact on performance

2020-11-02 Thread Sidney Feiner
Hey,
I'm writing a Flink app that does some transformation on an event consumed from 
Kafka, then creates time windows keyed by some field, and applies an 
aggregation on all those events.
When I run it with parallelism 1, I get a throughput of around 1.6K events per 
second (so also 1.6K events per slot). With parallelism 5, that goes down to 
1.2K events per slot, and when I increase the parallelism to 10, it drops to 
600 events per slot.
Which means that parallelism 5 and parallelism 10 give me the same total 
throughput (1.2K x 5 = 600 x 10 = 6K).

I noticed that although I have 3 Task Managers, all the tasks run on the 
same machine, causing its CPU to spike, and this is probably the reason the 
throughput decreases so dramatically. Even after increasing the parallelism to 
15, so that tasks now run on 2 of the 3 machines, the average throughput per 
slot is still around 600.

What could cause this dramatic decrease in performance?

Extra info:

  *   Flink version 1.9.2
  *   Flink High Availability mode
  *   3 task managers, 66 slots total

Execution plan:
(execution plan image from the original message not preserved)

Any help would be much appreciated 



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Re: Windows on SinkFunctions

2020-03-29 Thread Sidney Feiner
Thanks!
What am I supposed to put in the apply/process function for the sink to be 
invoked on a List of items?


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: tison 
Sent: Sunday, March 22, 2020 4:19 PM
To: Sidney Feiner 
Cc: user@flink.apache.org 
Subject: Re: Windows on SinkFunctions

Hi Sidney,

For the case, you can exactly write

stream.
  ...
  .window()
  .apply()
  .addSink()

Operator chain will chain these operators into one so that you don't have to 
worry about the efficiency.
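
To make that shape concrete for the 5-minute/500-event case, a sketch (assuming a non-keyed stream of a hypothetical Event type and a SinkFunction<List<Event>> called EventBatchSink; firing early at 500 elements would additionally need a custom Trigger, which is left out here):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

stream
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .apply(new AllWindowFunction<Event, List<Event>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<Event> values, Collector<List<Event>> out) {
            // Collect the window's contents so the sink is invoked once per batch.
            List<Event> batch = new ArrayList<>();
            values.forEach(batch::add);
            out.collect(batch);
        }
    })
    .addSink(new EventBatchSink());  // implements SinkFunction<List<Event>>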

Best,
tison.


Sidney Feiner <sidney.fei...@startapp.com> wrote on Sunday, March 22, 2020 at 10:03 PM:
Hey,
I wanted to know if it's possible to define a SinkFunction as a WindowFunction 
as well.
For example, I would like the sink to be invoked every 5 minutes or once 500 
events have reached the sink.
Is there a way to do this inside the sink implementation? Or do I have to 
create the windows earlier in the pipeline?
Because if I have multiple sinks and only one of them needs a window, the 
second solution might be problematic.

Thanks :)


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Windows on SinkFunctions

2020-03-22 Thread Sidney Feiner
Hey,
I wanted to know if it's possible to define a SinkFunction as a WindowFunction 
as well.
For example, I would like the sink to be invoked every 5 minutes or once 500 
events have reached the sink.
Is there a way to do this inside the sink implementation? Or do I have to 
create the windows earlier in the pipeline?
Because if I have multiple sinks and only one of them needs a window, the 
second solution might be problematic.

Thanks :)


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-15 Thread Sidney Feiner
Hey,
I've been using Flink for a while now without any problems when running apps 
with a FlinkKafkaConsumer.
All my apps have the same overall logic (consume from Kafka -> transform event 
-> write to file), and the only way they differ from each other is the topic 
they read from (the remaining Kafka config is identical) and the way they 
transform the event.
But suddenly, I've started getting the following error:


2020-03-15 12:13:56,911 WARN  org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)


I've tried setting the "client.id" on my consumer to a random UUID, making sure 
I don't have any duplicates but that didn't help either.
Any idea what could be causing this?

Thanks 

Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Re: Flink Metrics - PrometheusReporter

2020-01-22 Thread Sidney Feiner
Ok, I configured the PrometheusReporter's ports to be a range and now every 
TaskManager has its own port where I can see its metrics. Thank you very much!


Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: Chesnay Schepler 
Sent: Wednesday, January 22, 2020 6:07 PM
To: Sidney Feiner ; flink-u...@apache.org 

Subject: Re: Flink Metrics - PrometheusReporter

Metrics are exposed via reporters by each process separately, whereas the WebUI 
aggregates metrics.

As such you have to configure Prometheus to also scrape the TaskExecutors.
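
For reference, the flink-conf.yaml shape Sidney describes (the reporter name "prom" and the port range are placeholders; each JobManager/TaskManager process picks a free port from the range, and Prometheus must scrape every host:port):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260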

On 22/01/2020 16:58, Sidney Feiner wrote:
Hey,
I've been trying to use the PrometheusReporter, and when I used it locally on my 
computer, I would access the port I configured and see all the metrics I've 
created.
In production, we use High Availability mode and when I try to access the 
JobManager's metrics in the port I've configured on the PrometheusReporter, I 
see some very basic metrics - default Flink metrics, but I can't see any of my 
custom metrics.

Weird thing is I can see those metrics through Flink's UI in the Metrics tab:
(screenshot from the Flink UI Metrics tab not preserved)

Does anybody have a clue why my custom metrics are configured but not being 
reported in high availability mode, but are reported when I run the job locally 
through IntelliJ?

Thanks 



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp





Flink Metrics - PrometheusReporter

2020-01-22 Thread Sidney Feiner
Hey,
I've been trying to use the PrometheusReporter, and when I used it locally on my 
computer, I would access the port I configured and see all the metrics I've 
created.
In production, we use High Availability mode and when I try to access the 
JobManager's metrics in the port I've configured on the PrometheusReporter, I 
see some very basic metrics - default Flink metrics, but I can't see any of my 
custom metrics.

Weird thing is I can see those metrics through Flink's UI in the Metrics tab:
(screenshot from the Flink UI Metrics tab not preserved)

Does anybody have a clue why my custom metrics are configured but not being 
reported in high availability mode, but are reported when I run the job locally 
through IntelliJ?

Thanks 




Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Different jobName per Job when reporting Flink metrics to PushGateway

2019-12-17 Thread Sidney Feiner
I'm using Flink 1.9.1 with PrometheusPushGateway to report my metrics. The 
jobName the metrics are reported with is defined in the flink-conf.yaml file, 
which makes the jobName identical for all jobs that run on the cluster, but I 
want a different jobName to be reported for every running job. To do so, I 
tried doing the following in my code before executing the stream:

Configuration conf = GlobalConfiguration.loadConfiguration();
conf.setString(
        "metrics.reporter.promgateway.jobName",
        conf.getString("metrics.reporter.promgateway.jobName", "") + "-" + pipeline
);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);


Where pipeline is a String variable.

When running the job locally, it worked. But now I'm running flink in High 
Availability mode and it doesn't work anymore :( The config I override in the 
code is ignored.

So how can I change the jobName per job? And if I can't, is there a way to set 
additional labels when reporting the metrics? Because I haven't seen an option 
for that either.

Thanks :)


I've posted this on StackOverflow as well: 
https://stackoverflow.com/questions/59376693/different-jobname-per-job-when-reporting-flink-metrics-to-pushgateway :)



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Re: Fw: Metrics based on data filtered from DataStreamSource

2019-12-16 Thread Sidney Feiner
You are right with everything you say!
The solution you propose is actually what I'm trying to avoid. I'd prefer not 
to consume messages I don't plan on actually handling.
But from what you say it sounds like I have no other choice. Am I right? I MUST 
consume the messages, count those I want to filter out, and then simply not 
handle them?
Which means I must filter them in the task itself and I have no way of 
filtering them directly from the data source?



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




From: vino yang 
Sent: Monday, December 16, 2019 7:56 AM
To: Sidney Feiner 
Cc: user@flink.apache.org 
Subject: Re: Fw: Metrics based on data filtered from DataStreamSource

Hi Sidney,

Firstly, the `open` method of UDF's instance is always invoked when the task 
thread starts to run.

From the second code snippet image that you provided, I guess you are trying to 
get a dynamic handler with reflection technology, is that correct? I also guess 
that you want to get a dynamic instance of a handler in the runtime, correct me 
if I am wrong.

IMO, you may misunderstand the program you write and the runtime of Task, the 
purpose of your program is used to build the job graph. The business logic in 
UDF is used to describe the user's business logic.

For your scene, if many types of events exist in one topic, you can consume 
them and group by the type then count them?
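
One possible way around the open() issue (a sketch only, not tested): make the filter itself a rich function, so Flink calls open() on the filter operator and the counter is registered there. Event, the "filtered" metric name, and the MyCountingFilter subclass in the usage line are placeholders for the original code's types and CustomMetricsManager naming:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public abstract class CountingFilter extends RichFilterFunction<Event> {
    private transient Counter filteredCounter;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open() is called on the filter operator itself, so the counter gets registered.
        filteredCounter = getRuntimeContext()
                .getMetricGroup()
                .addGroup(getClass().getSimpleName())
                .counter("filtered");
    }

    @Override
    public boolean filter(Event event) throws Exception {
        boolean keep = accept(event);
        if (!keep) {
            filteredCounter.inc();
        }
        return keep;
    }

    protected abstract boolean accept(Event event);
}

// Usage: dataStream = dataStreamSource.filter(new MyCountingFilter()).map(handler);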

Best,
Vino

Sidney Feiner <sidney.fei...@startapp.com> wrote on Monday, December 16, 2019 at 12:54 AM:
Hey,
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source 
(in my case, Kafka), and every handler has its own filter function.
That handler also has a Counter that increments every time it filters out 
an event (as part of the FilterFunction).

The problem arises when I use that FilterFunction on the DataStreamSource - the 
handler's open() function hasn't been called, and thus the metrics have never 
been initialized.
Is there a way of making this work? Or any other way of counting events that 
have been filtered out from the DataStreamSource?

Handler:

public abstract class Handler extends RichMapFunction {
    private transient Counter filteredCounter;
    private boolean isInit = false;

    @Override
    public void open(Configuration parameters) throws Exception {
        if (!isInit) {
            MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
            filteredCounter = metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
            isInit = true;
        }
    }

    public final FilterFunction getFilter() {
        return (FilterFunction) event -> {
            boolean res = filter(event);
            if (!res) {
                filteredCounter.inc();
            }
            return res;
        };
    }

    abstract protected boolean filter(Event event);
}

And when I init the DataStreamSource:

Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);

Any help would be much appreciated!

Thanks 





Fw: Metrics based on data filtered from DataStreamSource

2019-12-15 Thread Sidney Feiner
Hey,
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source 
(in my case, Kafka), and every handler has its own filter function.
That handler also has a Counter that increments every time it filters out 
an event (as part of the FilterFunction).

The problem arises when I use that FilterFunction on the DataStreamSource - the 
handler's open() function hasn't been called, and thus the metrics have never 
been initialized.
Is there a way of making this work? Or any other way of counting events that 
have been filtered out from the DataStreamSource?

Handler:

public abstract class Handler extends RichMapFunction {
    private transient Counter filteredCounter;
    private boolean isInit = false;

    @Override
    public void open(Configuration parameters) throws Exception {
        if (!isInit) {
            MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
            filteredCounter = metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
            isInit = true;
        }
    }

    public final FilterFunction getFilter() {
        return (FilterFunction) event -> {
            boolean res = filter(event);
            if (!res) {
                filteredCounter.inc();
            }
            return res;
        };
    }

    abstract protected boolean filter(Event event);
}

And when I init the DataStreamSource:

Handler handler = (Handler) Class.forName(handlerName).getConstructor().newInstance();
dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);

Any help would be much appreciated!

Thanks 