Re: Flink 1.14.4 HybridSource consumes lots of CPU resources

2022-05-03 Thread Thomas Weise
Thank you for reporting the issue. Mason has already identified the root
cause and the JIRA is now assigned to him:
https://issues.apache.org/jira/browse/FLINK-27479

Thomas

On Tue, May 3, 2022 at 4:02 AM Martijn Visser 
wrote:

> I'm looping in @Thomas Weise  since he has expertise on
> the HybridSource.
>
> On Tue, 3 May 2022 at 12:04, Arthur Li  wrote:
>
>> Hi Mason,
>>
>> I uploaded the code and resource files to AwesomeArthurLi/quickstart:
>> quickstart (github.com); maybe it will help you reproduce the issue.
>>
>> BR.
>> Arthur Li
>>
>>
>> On May 3, 2022 at 15:48, Mason Chen wrote:
>>
>> Hi Arthur,
>>
>> Coincidentally, I also encountered a similar issue recently. For my
>> issue, I noticed that the source implementation always marks itself as
>> having data available causing the Flink runtime to repeatedly loop in
>> succession and causing high CPU utilization. More details in here:
>> https://issues.apache.org/jira/browse/FLINK-27479
>>
>> Can you provide a minimal working example to reproduce this issue? I
>> presume you notice high CPU utilization before switching from FileSource
>> and also after switching to KafkaSource?
>>
>> Best,
>> Mason
>>
>> On Sun, May 1, 2022 at 6:24 AM Arthur Li  wrote:
>>
>>> The following snapshot is the Java process's flame graph.
>>>
>>> <pasted-graphic-1.png>
>>>
>>>
>>> On May 1, 2022 at 09:14, Arthur Li wrote:
>>>
>>> Hi all,
>>>
>>> The Hybrid Source | Apache Flink
>>> is one of the new features of Flink 1.14.x, but one problem is that it takes
>>> over 700% CPU, which is almost 5 times more than running the two sources separately.
>>>
>>>
>>> My Environment:
>>> JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>>> Scala: Scala code runner version 2.12.14
>>> OS: MacOS Monterey
>>>
>>>
>>> Hybrid Source Code:
>>>
>>> object HelloHybrid {
>>>
>>>   def main(args: Array[String]): Unit = {
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val kafka =
>>>   KafkaSource.builder[String]()
>>> .setBootstrapServers("localhost:9092")
>>> .setTopics("lab-flink-sensor-iot")
>>> .setGroupId("sensor-iot-group")
>>> .setStartingOffsets(OffsetsInitializer.earliest())
>>> .setValueOnlyDeserializer(new SimpleStringSchema())
>>> .build()
>>>
>>> val sensorDataFile = 
>>> "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>>> val fileData = FileSource.forRecordStreamFormat(
>>>   new TextLineFormat(),
>>>   Path.fromLocalFile(new File(sensorDataFile)))
>>>   .build()
>>>
>>> val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>>>
>>> env.fromSource(hybridSrc,
>>>   WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>>   "kafka & file hybrid source")
>>>   .map(data => {
>>> val arr = data.split(",").map(_.trim)
>>> SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>>   })
>>>   .print("hybrid")
>>>
>>> env.execute("Hello kafka & file hybrid source")
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>>
>>


Re: Migrating Flink apps across cloud with state

2022-05-03 Thread Austin Cawley-Edwards
Hey Hemanga,

That's quite annoying of MirrorMaker to change the offsets on you. One
solution would be to use the State Processor API[1] to read the savepoint
and update the offsets to the new ones — does MirrorMaker give you these
ahead of time? There might also be more specific tricks people could give
if you're able to share which cloud/ cloud services you're migrating to and
from.

Best,
Austin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
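For anyone following along, here is a rough sketch of what the reading half of that could look like with the DataSet-based State Processor API in Flink 1.14. The operator uid, the state name, and the state type below are assumptions that match the legacy FlinkKafkaConsumer; the newer KafkaSource stores its splits differently, so verify everything against your own job before relying on this.

InspectKafkaOffsets.java (sketch):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class InspectKafkaOffsets {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load the savepoint taken on the source cloud provider (path is a placeholder).
        ExistingSavepoint savepoint = Savepoint.load(
                env, "gs://my-bucket/savepoints/savepoint-xxxx", new HashMapStateBackend());

        // Read the Kafka source's union list state that holds the partition offsets.
        // "kafka-source" and "topic-partition-offset-states" are assumptions: use the
        // uid you set on the source and the state name of your connector version.
        DataSet<Tuple2<KafkaTopicPartition, Long>> offsets = savepoint.readUnionState(
                "kafka-source",
                "topic-partition-offset-states",
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));

        offsets.print();

        // Rewriting would follow the same API: map this DataSet to the MirrorMaker-translated
        // offsets, wrap it with OperatorTransformation.bootstrapWith(...), and write a new
        // savepoint via Savepoint.create(...).withOperator(...).write(...).
    }
}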

On Tue, May 3, 2022 at 5:11 PM Hemanga Borah 
wrote:

> Any ideas, guys?
>
> On Mon, May 2, 2022 at 6:11 PM Hemanga Borah 
> wrote:
>
>> Hello,
>>  We are attempting to port our Flink applications from one cloud provider
>> to another.
>>
>>  These Flink applications consume data from Kafka topics and output to
>> various destinations (Kafka or databases). The applications have states
>> stored in them. Some of these stored states are aggregations, for example,
>> at times we store hours (or days) worth of data to aggregate over time.
>> Some other applications have cached information for data enrichment, for
>> example, we store data in Flink state for days, so that we can join them
>> with newly arrived data. The amount of data on the input topics is a lot,
>> and it will be expensive to reprocess the data from the beginning of the
>> topic.
>>
>>  As such, we want to retain the state of the application when we move to
>> a different cloud provider so that we can retain the aggregations and
>> cache, and do not have to start from the beginning of the input topics.
>>
>>  We are replicating the Kafka topics using MirrorMaker 2. This is our
>> procedure:
>>
>>- Replicate the input topics of each Flink application from source
>>cloud to destination cloud.
>>- Take a savepoint of the Flink application on the source cloud
>>provider.
>>- Start the Flink application on the destination cloud provider using
>>the savepoint from the source cloud provider.
>>
>>
>> However, this does not work as we want because there is a difference in
>> offset in the new topics in the new cloud provider (because of MirrorMaker
>> implementation). The offsets of the new topic do not match the ones stored
>> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
>> topic during startup.
>>
>> Has anyone tried to move clouds while retaining the Flink state?
>>
>> Thanks,
>> Hemanga
>>
>


Flink serialization errors at a batch job

2022-05-03 Thread Yunus Olgun
Hi,

We're running a large Flink batch job and sometimes it throws serialization
errors in the middle of the job. It is always the same operator, but the exact
error can differ. Subsequent attempts then succeed, or sometimes all attempts
get exhausted and we retry the whole job.

The job basically reads a list of filenames, downloads the files from GCS,
does a groupBy-reduce and then writes the result. The error happens at the
reduce operator.

We use Flink 1.13.6 and Beam 2.35.

1 - Do you know what may be going wrong here or how to debug it further?
2 - Attempts require reading all the data again. Is there any way to speed up
recovery in cases like this?

Thanks,

>> Example stacktrace 1:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
(GroupReduce at groupByKey@{xxx}' , caused an error:
java.util.concurrent.ExecutionException: java.lang.RuntimeException:
Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than
the record had. This indicates broken serialization. If you are using
custom serialization types (Value or Writable), check their
serialization methods. If you are using a Kryo-serialized type, check
the corresponding Kryo serializer.
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
  at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.WrappingRuntimeException:
java.util.concurrent.ExecutionException: java.lang.RuntimeException:
Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than
the record had. This indicates broken serialization. If you are using
custom serialization types (Value or Writable), check their
serialization methods. If you are using a Kryo-serialized type, check
the corresponding Kryo serializer.
  at 
org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
  at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
  at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
  ... 4 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error obtaining the sorted input: Thread
'SortMerger Reading Thread' terminated due to an exception: Serializer
consumed more bytes than the record had. This indicates broken
serialization. If you are using custom serialization types (Value or
Writable), check their serialization methods. If you are using a
Kryo-serialized type, check the corresponding Kryo serializer.
  at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
  at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
  at 
org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
  ... 7 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted
input: Thread 'SortMerger Reading Thread' terminated due to an
exception: Serializer consumed more bytes than the record had. This
indicates broken serialization. If you are using custom serialization
types (Value or Writable), check their serialization methods. If you
are using a Kryo-serialized type, check the corresponding Kryo
serializer.
  at 
org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
  at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown
Source)
  at 
java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown
Source)
  at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
Source)
  at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
Source)
  at 
org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
  at 
org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
  at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than
the record had. This indicates broken serialization. If you are using
custom serialization types (Value or Writable), check their
serialization methods. If you are using a Kryo-serialized type, check
the corresponding Kryo serializer.
  at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
Caused by: java.io.IOException: Serializer consumed more bytes than
the record had. This indicates broken serialization. If you are using
custom serialization types (Value or Writable), 

Re: Migrating Flink apps across cloud with state

2022-05-03 Thread Hemanga Borah
Any ideas, guys?

On Mon, May 2, 2022 at 6:11 PM Hemanga Borah 
wrote:

> Hello,
>  We are attempting to port our Flink applications from one cloud provider
> to another.
>
>  These Flink applications consume data from Kafka topics and output to
> various destinations (Kafka or databases). The applications have states
> stored in them. Some of these stored states are aggregations, for example,
> at times we store hours (or days) worth of data to aggregate over time.
> Some other applications have cached information for data enrichment, for
> example, we store data in Flink state for days, so that we can join them
> with newly arrived data. The amount of data on the input topics is a lot,
> and it will be expensive to reprocess the data from the beginning of the
> topic.
>
>  As such, we want to retain the state of the application when we move to a
> different cloud provider so that we can retain the aggregations and cache,
> and do not have to start from the beginning of the input topics.
>
>  We are replicating the Kafka topics using MirrorMaker 2. This is our
> procedure:
>
>- Replicate the input topics of each Flink application from source
>cloud to destination cloud.
>- Take a savepoint of the Flink application on the source cloud
>provider.
>- Start the Flink application on the destination cloud provider using
>the savepoint from the source cloud provider.
>
>
> However, this does not work as we want because there is a difference in
> offset in the new topics in the new cloud provider (because of MirrorMaker
> implementation). The offsets of the new topic do not match the ones stored
> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
> topic during startup.
>
> Has anyone tried to move clouds while retaining the Flink state?
>
> Thanks,
> Hemanga
>


Setting boundedness for legacy Hadoop sequence file sources

2022-05-03 Thread Ken Krugler
Hi all,

I’m converting several batch Flink workflows to streaming, with bounded sources.

Some of our sources are reading Hadoop sequence files via 
StreamExecutionEnvironment.createInput(HadoopInputFormat).

The problem is that StreamGraphGenerator.existsUnboundedSource is returning 
true, because the LegacySourceTransformation for this source says it’s 
CONTINUOUS_UNBOUNDED. So the workflow fails to run, because I’ve set the 
execution mode to batch.

The root cause is that StreamExecutionEnvironment.createInput() checks if the 
input format extends FileInputFormat, and only sets up a bounded source if it 
does. HadoopInputFormat doesn’t extend FileInputFormat, so boundedness gets set 
to CONTINUOUS_UNBOUNDED, which is wrong.

This looks like a bug in StreamExecutionEnvironment.createInput(), though not 
sure how best to fix it. Relying on class checks feels brittle.
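For readers who want to reproduce this, here is a minimal sketch of the failing setup under stated assumptions (Flink 1.14, flink-hadoop-compatibility on the classpath, LongWritable/Text sequence files; the path and types are placeholders, not Ken's actual workflow):

SequenceFileBoundednessRepro.java (sketch):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class SequenceFileBoundednessRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        JobConf jobConf = new JobConf();
        FileInputFormat.addInputPath(jobConf, new Path("/data/sequence-files"));

        HadoopInputFormat<LongWritable, Text> inputFormat = new HadoopInputFormat<>(
                new SequenceFileInputFormat<LongWritable, Text>(),
                LongWritable.class, Text.class, jobConf);

        // HadoopInputFormat is not a Flink FileInputFormat, so createInput() leaves the
        // legacy source transformation as CONTINUOUS_UNBOUNDED, and the StreamGraphGenerator
        // then rejects the job in BATCH execution mode even though the input is finite.
        DataStream<Tuple2<LongWritable, Text>> records = env.createInput(inputFormat);
        records.print();

        env.execute("bounded sequence file read");
    }
}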

Regards,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Flink checkpointing with slow pipelines

2022-05-03 Thread Deepak Nagaraj
Hi Flink users,

Greetings. I have a question on how Flink invokes checkpoints with a slow
pipeline.

I have a Beam streaming pipeline with one Map() call. It is a Python
program running on Flink with PortableRunner. I’ve experimented with
varying amounts of sleep inside this call to simulate slowness. The
pipeline reads from Kafka, windows into 1-minute fixed windows, and writes
to a file. The pipeline parallelism is 1, and bundle size is 2. Checkpoint
interval is 30s and timeout is 1min.

I post messages to Kafka with kcat utility. The messages are all 32 bytes,
but I can vary the number of messages posted.

With sleep() < 0.6 seconds, i.e. a fast pipeline, I see checkpoints getting
started even when Kafka backlog > 0, i.e. when all the Kafka messages are
not fully drained.

However, with longer sleep() i.e. slower pipeline, I don’t see a checkpoint
getting started until the backlog goes all the way down to 0. I also don’t
see a “Received barrier” message until backlog gets to zero.

Annotated example logs later below. I’m happy to provide additional details
and logs, or run experiments on my setup.


My question is: what causes a fast pipeline to be able to start checkpoints
even when there are outstanding Kafka messages, but this fails on a slow
pipeline?

Thanks,

Deepak

# Posting to Kafka

2022-05-03 09:53:37,184 DEBUG
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader[] -
Reader-0:  backlog 17157

# Checkpoint was triggered

2022-05-03 09:53:38,924 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Trigger
checkpoint 2@1651596818922 for fb40570da5f4dd41e458af269c1a2eaf.

# Messages were slowly getting drained

# ...

2022-05-03 09:53:53,184 DEBUG
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader[] -
Reader-0:  backlog 11696

# Even with non-zero backlog, I saw a checkpoint getting triggered (this is
what I want!)

2022-05-03 09:53:53,369 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask  [] - Starting
checkpoint (2) CHECKPOINT on task Source:
ReadMessages/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
-> Flat Map -> Map -> [1]ReadMessages/Remove Kafka Metadata ->
[7]{CreateKafkaRecord, Process1, Window, Write to file} -> ToKeyedWorkItem
(1/1)#0

# Q: I cannot get Beam/Flink to behave this way, i.e. trigger checkpoint
with non-zero backlog,

#   with slower pipelines (sleep() > 0.6s). Why?


Issue with HybridSource recovering from Savepoint

2022-05-03 Thread Kevin Lam
Hi,

We're encountering an error using a HybridSource that is composed of a
FileSource + KafkaSource, only when recovering from a savepoint [0]. This
HybridSource is used to read a Kafka topic's archives hosted on GCS
via a bounded FileSource, and then automatically switch over to the live data
stream from the associated Kafka topic.

Has anyone seen this error before?

[0]
```
2022-05-03 09:47:57
org.apache.flink.util.FlinkException: Global failure triggered by
OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator
afb3208349a953c47059c1994f800aa2).
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: Source for index=0 not available
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at
org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
at
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
at
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
... 3 more
```


Re: Using the official flink operator and kubernetes secrets

2022-05-03 Thread Yang Wang
Flink does not support environment variable substitution in the args. I think you
could access the env via "System.getenv()" in the user main method.
It should work since the user main method is executed on the JobManager
side.

Best,
Yang
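
For illustration, a minimal sketch of the approach Yang describes; the class name and failure handling are made up, and DJANGO_TOKEN matches the secret wired up in the deployment quoted below:

public class HttpOverMqttEntryPoint {

    public static void main(String[] args) throws Exception {
        // kubernetes.env.secretKeyRef (see the flinkConfiguration below) injects the
        // secret as an environment variable, so the main method can read it directly
        // instead of expecting $DJANGO_TOKEN to be substituted into the program args.
        String djangoToken = System.getenv("DJANGO_TOKEN");
        if (djangoToken == null || djangoToken.isEmpty()) {
            throw new IllegalStateException("DJANGO_TOKEN is not set in the JobManager environment");
        }

        // ... build the pipeline and pass djangoToken to whatever needs it ...
    }
}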

Őrhidi Mátyás wrote on Thursday, April 28, 2022 at 19:27:

> Also,
>
> just declaring it in the flink configs should be sufficient, no need to
> define it in the pod templates:
>
> flinkConfiguration:
> kubernetes.env.secretKeyRef: 
> "env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
>
>
> Best,
> Matyas
>
> On Thu, Apr 28, 2022 at 1:17 PM Őrhidi Mátyás 
> wrote:
>
>> Hi Francis,
>>
>> I suggest accessing the environment variables directly, no need to pass
>> them as command arguments I guess.
>>
>> Best,
>> Matyas
>>
>> On Thu, Apr 28, 2022 at 11:31 AM Francis Conroy <
>> francis.con...@switchdin.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to use a kubernetes secret as a command line argument in my
>>> job and the text replacement doesn't seem to be happening. I've verified
>>> passing the custom args via the command line on my local flink cluster but
>>> can't seem to get the environment var replacement to work.
>>>
>>> apiVersion: flink.apache.org/v1alpha1
>>> kind: FlinkDeployment
>>> metadata:
>>>   namespace: default
>>>   name: http-over-mqtt
>>> spec:
>>>   image: flink:1.14.4-scala_2.12-java11
>>>   flinkVersion: v1_14
>>>   flinkConfiguration:
>>> taskmanager.numberOfTaskSlots: "2"
>>> kubernetes.env.secretKeyRef: 
>>> "env:DJANGO_TOKEN,secret:switchdin-django-token,key:token"
>>> #containerized.taskmanager.env.DJANGO_TOKEN: "$DJANGO_TOKEN"
>>>   serviceAccount: flink
>>>   jobManager:
>>> replicas: 1
>>> resource:
>>>   memory: "1024m"
>>>   cpu: 1
>>>   taskManager:
>>> resource:
>>>   memory: "1024m"
>>>   cpu: 1
>>>   podTemplate:
>>> spec:
>>>   serviceAccount: flink
>>>   containers:
>>> - name: flink-main-container
>>>   volumeMounts:
>>> - mountPath: /flink-job
>>>   name: flink-jobs
>>>   env:
>>> - name: DJANGO_TOKEN  # kubectl create secret generic 
>>> switchdin-django-token --from-literal=token='[TOKEN]'
>>>   valueFrom:
>>> secretKeyRef:
>>>   name: switchdin-django-token
>>>   key: token
>>>   optional: false
>>>   initContainers:
>>> - name: grab-mqtt-over-http-jar
>>>   image: docker-push.k8s.local/test/switchdin/platform_flink:job-41
>>>   command: [ "/bin/sh", "-c",
>>>  "cp /opt/switchdin/* /tmp/job/." ]  # Copies the jar 
>>> in the init container to the flink-jobs volume
>>>   volumeMounts:
>>> - name: flink-jobs
>>>   mountPath: /tmp/job
>>>   volumes:
>>> - name: flink-jobs
>>>   emptyDir: { }
>>>   job:
>>> jarURI: local:///flink-job/switchdin-topologies-1.0-SNAPSHOT.jar
>>> args: ["--swit-django-token", "$DJANGO_TOKEN",
>>>"--swit-prod","false"]
>>> entryClass: org.switchdin.HTTPOverMQTT
>>> parallelism: 1
>>> upgradeMode: stateless
>>> state: running
>>>
>>> In the logs I can see:
>>>
>>> 2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] - ARGS ARE {}
>>> 2022-04-28 08:43:02,329 WARN org.switchdin.HTTPOverMQTT [] -
>>> --swit-django-token
>>> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] -
>>> $DJANGO_TOKEN
>>> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - --swit-prod
>>> 2022-04-28 08:43:02,330 WARN org.switchdin.HTTPOverMQTT [] - false
>>>
>>> Anyone know how I can do this? I'm considering mounting it in a volume,
>>> but that seems like a lot of hassle for such a small thing.
>>>
>>> Thanks in advance!
>>>
>>>
>>>
>>


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Chesnay Schepler
> I noticed that my config of the PrometheusReporter is different here. 
I have: `metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter`. I will 
investigate if this is a problem.


That's not a problem.

> Which trace logs are interesting?

The logging config I provided should highlight the relevant bits 
(com.sun.net.httpserver).
At least in my local tests this is where any interesting things were 
logged.

Note that this part of the code uses java.util.logging, not slf4j/log4j.

> When running a local flink (start-cluster.sh), I do not have a 
certain url/port to access the taskmanager, right?


If you configure a port range it should be as simple as curl
localhost:<port>.

You can find the used port in the taskmanager logs.
Or just try the first N ports in the range ;)

On 03/05/2022 14:11, Peter Schrott wrote:

Hi Chesnay,

Thanks for the code snippet. Which trace logs are interesting? Those of
"org.apache.flink.metrics.prometheus.PrometheusReporter"?
I could also add this logger settings in the environment where the 
problem is present.


Other than that, I am not sure how to reproduce this issue in a local
setup. In the cluster where the metrics are missing I am navigating to
the specific taskmanager and trying to access the metrics via the
configured Prometheus port. When running a local Flink
(start-cluster.sh), I do not have a specific url/port to access the
taskmanager, right?


I noticed that my config of the PrometheusReporter is different here. 
I have: `metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter`. I will 
investigate if this is a problem.


Unfortunately I can not provide my job at the moment. It 
contains business logic and it is tightly coupled with our Kafka 
systems. I will check the option of creating a sample job to reproduce 
the problem.


Best, Peter

On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler  
wrote:


You'd help me out greatly if you could provide me with a sample
job that runs into the issue.

So far I wasn't able to reproduce the issue,
but it should be clear that there is some issue, given 3 separate reports,
although it is strange that so far it was only reported for
Prometheus.

If one of you is able to reproduce the issue within a Test and is
feeling adventurous,
then you might be able to get more information by forwarding the
java.util.logging
to SLF4J. Below is some code to get you started.

DebuggingTest.java:

class DebuggingTest {

 static {
 LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
 SLF4JBridgeHandler.removeHandlersForRootLogger();
 SLF4JBridgeHandler.install();
 miniClusterExtension =
 new MiniClusterExtension(
 new MiniClusterResourceConfiguration.Builder()
 .setConfiguration(getConfiguration())
 .setNumberSlotsPerTaskManager(1)
 .build());
 }

 @RegisterExtension private static final MiniClusterExtension 
miniClusterExtension;

 private static Configuration getConfiguration() {
 final Configuration configuration = new Configuration();

 configuration.setString(
 "metrics.reporter.prom.factory.class", 
PrometheusReporterFactory.class.getName());
 configuration.setString("metrics.reporter.prom.port", "9200-9300");

 return configuration;
 }

 @Test
 void runJob() throws Exception {
 
 }
}


pom.xml:


<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jul-to-slf4j</artifactId>
    <version>1.7.32</version>
</dependency>
log4j2-test.properties:

rootLogger.level = off
rootLogger.appenderRef.test.ref = TestLogger

logger.http.name    = com.sun.net.httpserver
logger.http.level = trace

appender.testlogger.name    = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:

On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:

Hi!

I also discovered problems with the PrometheusReporter on Flink 1.15.0,
coming from 1.14.4. I already consulted the mailing list:
https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
I have not found the underlying problem or a solution to it.

Actually, after re-checking, I see the same log WARNINGS as
ChangZhou described.

As I described, it seems to be an issue with my job. If no job, or an
example job runs on the taskmanager the basic metrics work just fine. Maybe
ChangZhou can confirm this?

@ChangZhou what's your job setup? I am running a streaming SQL job, but
also using data streams API to create the streaming environment and from
that the table environment and finally using a StatementSet to execute
multiple SQL statements in one job.

Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Peter Schrott
Hi Chesnay,

Thanks for the code snippet. Which trace logs are interesting? Those of
"org.apache.flink.metrics.prometheus.PrometheusReporter"?
I could also add this logger settings in the environment where the problem
is present.

Other than that, I am not sure how to reproduce this issue in a local
setup. In the cluster where the metrics are missing I am navigating to the
specific taskmanager and trying to access the metrics via the configured
Prometheus port. When running a local Flink (start-cluster.sh), I do not
have a specific url/port to access the taskmanager, right?

I noticed that my config of the PrometheusReporter is different here. I
have: `metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter`. I will investigate
if this is a problem.

Unfortunately I can not provide my job at the moment. It
contains business logic and it is tightly coupled with our Kafka systems. I
will check the option of creating a sample job to reproduce the problem.

Best, Peter

On Tue, May 3, 2022 at 12:48 PM Chesnay Schepler  wrote:

> You'd help me out greatly if you could provide me with a sample job that
> runs into the issue.
>
> So far I wasn't able to reproduce the issue,
> but it should be clear that there is some issue, given 3 separate reports,
> although it is strange that so far it was only reported for Prometheus.
>
> If one of you is able to reproduce the issue within a Test and is feeling
> adventurous,
> then you might be able to get more information by forwarding the
> java.util.logging
> to SLF4J. Below is some code to get you started.
>
> DebuggingTest.java:
>
> class DebuggingTest {
>
> static {
> LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
> SLF4JBridgeHandler.removeHandlersForRootLogger();
> SLF4JBridgeHandler.install();
> miniClusterExtension =
> new MiniClusterExtension(
> new MiniClusterResourceConfiguration.Builder()
> .setConfiguration(getConfiguration())
> .setNumberSlotsPerTaskManager(1)
> .build());
> }
>
> @RegisterExtension private static final MiniClusterExtension 
> miniClusterExtension;
>
> private static Configuration getConfiguration() {
> final Configuration configuration = new Configuration();
>
> configuration.setString(
> "metrics.reporter.prom.factory.class", 
> PrometheusReporterFactory.class.getName());
> configuration.setString("metrics.reporter.prom.port", "9200-9300");
>
> return configuration;
> }
>
> @Test
> void runJob() throws Exception {
> 
> }
> }
>
>
> pom.xml:
>
> <dependency>
>     <groupId>org.slf4j</groupId>
>     <artifactId>jul-to-slf4j</artifactId>
>     <version>1.7.32</version>
> </dependency>
>
> log4j2-test.properties:
>
> rootLogger.level = off
> rootLogger.appenderRef.test.ref = TestLogger
> logger.http.name = com.sun.net.httpserver
> logger.http.level = trace
> appender.testlogger.name = TestLogger
> appender.testlogger.type = CONSOLE
> appender.testlogger.target = SYSTEM_ERR
> appender.testlogger.layout.type = PatternLayout
> appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
>
> On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:
>
> On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
>
> Hi!
>
> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
> coming from 1.14.4. I already consulted the mailing 
> list:https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
> I have not found the underlying problem or a solution to it.
>
> Actually, after re-checking, I see the same log WARNINGS as
> ChangZhou described.
>
> As I described, it seems to be an issue with my job. If no job, or an
> example job runs on the taskmanager the basic metrics work just fine. Maybe
> ChangZhou can confirm this?
>
> @ChangZhou what's your job setup? I am running a streaming SQL job, but
> also using data streams API to create the streaming environment and from
> that the table environment and finally using a StatementSet to execute
> multiple SQL statements in one job.
>
> We are running a streaming application with low level API with
> Kubernetes operator FlinkDeployment.
>
>
>
>
>


REMINDER - Travel Assistance available for ApacheCon NA New Orleans 2022

2022-05-03 Thread Gavin McDonald
Hi All Contributors and Committers,

This is a first reminder email that travel
assistance applications for ApacheCon NA 2022 are now open!

We will be supporting ApacheCon North America in New Orleans, Louisiana,
on October 3rd through 6th, 2022.

TAC exists to help those who would like to attend ApacheCon events but
are unable to do so for financial reasons. This year, we are supporting
both committers and non-committers involved with projects at the
Apache Software Foundation, or open source projects in general.

For more info on this year's applications and qualifying criteria, please
visit the TAC website at http://www.apache.org/travel/
Applications are open and will close on the 1st of July 2022.

Important: Applicants have until the closing date above to submit their
applications (which should contain as much supporting material as required
to efficiently and accurately process their request); this will enable TAC
to announce successful awards shortly afterwards.

As usual, TAC expects to deal with a range of applications from a diverse
range of backgrounds. We therefore encourage (as always) anyone thinking
about sending in an application to do so ASAP.

Why should you attend as a TAC recipient? We encourage you to read stories
from
past recipients at https://apache.org/travel/stories/ . Also note that
previous TAC recipients have gone on to become Committers, PMC Members, ASF
Members, Directors of the ASF Board and Infrastructure Staff members.
Others have gone from Committer to full time Open Source Developers!

How far can you go! - Let TAC help get you there.


Re: Flink 1.14.4 HybridSource consumes lots of CPU resources

2022-05-03 Thread Martijn Visser
I'm looping in @Thomas Weise  since he has expertise on the
HybridSource.

On Tue, 3 May 2022 at 12:04, Arthur Li  wrote:

> Hi Mason,
>
> I uploaded the code and resource files to AwesomeArthurLi/quickstart:
> quickstart (github.com); maybe it will help you reproduce the issue.
>
> BR.
> Arthur Li
>
>
> On May 3, 2022 at 15:48, Mason Chen wrote:
>
> Hi Arthur,
>
> Coincidentally, I also encountered a similar issue recently. For my issue,
> I noticed that the source implementation always marks itself as having data
> available causing the Flink runtime to repeatedly loop in succession
> and causing high CPU utilization. More details in here:
> https://issues.apache.org/jira/browse/FLINK-27479
>
> Can you provide a minimal working example to reproduce this issue? I
> presume you notice high CPU utilization before switching from FileSource
> and also after switching to KafkaSource?
>
> Best,
> Mason
>
> On Sun, May 1, 2022 at 6:24 AM Arthur Li  wrote:
>
>> The following snapshot is the Java process's flame graph.
>>
>> <pasted-graphic-1.png>
>>
>>
>> On May 1, 2022 at 09:14, Arthur Li wrote:
>>
>> Hi all,
>>
>> The Hybrid Source | Apache Flink
>> is one of the new features of Flink 1.14.x, but one problem is that it takes
>> over 700% CPU, which is almost 5 times more than running the two sources separately.
>>
>>
>> My Environment:
>> JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>> Scala: Scala code runner version 2.12.14
>> OS: MacOS Monterey
>>
>>
>> Hybrid Source Code:
>>
>> object HelloHybrid {
>>
>>   def main(args: Array[String]): Unit = {
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val kafka =
>>   KafkaSource.builder[String]()
>> .setBootstrapServers("localhost:9092")
>> .setTopics("lab-flink-sensor-iot")
>> .setGroupId("sensor-iot-group")
>> .setStartingOffsets(OffsetsInitializer.earliest())
>> .setValueOnlyDeserializer(new SimpleStringSchema())
>> .build()
>>
>> val sensorDataFile = 
>> "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>> val fileData = FileSource.forRecordStreamFormat(
>>   new TextLineFormat(),
>>   Path.fromLocalFile(new File(sensorDataFile)))
>>   .build()
>>
>> val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>>
>> env.fromSource(hybridSrc,
>>   WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>   "kafka & file hybrid source")
>>   .map(data => {
>> val arr = data.split(",").map(_.trim)
>> SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>   })
>>   .print("hybrid")
>>
>> env.execute("Hello kafka & file hybrid source")
>>   }
>> }
>>
>>
>>
>>
>>
>


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Chesnay Schepler
You'd help me out greatly if you could provide me with a sample job that 
runs into the issue.


So far I wasn't able to reproduce the issue,
but it should be clear that there is some issue, given 3 separate reports,
although it is strange that so far it was only reported for Prometheus.

If one of you is able to reproduce the issue within a Test and is
feeling adventurous, then you might be able to get more information by
forwarding the java.util.logging to SLF4J. Below is some code to get you
started.

DebuggingTest.java:

class DebuggingTest {

    static {
        LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
        SLF4JBridgeHandler.removeHandlersForRootLogger();
        SLF4JBridgeHandler.install();
        miniClusterExtension =
                new MiniClusterExtension(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(getConfiguration())
                                .setNumberSlotsPerTaskManager(1)
                                .build());
    }

    @RegisterExtension private static final MiniClusterExtension miniClusterExtension;

    private static Configuration getConfiguration() {
        final Configuration configuration = new Configuration();

        configuration.setString(
                "metrics.reporter.prom.factory.class",
                PrometheusReporterFactory.class.getName());
        configuration.setString("metrics.reporter.prom.port", "9200-9300");

        return configuration;
    }

    @Test
    void runJob() throws Exception {

    }
}


pom.xml:


<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jul-to-slf4j</artifactId>
    <version>1.7.32</version>
</dependency>
log4j2-test.properties:

rootLogger.level = off
rootLogger.appenderRef.test.ref = TestLogger

logger.http.name = com.sun.net.httpserver
logger.http.level = trace

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬) wrote:

On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:

Hi!

I also discovered problems with the PrometheusReporter on Flink 1.15.0,
coming from 1.14.4. I already consulted the mailing list:
https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
I have not found the underlying problem or a solution to it.

Actually, after re-checking, I see the same log WARNINGS as
ChangZhou described.

As I described, it seems to be an issue with my job. If no job, or an
example job runs on the taskmanager the basic metrics work just fine. Maybe
ChangZhou can confirm this?

@ChangZhou what's your job setup? I am running a streaming SQL job, but
also using data streams API to create the streaming environment and from
that the table environment and finally using a StatementSet to execute
multiple SQL statements in one job.


We are running a streaming application with low level API with
Kubernetes operator FlinkDeployment.




Re: Flink 1.14.4 HybridSource consumes lots of CPU resources

2022-05-03 Thread Arthur Li
Hi Mason,

I uploaded the code and resource files to AwesomeArthurLi/quickstart: quickstart
(github.com); maybe it will help you reproduce the issue.

BR.
Arthur Li


> On May 3, 2022 at 15:48, Mason Chen wrote:
> 
> Hi Arthur,
> 
> Coincidentally, I also encountered a similar issue recently. For my issue, I 
> noticed that the source implementation always marks itself as having data 
> available causing the Flink runtime to repeatedly loop in succession and 
> causing high CPU utilization. More details in here: 
> https://issues.apache.org/jira/browse/FLINK-27479 
> 
> 
> Can you provide a minimal working example to reproduce this issue? I presume 
> you notice high CPU utilization before switching from FileSource and also 
> after switching to KafkaSource?
> 
> Best,
> Mason
> 
> On Sun, May 1, 2022 at 6:24 AM Arthur Li  > wrote:
> The following snapshot is the Java process's flame graph.
> 
> <pasted-graphic-1.png>
> 
> 
>> On May 1, 2022 at 09:14, Arthur Li (lianyou1...@126.com) wrote:
>> 
>> Hi all,
>> 
>> The Hybrid Source | Apache Flink
>> is one of the new features of Flink 1.14.x, but one problem is that it takes over
>> 700% CPU, which is almost 5 times more than running the two sources separately.
>> 
>> 
>> My Environment:
>> JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>> Scala: Scala code runner version 2.12.14
>> OS: MacOS Monterey
>> 
>> 
>> Hybrid Source Code:
>> 
>> object HelloHybrid {
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     val kafka =
>>       KafkaSource.builder[String]()
>>         .setBootstrapServers("localhost:9092")
>>         .setTopics("lab-flink-sensor-iot")
>>         .setGroupId("sensor-iot-group")
>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build()
>>
>>     val sensorDataFile =
>>       "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>>     val fileData = FileSource.forRecordStreamFormat(
>>         new TextLineFormat(),
>>         Path.fromLocalFile(new File(sensorDataFile)))
>>       .build()
>>
>>     val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>>
>>     env.fromSource(hybridSrc,
>>         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>         "kafka & file hybrid source")
>>       .map(data => {
>>         val arr = data.split(",").map(_.trim)
>>         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>       })
>>       .print("hybrid")
>>
>>     env.execute("Hello kafka & file hybrid source")
>>   }
>> }
>> 
>> 
> 



Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread 陳昌倬
On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
> Hi!
> 
> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
> coming from 1.14.4. I already consulted the mailing list:
> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
> I have not found the underlying problem or a solution to it.
> 
> Actually, after re-checking, I see the same log WARNINGS as
> ChangZhou described.
> 
> As I described, it seems to be an issue with my job. If no job, or an
> example job runs on the taskmanager the basic metrics work just fine. Maybe
> ChangZhou can confirm this?
> 
> @ChangZhou what's your job setup? I am running a streaming SQL job, but
> also using data streams API to create the streaming environment and from
> that the table environment and finally using a StatementSet to execute
> multiple SQL statements in one job.


We are running a streaming application with low level API with
Kubernetes operator FlinkDeployment.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread 陳昌倬
On Tue, May 03, 2022 at 10:28:18AM +0200, Chesnay Schepler wrote:
> Is there any warning in the logs containing "Error while handling metric"?

No, we don't find any "Error while handling metric"


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Peter Schrott
Hi!

I also discovered problems with the PrometheusReporter on Flink 1.15.0,
coming from 1.14.4. I already consulted the mailing list:
https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
I have not found the underlying problem or a solution to it.

Actually, after re-checking, I see the same log WARNINGS as
ChangZhou described.

As I described, it seems to be an issue with my job. If no job, or an
example job runs on the taskmanager the basic metrics work just fine. Maybe
ChangZhou can confirm this?

@ChangZhou what's your job setup? I am running a streaming SQL job, but
also using data streams API to create the streaming environment and from
that the table environment and finally using a StatementSet to execute
multiple SQL statements in one job.

@Mason, naming the operators with `.name(...)` is not possible using the
Table API.

@Chesnay, in my case there are no error logs.

Best & thanks,
Peter

On Tue, May 3, 2022 at 10:28 AM Chesnay Schepler  wrote:

> Is there any warning in the logs containing "Error while handling metric"?
>
> On 03/05/2022 10:18, ChangZhuo Chen (陳昌倬) wrote:
> > On Tue, May 03, 2022 at 01:00:42AM -0700, Mason Chen wrote:
> >> Hi ChangZhou,
> >>
> >> The warning log indicates that the metric was previously defined and so
> the
> >> runtime is handling the "duplicate" metric by ignoring it. This is
> >> typically a benign message unless you rely on this metric. Is it
> possible
> >> that you are using the same task name for different tasks? It would be
> >> defined by the `.name(...)` API in your job graph instantiation.
> >>
> >> Can you clarify what it means that your endpoint isn't working--some
> >> metrics missing, endpoint is timing out, etc.? Also, can you confirm
> from
> >> logs that the PrometheusReporter was created properly?
> > Endpoint isn't working means we got empty reply from Prometheus
> > endpoint. The following is our testing for taskmanager Prometheus
> > endpoint.
> >
> >  curl localhost:9249
> >  curl: (52) Empty reply from server
> >
> > We have the following log in taskmanager, so PrometheusReporter was
> > created properly.
> >
> >  2022-05-03 01:48:16,678 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: metrics.reporter.prom.class,
> org.apache.flink.metrics.prometheus.PrometheusReporter
> >  ...
> >  2022-05-03 01:48:23,665 INFO
> org.apache.flink.metrics.prometheus.PrometheusReporter   [] - Started
> PrometheusReporter HTTP server on port 9249.
> >  2022-05-03 01:48:23,669 INFO
> org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - Reporting
> metrics for reporter prom of type
> org.apache.flink.metrics.prometheus.PrometheusReporter.
> >
> >
>
>


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Chesnay Schepler

Is there any warning in the logs containing "Error while handling metric"?

On 03/05/2022 10:18, ChangZhuo Chen (陳昌倬) wrote:

On Tue, May 03, 2022 at 01:00:42AM -0700, Mason Chen wrote:

Hi ChangZhou,

The warning log indicates that the metric was previously defined and so the
runtime is handling the "duplicate" metric by ignoring it. This is
typically a benign message unless you rely on this metric. Is it possible
that you are using the same task name for different tasks? It would be
defined by the `.name(...)` API in your job graph instantiation.

Can you clarify what it means that your endpoint isn't working--some
metrics missing, endpoint is timing out, etc.? Also, can you confirm from
logs that the PrometheusReporter was created properly?

Endpoint isn't working means we got empty reply from Prometheus
endpoint. The following is our testing for taskmanager Prometheus
endpoint.

 curl localhost:9249
 curl: (52) Empty reply from server

We have the following log in taskmanager, so PrometheusReporter was
created properly.

 2022-05-03 01:48:16,678 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: metrics.reporter.prom.class, 
org.apache.flink.metrics.prometheus.PrometheusReporter
 ...
 2022-05-03 01:48:23,665 INFO  
org.apache.flink.metrics.prometheus.PrometheusReporter   [] - Started 
PrometheusReporter HTTP server on port 9249.
 2022-05-03 01:48:23,669 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - Reporting 
metrics for reporter prom of type 
org.apache.flink.metrics.prometheus.PrometheusReporter.






Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread 陳昌倬
On Tue, May 03, 2022 at 01:00:42AM -0700, Mason Chen wrote:
> Hi ChangZhou,
> 
> The warning log indicates that the metric was previously defined and so the
> runtime is handling the "duplicate" metric by ignoring it. This is
> typically a benign message unless you rely on this metric. Is it possible
> that you are using the same task name for different tasks? It would be
> defined by the `.name(...)` API in your job graph instantiation.
> 
> Can you clarify what it means that your endpoint isn't working--some
> metrics missing, endpoint is timing out, etc.? Also, can you confirm from
> logs that the PrometheusReporter was created properly?

Endpoint isn't working means we got empty reply from Prometheus
endpoint. The following is our testing for taskmanager Prometheus
endpoint.

curl localhost:9249
curl: (52) Empty reply from server

We have the following log in taskmanager, so PrometheusReporter was
created properly.

2022-05-03 01:48:16,678 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: metrics.reporter.prom.class, 
org.apache.flink.metrics.prometheus.PrometheusReporter
...
2022-05-03 01:48:23,665 INFO  
org.apache.flink.metrics.prometheus.PrometheusReporter   [] - Started 
PrometheusReporter HTTP server on port 9249.
2022-05-03 01:48:23,669 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - Reporting 
metrics for reporter prom of type 
org.apache.flink.metrics.prometheus.PrometheusReporter.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: About job execution

2022-05-03 Thread Zhanghao Chen
Hi Jack,

I'm supposing you are actually referring to the JobMaster by the term JobManager in
your context (the JobManager referred to in the docs is usually a collection of
components that comprises the Dispatcher, the ResourceManager, and a set of
one-per-job JobMasters).

The JobMaster's lifecycle is bound to the job's lifecycle, so yes, after the
job is finished, its JobMaster will be shut down. TaskManagers, on the other
hand, are shared among jobs.

Hope that helps.

Best,
Zhanghao Chen

From: Ww J 
Sent: Monday, May 2, 2022 14:15
To: user 
Subject: About job execution

Hello,

I read some articles on the internet about Flink job execution and have some 
questions. When the dispatcher receives a job, the dispatcher will start a 
JobManager. After the job is finished, will the JobManager be shut down? For 
the TaskManager, after the job is finished, will the TaskManager be shut down? 
Or is the TaskManager shared among all the tasks?

Thanks.

Jack




Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Mason Chen
Hi ChangZhou,

The warning log indicates that the metric was previously defined and so the
runtime is handling the "duplicate" metric by ignoring it. This is
typically a benign message unless you rely on this metric. Is it possible
that you are using the same task name for different tasks? It would be
defined by the `.name(...)` API in your job graph instantiation.

Can you clarify what it means that your endpoint isn't working--some
metrics missing, endpoint is timing out, etc.? Also, can you confirm from
logs that the PrometheusReporter was created properly?

Best,
Mason
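
As a small illustration of the `.name(...)` API Mason mentions, here is a self-contained sketch (the operator names are made up); giving each operator a distinct name keeps the task/operator metric scopes from overlapping:

DistinctOperatorNames.java (sketch):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistinctOperatorNames {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.fromElements("a", "b", "c");

        // Each operator gets its own name, so no two tasks report metrics under the same name.
        lines.map(String::toUpperCase).name("uppercase")
             .filter(s -> !s.isEmpty()).name("drop-empty")
             .print().name("stdout-sink");

        env.execute("distinct-operator-names");
    }
}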

On Mon, May 2, 2022 at 7:25 PM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> We found that taskmanager Prometheus endpoint does not work after
> upgrading from 1.14.3 to 1.15.0. Jobmanager Prometheus endpoint is okay
> in 1.15.0, so we think the problem is not in image we used. Any idea how
> to fix this problem?
>
>
> Also, we found the following log in taskmanager, but not jobmanager. Not
> sure if they are related to this issue.
>
> 2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric
> with the name 'numBytesInLocal'. Metric will not be
> reported.[10.210.47.134, taskmanager, , , ,
> 8, Shuffle, Netty, Input]
> 2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric
> with the name 'numBytesInLocalPerSecond'. Metric will not be
> reported.[10.210.47.134, taskmanager, , , ,
> 8, Shuffle, Netty, Input]
> ...
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: How should I call external HTTP services with PyFlink?

2022-05-03 Thread Dhavan Vaidya
Hey Francis!

Thanks for the insights! I am thinking of using Java / Scala for this
scenario given your input. Introducing a new language to the team, however,
is going to be a big ask :-D

Another option that you mentioned is pushing enrichment data instead of
pulling. That would be excellent, I will try to model the pipes and see if
that works.

Thanks again!
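
In case it helps with that evaluation, here is a minimal sketch of the Java Async I/O operator from the docs linked below; the HTTP client, endpoint URL, and class names are placeholders rather than anyone's actual service:

HttpEnrichmentJob.java (sketch):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class HttpEnrichmentJob {

    /** Calls an (assumed) internal enrichment service without blocking the task thread. */
    public static class EnrichViaHttp extends RichAsyncFunction<String, String> {
        private transient HttpClient client;

        @Override
        public void open(Configuration parameters) {
            client = HttpClient.newHttpClient();
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            HttpRequest request = HttpRequest.newBuilder(
                    URI.create("http://enrichment-service.internal/lookup?key=" + key)).build();
            client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .thenAccept(resp -> resultFuture.complete(
                            Collections.singleton(key + "," + resp.body())))
                    .exceptionally(err -> {
                        resultFuture.completeExceptionally(err);
                        return null;
                    });
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> keys = env.fromElements("a", "b", "c");

        // Up to 100 requests in flight, 5 second timeout per element.
        DataStream<String> enriched = AsyncDataStream.unorderedWait(
                keys, new EnrichViaHttp(), 5, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("http-enrichment");
    }
}

AsyncDataStream.orderedWait is the alternative when the output order must match the input order.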

On Tue, 3 May 2022 at 05:53, Francis Conroy 
wrote:

> Hi Dhavan,
>
> We have looked at using pyflink for data stream enrichment and found the
> performance lacking compared to the java counterpart. One option for you
> might be to use statefun for the enrichment stages. We've also changed our
> model for enrichment, we're pushing the enrichment data into the pipeline
> instead of pulling it, but this won't work in a lot of situations.
>
> Hope that gives you some ideas.
>
> On Mon, 2 May 2022 at 22:54, Dhavan Vaidya 
> wrote:
>
>> Hello!
>>
>> I want to make HTTP(S) calls to enrich data streams. The HTTP services
>> are running on our VPC, so the delay is limited, but sometimes these
>> services end up calling third party APIs, and latencies become high.
>>
>> From documentation (
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
>> it seems PyFlink does not support "asyncio operator" like Java does (
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
>> Am I missing something? How should this be approached?
>>
>> Thanks!
>>
>> --
>> Dhavan
>>
>
>