Re: How to debug Metaspace exception?

2022-03-29 Thread 胡伟华
Hi, John

Could you tell us your application scenario? Is it a Flink session cluster with 
a lot of jobs?

Maybe you can try to dump the memory with jmap and use tools such as MAT to 
analyze whether there are abnormal classes and classloaders
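
For example, roughly (the PID and dump path are placeholders):

jmap -dump:live,format=b,file=/tmp/taskmanager-heap.hprof <taskmanager-pid>

The heap dump also contains the loaded classes and their classloaders, so MAT's histogram and "duplicate classes" views can show which classloaders keep accumulating classes.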


> 2022年3月30日 上午6:09,John Smith  写道:
> 
> Hi running 1.14.4
> 
> My task manager still fails with java.lang.OutOfMemoryError: Metaspace. The 
> metaspace out-of-memory error has occurred. This can mean two things: either 
> the job requires a larger size of JVM metaspace to load classes or there is a 
> class loading leak.
> 
> I have 2GB of metaspace configured: taskmanager.memory.jvm-metaspace.size: 2048m
> 
> But the task nodes still fail.
> 
> When looking at the UI metrics, the metaspace starts low. Now I see 85% 
> usage. It seems to be a class loading leak at this point, how can we debug 
> this issue?



Re: Pyflink elastic search connectors

2022-03-29 Thread Sandeep Sharat
Hi,

Thank you for the quick responses. We are using the datastream api for
pyflink. We are trying to implement a wrapper in python for the same as we
speak. Hopefully it will work out. 😊

On Wed, 30 Mar, 2022, 8:02 am Xingbo Huang,  wrote:

> Hi,
>
> Are you using the datastream api or the table api? If you are using the table api,
> you can use the connector by executing sql[1]. If you are using the
> datastream api, there is really no es connector api provided, you need to
> write python wrapper code, but the wrapper code is very simple. The
> underlying code makes use of py4j to call the java api of the es connector. For
> details, you can refer to the wrapper code in kafka or pulsar[2].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
> [2]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py
>
> Best,
> Xingbo
>
> On Tue, Mar 29, 2022 at 20:51, Sandeep Sharat wrote:
>
>> Hello Everyone,
>>
>> I have been working on a streaming application using elasticsearch as the
>> sink. I had achieved it using the java api quite easily. But due to a
>> recent policy change we are moving towards the python api for flink,
>> however we were unable to find any python elastic search connectors for
>> flink. We were able to find support for the kafka connectors in python.
>> Does it mean that we have to write our own connectors in python  to
>> make use of the flink-elasticsearch connector jar?
>>
>> Thanks in advance
>> --
>> Thanks & Regards
>> Sandeep Sharat Kumar
>>
>


Re: Pyflink elastic search connectors

2022-03-29 Thread Xingbo Huang
Hi,

Are you using the datastream api or the table api? If you are using the table api,
you can use the connector by executing sql[1]. If you are using the
datastream api, there is really no es connector api provided, you need to
write python wrapper code, but the wrapper code is very simple. The
underlying code makes use of py4j to call the java api of the es connector. For
details, you can refer to the wrapper code in kafka or pulsar[2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
[2]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py
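
For the table api route, a minimal sketch of an Elasticsearch sink table (assuming the flink-sql-connector-elasticsearch7 jar is on the classpath; the table, field, index and source names are only placeholders):

CREATE TABLE es_sink (
  user_id STRING,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'user_counts'
);

INSERT INTO es_sink SELECT user_id, COUNT(*) FROM some_source GROUP BY user_id;

Declaring a primary key makes the sink work in upsert mode; without it the sink appends.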

Best,
Xingbo

On Tue, Mar 29, 2022 at 20:51, Sandeep Sharat wrote:

> Hello Everyone,
>
> I have been working on a streaming application using elasticsearch as the
> sink. I had achieved it using the java api quite easily. But due to a
> recent policy change we are moving towards the python api for flink,
> however we were unable to find any python elastic search connectors for
> flink. We were able to find support for the kafka connectors in python.
> Does it mean that we have to write our own connectors in python  to
> make use of the flink-elasticsearch connector jar?
>
> Thanks in advance
> --
> Thanks & Regards
> Sandeep Sharat Kumar
>


How to debug Metaspace exception?

2022-03-29 Thread John Smith
Hi running 1.14.4

My task manager still fails with java.lang.OutOfMemoryError: Metaspace.
The metaspace out-of-memory error has occurred. This can mean two things:
either the job requires a larger size of JVM metaspace to load classes or
there is a class loading leak.

I have 2GB of metaspace configured: taskmanager.memory.jvm-metaspace.size:
2048m

But the task nodes still fail.

When looking at the UI metrics, the metaspace starts low. Now I see 85%
usage. It seems to be a class loading leak at this point, how can we debug
this issue?


Re: Exactly-once sink sync checkpoint stacking time effect

2022-03-29 Thread Filip Karnicki
Thank you very much for your answer.

I was able to reduce the number of sinks as you described. That helped a
lot, thank you.

I think you must be right with regards to (2) - opening a new transaction
being the culprit. It's unlikely to be (1) since this behaviour occurs even
when there are 0 messages going through a brand new, locally running kafka
cluster.

Kind regards,
Fil

On Tue, 29 Mar 2022 at 09:34, Arvid Heise  wrote:

> Hi Filip,
>
> two things will impact sync time for Kafka:
> 1. Flushing all old data [1], in particular flushing all in-flight
> partitions [2]. However, that shouldn't cause a stacking effect except when
> the brokers are overloaded on checkpoint.
> 2. Opening a new transaction [3]. Since all transactions are linearized on
> the Kafka brokers, this is the most likely root cause. Note that aborted
> checkpoints may require multiple transactions to be opened. So you could
> check if you have them quite often aborted.
>
> If you want to know more, I suggest you attach a profiler and find the
> specific culprit and report back [4]. There is a low probability that the
> sink framework has a bug that causes this behavior. In that case, we can
> fix it more easily than if it's a fundamental issue with Kafka. In general,
> exactly-once and low latency are somewhat contradicting requirements, so
> there is only so much you can do.
>
> Not knowing your topology but maybe you can reduce the number of sinks?
> With the KafkaRecordSerializationSchema you can set different topics for
> different ProducerRecords of the same DataStream.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/
>
>
> On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki 
> wrote:
>
>> Hi, I noticed that with each added (kafka) sink with exactly-once
>> guarantees, there looks to be a penalty of ~100ms in terms of sync
>> checkpointing time.
>>
>> Would anyone be able to explain and/or point me in the right direction in
>> the source code so that I could understand why that is? Specifically, why
>> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
>> all sinks, potentially pointing to a sequential set of IO calls (wld
>> guess)
>>
>> I would be keen to understand if there's anything I could do (incl.
>> contributing code) that would parallelise this penalty in terms of sync
>> checkpointing time.
>>
>> Alternatively, is there any setting that would help me bring the sync
>> checkpointing time down (and still get exactly-once guarantees)?
>>
>> Many thanks,
>> Fil
>>
>


Re: Parallel processing in a 2 node cluster apparently not working

2022-03-29 Thread David Anderson
In this situation, changing your configuration [1] to include

cluster.evenly-spread-out-slots: true

should change the scheduling behavior to what you are looking for.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#cluster-evenly-spread-out-slots

Regards,
David

On Tue, Mar 29, 2022 at 1:30 PM HG  wrote:

>
> Hi,
>
> I have a 2 node cluster just for testing.
> When I start the cluster and the job I see that the parallelism is 2 as
> expected.
> But they are both on the same node.
> When I stop the taskmanager on that node it switches to the other one.
> But I expected both nodes to have a subtask.
>
> See below.
>
> Any clues?
>
> Regards Hans-Peter
>
> [image: image.png]
> [image: image.png]
>


Re: SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-29 Thread Georg Heiler
I got it working now: It needs to be specified both for the key and value
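
Roughly (a sketch against Flink 1.14, where the registry option is exposed as 'url' under the key./value. prefixes; on 1.13 the prefixed option is 'key.avro-confluent.schema-registry.url'; note that upsert-kafka also requires a PRIMARY KEY):

CREATE TABLE metrics_per_brand (
  brand STRING,
  cnt BIGINT,
  duration_mean DOUBLE,
  rating_mean DOUBLE,
  PRIMARY KEY (brand) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'metrics_per_brand',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8081',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8081'
);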



thanks

On Mon, Mar 28, 2022 at 13:33, Ingo Bürk wrote:

> Hi Georg,
>
> which Flink version are you using? The missing property is for the
> avro-confluent format, and if I recall correctly, how these are passed
> has changed in recent versions, so it'd be good to double check you are
> using the documentation for the version you are running on.
>
>
> Best
> Ingo
>
> On 24.03.22 11:57, Georg Heiler wrote:
> > Hi,
> >
> > how can I get Flinks SQL client to nicely sink some data to either the
> > regular kafka or the kafka-upsert connector?
> >
> > I have a table/ topic with dummy data:
> > CREATE TABLE metrics_brand_stream (
> >  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> >  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
> >`partition` BIGINT METADATA VIRTUAL,
> >`offset` BIGINT METADATA VIRTUAL,
> >  brand string,
> >  duration int,
> >  rating int
> >
> > ) WITH (
> >  'connector' = 'kafka',
> >  'topic' = 'commercials_avro',
> >  'scan.startup.mode' = 'earliest-offset',
> >  'format' = 'avro-confluent',
> >  'avro-confluent.schema-registry.url' = 'http://localhost:8081/
> > ',
> >  'properties.group.id ' =
> 'flink-test-001',
> >  'properties.bootstrap.servers' = 'localhost:9092'
> > );
> >
> > And the following aggregation:
> >
> > SELECT brand,
> >   COUNT(*) AS cnt,
> >   AVG(duration) AS  duration_mean,
> >   AVG(rating) AS rating_mean
> >FROM metrics_brand_stream
> >GROUP BY brand;
> >
> > When trying to define an output table:
> >
> > CREATE TABLE metrics_per_brand (
> >  brand string,
> >  cnt BIGINT,
> >  duration_mean DOUBLE,
> >  rating_mean DOUBLE
> >
> > ) WITH (
> >  'connector' = 'upsert-kafka',
> >  'topic' = 'metrics_per_brand',
> >  'avro-confluent.schema-registry.url' = 'http://localhost:8081/
> > ',
> >  'properties.group.id ' =
> 'flink-test-001',
> >  'properties.bootstrap.servers' = 'localhost:9092',
> >  'key.format' = 'avro-confluent',
> >  'value.format' = 'avro-confluent'
> > );
> >
> > And trying to INSERT some result data:
> >
> > INSERT INTO metrics_per_brand
> >SELECT brand,
> >   COUNT(*) AS cnt,
> >   AVG(duration) AS  duration_mean,
> >   AVG(rating) AS rating_mean
> >FROM metrics_brand_stream
> >GROUP BY brand;
> >
> > The query fails with:
> >
> > org.apache.flink.table.api.ValidationException: One or more required
> > options are missing.
> >
> > Missing required options are:
> >
> > url
> >
> > But neither:
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
> > <
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/>
>
> > nor
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
> > <
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/>
>
> > nor
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
> > <
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/>
>
> > seems to list the right configuration (or I am misreading the
> > documentation).
> >
> >
> > How can I sink data to kafka after some arbitrary computation using the
> > flink-sql client using either the kafka or upsert-kafka connector where
> > the input is AVRO with a schema from the confluent schema registry and
> > the output should store its schema there as well (and serialize using
> AVRO).
> >
> >
> > Best,
> > Georg
>


Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread 胡伟华
Are you referring to creating the Flink cluster on Kubernetes with a yaml file?

How did you submit the job to the Flink cluster? Not via the command line (flink 
run xxx)?

> On Mar 29, 2022, at 10:38 PM, Jin Yi wrote:
> 
> no they are not.  b/c we are using k8s, we use kubectl apply commands with a 
> yaml file to specify the startup.
> 
> On Tue, Mar 29, 2022 at 7:37 AM 胡伟华 wrote:
> I see, can you provide the startup command for 1.12.3 and 1.14.4?
> Are these startup commands running on the same node?
> 
>> On Mar 29, 2022, at 10:32 PM, Jin Yi <j...@promoted.ai> wrote:
>> 
>> it's running in k8s.  we're not running in app mode b/c we have many jobs 
>> running in the same flink cluster.
>> 
>> On Tue, Mar 29, 2022 at 4:29 AM huweihua wrote:
>> Hi, Jin
>> 
>> Can you provide more information about Flink cluster deployment modes? Is it 
>> running in Kubernetes/YARN or standalone mode?
>> Maybe you can use application mode to keep the environment (network 
>> accessibility) always the same. Application mode will run the user-main 
>> method in the JobManager.
>> 
>> [1]https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>>  
>> 
>> 
>> 
>> 
>> On Tue, Mar 29, 2022 at 11:23, Jin Yi <j...@promoted.ai> wrote:
>> i have a flink job that uses redis as a sink.  i optionally do some wiping 
>> and metadata writing from the job submitting flink program before it 
>> actually executes/submits the job to the job manager.  when i don't do this 
>> redis preparation, the redis sink works completely fine.  that is, the redis 
>> commands work fine from the taskmanager tasks.  however, if i enable the 
>> option redis preparation from within the flink job program, it fails to 
>> contact redis and hits a timeout exception.
>> 
>> previously, we were using 1.12.3, and this behavior worked fine.  is the 
>> 1.14.4 flink docker image pretty restrictive when it comes to network access 
>> at the job submission client layer?
>> 
>> thanks.
> 



Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread Jin Yi
no they are not.  b/c we are using k8s, we use kubectl apply commands with
a yaml file to specify the startup.

On Tue, Mar 29, 2022 at 7:37 AM 胡伟华  wrote:

> I see, can you provide the startup command for 1.12.3 and 1.14.4?
> Are these startup commands running on the same node?
>
> On Mar 29, 2022, at 10:32 PM, Jin Yi wrote:
>
> it's running in k8s.  we're not running in app mode b/c we have many jobs
> running in the same flink cluster.
>
> On Tue, Mar 29, 2022 at 4:29 AM huweihua  wrote:
>
>> Hi, Jin
>>
>> Can you provide more information about Flink cluster deployment modes? Is
>> it running in Kubernetes/YARN or standalone mode?
>> Maybe you can use application mode to keep the environment (network
>> accessibility) always the same. Application mode will run the user-main
>> method in the JobManager.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>>
>>
>>
>> On Tue, Mar 29, 2022 at 11:23, Jin Yi wrote:
>>
>>> i have a flink job that uses redis as a sink.  i optionally do some
>>> wiping and metadata writing from the job submitting flink program before it
>>> actually executes/submits the job to the job manager.  when i don't do this
>>> redis preparation, the redis sink works completely fine.  that is, the
>>> redis commands work fine from the taskmanager tasks.  however, if i enable
>>> the option redis preparation from within the flink job program, it fails to
>>> contact redis and hits a timeout exception.
>>>
>>> previously, we were using 1.12.3, and this behavior worked fine.  is the
>>> 1.14.4 flink docker image pretty restrictive when it comes to network
>>> access at the job submission client layer?
>>>
>>> thanks.
>>>
>>
>


Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread 胡伟华
I see, can you provide the startup command for 1.12.3 and 1.14.4?
Are these startup commands running on the same node?

> On Mar 29, 2022, at 10:32 PM, Jin Yi wrote:
> 
> it's running in k8s.  we're not running in app mode b/c we have many jobs 
> running in the same flink cluster.
> 
> On Tue, Mar 29, 2022 at 4:29 AM huweihua wrote:
> Hi, Jin
> 
> Can you provide more information about Flink cluster deployment modes? Is it 
> running in Kubernetes/YARN or standalone mode?
> Maybe you can use application mode to keep the environment (network 
> accessibility) always the same. Application mode will run the user-main 
> method in the JobManager.
> 
> [1]https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>  
> 
> 
> 
> 
> On Tue, Mar 29, 2022 at 11:23, Jin Yi <j...@promoted.ai> wrote:
> i have a flink job that uses redis as a sink.  i optionally do some wiping 
> and metadata writing from the job submitting flink program before it actually 
> executes/submits the job to the job manager.  when i don't do this redis 
> preparation, the redis sink works completely fine.  that is, the redis 
> commands work fine from the taskmanager tasks.  however, if i enable the 
> option redis preparation from within the flink job program, it fails to 
> contact redis and hits a timeout exception.
> 
> previously, we were using 1.12.3, and this behavior worked fine.  is the 
> 1.14.4 flink docker image pretty restrictive when it comes to network access 
> at the job submission client layer?
> 
> thanks.



Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread Jin Yi
it's running in k8s.  we're not running in app mode b/c we have many jobs
running in the same flink cluster.

On Tue, Mar 29, 2022 at 4:29 AM huweihua  wrote:

> Hi, Jin
>
> Can you provide more information about Flink cluster deployment modes? Is
> it running in Kubernetes/YARN or standalone mode?
> Maybe you can use application mode to keep the environment (network
> accessibility) always the same. Application mode will run the user-main
> method in the JobManager.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>
>
>
> On Tue, Mar 29, 2022 at 11:23, Jin Yi wrote:
>
>> i have a flink job that uses redis as a sink.  i optionally do some
>> wiping and metadata writing from the job submitting flink program before it
>> actually executes/submits the job to the job manager.  when i don't do this
>> redis preparation, the redis sink works completely fine.  that is, the
>> redis commands work fine from the taskmanager tasks.  however, if i enable
>> the option redis preparation from within the flink job program, it fails to
>> contact redis and hits a timeout exception.
>>
>> previously, we were using 1.12.3, and this behavior worked fine.  is the
>> 1.14.4 flink docker image pretty restrictive when it comes to network
>> access at the job submission client layer?
>>
>> thanks.
>>
>


Re: Watermarks event time vs processing time

2022-03-29 Thread HG
Hello Matthias,

I am still using ProcessingTimeSessionWindow.
But it turns out I was wrong.
I tested a couple of times and it did not seem to work.
But now it does with both watermark strategies removed.

My apologies.
Regards Hans-Peter

This is the code:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(Integer.parseInt(envMaxParallelism));
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(Integer.parseInt(envEnableCheckpointing));


Properties kafkaProps  = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
inputBrokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
inputGroupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
autoCommit);

kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
autoCommitInterval);
kafkaProps.setProperty("ssl.truststore.type", inputTrustStoreType);
kafkaProps.setProperty("ssl.truststore.password",
inputTrustStorePassword);
kafkaProps.setProperty("ssl.truststore.location",
inputTrustStoreLocation);
kafkaProps.setProperty("security.protocol", inputSecurityProtocol);
kafkaProps.setProperty("ssl.enabled.protocols",
inputSslEnabledProtocols);

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setGroupId(inputGroupId)
.setClientIdPrefix(clientId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new
JSONKeyValueDeserializationSchema(fetchMetadata)))

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();


/* Use the watermark stragegy to create a datastream */
DataStream<ObjectNode> ds = env.fromSource(source,
WatermarkStrategy.noWatermarks(), "Kafka Source");

/* Split the ObjectNode into a Tuple4 */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds =
ds.flatMap(new Splitter());

DataStream<String> tuple4DsWmKeyedbytr = tuple4ds
.keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
@Override
public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
return value.f2;
}
})

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))

.allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
.process(new MyProcessWindowFunction());



Properties sinkkafkaProps  = new Properties();
sinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
outputBrokers);
sinkkafkaProps.setProperty("ssl.truststore.type",
outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location",
outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password",
outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol",
outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("ssl.enabled.protocols",
outputSslEnabledProtocols);


KafkaSink<String> kSink = KafkaSink.<String>builder()
.setBootstrapServers(outputBrokers)
.setKafkaProducerConfig(sinkkafkaProps)

.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new
SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);


// Splits the Object node into a Tuple4
private static class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
@Override
public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out) throws Exception {
// retrieved handling_time twice intentionally, one of which will be used
// for the watermark strategy and the other for the calculation of the elapsed time
out.collect(new Tuple4<>(json.get("value").get("handling_time").asLong(),
json.get("value").get("handling_time").asLong(),
json.get("value").get("transaction_id").asText(),
 json.get("value").get("original_event").toPrettyString()));
}
}

// Class to sort the events that belong to the same transactions
public static class SortEventsHandlingTime implements
Comparator<Tuple4<Long, Long, String, String>> {

// Let's compare 2 Tuple4 objects
public int compare(Tuple4<Long, Long, String, String> o1,
Tuple4<Long, Long, String, String> o2) {
int result =
Long.compare(Long.parseLong(o1.getField(0).toString()),
Long.parseLong(o2.getField(0).toString()));
if (result > 0) {
return 1;
} else

RE: Watermarks event time vs processing time

2022-03-29 Thread Schwalbe Matthias
Hello Hans-Peter,

I’m a little confused which version of your code you are testing against:

  *   ProcessingTimeSessionWindows or EventTimeSessionWindows?
  *   did you keep the withIdleness() ??

As said before:

  *   for ProcessingTimeSessionWindows, watermarks play no role
  *   if you keep withIdleness(), then the respective sparse DataStream is 
event-time-less most of the time, i.e. no triggers fire to close a session 
window
  *   withIdleness() makes only sense if you merge/union/connect multiple 
DataStream where at least one stream has their watermarks updated regularly 
(i.e. it is not withIdleness())
 *   this is not your case, your DAG is linear, no union nor connects
  *   in event-time mode processing time plays no role, watermarks exclusively 
take the role of the progress of model (event) time and hence the triggering of 
windows
  *   in order to trigger a (session-)window at time A the window operator 
needs to receive a watermark of at least time A
  *   next catch regards partitioning
 *   your first watermark strategy kafkaWmstrategy generates 
per-Kafka-partition watermarks
 *   a keyBy() reshuffles these partitions onto the number of subtasks 
according to the hash of the key
 *   this results in a per subtask calculation of the lowest watermark of 
all Kafka partitions that happen to be processed by that subtask
 *   i.e. if a single Kafka partition makes no watermark progress the 
subtask watermark makes no progress
 *   this surfaces in sparse data as in your case
  *   your second watermark strategy wmStrategy makes things worse because
 *   it discards the correct watermarks of the first watermark strategy
 *   and replaces it with something that is arbitrary (at this point it is 
hard to guess the correct max lateness that is a mixture of the events from 
multiple Kafka partitions)

Conclusion:
The only way to make the event time session windows work for you in a timely 
manner is to make sure watermarks on all involved partitions make progress, 
i.e. new events arrive on all partitions in a regular manner.
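
For illustration, a minimal sketch of the event-time variant against your Tuple4 layout, reusing your tuple4ds stream and MyProcessWindowFunction (f0 = handling_time in epoch millis, f2 = transaction_id; the out-of-orderness and gap values are just examples):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event time is taken from f0 (handling_time), tolerating 10 s of out-of-orderness.
WatermarkStrategy<Tuple4<Long, Long, String, String>> strategy =
    WatermarkStrategy
        .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, recordTimestamp) -> event.f0);

DataStream<Tuple4<Long, Long, String, String>> withTimestamps =
    tuple4ds.assignTimestampsAndWatermarks(strategy);

withTimestamps
    .keyBy(event -> event.f2)                                    // transaction_id
    .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
    .process(new MyProcessWindowFunction());

This only closes a session window once every Kafka partition feeding the operator has delivered an event with a timestamp beyond (last event time + gap + out-of-orderness), which is exactly the progress condition described above.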

Hope this helps

Thias


From: HG 
Sent: Tuesday, March 29, 2022 1:07 PM
To: Schwalbe Matthias 
Cc: user 
Subject: Re: Watermarks event time vs processing time


Hello Matthias,

When I remove all the watermark strategies it does not process anything.
For example when I use WatermarkStrategy.noWatermarks() instead of the one I 
build nothing seems to happen at all.

 Also when I skip the part where I add wmStrategy  to create tuple4dswm:
 DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = 
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

Nothing is processed.

Regards Hans-Peter

On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
Hi Hanspeter,

Let me relate some hints that might help you getting concepts clearer.

From your description I make the following assumptions where you are not specific 
enough (please confirm or correct in your answer):

a.   You store incoming events in state per transaction_id to be 
sorted/aggregated(min/max time) by event time later on

b.   So far you used a session window to determine the point in time when 
to emit the stored/enriched/sorted events

c.Watermarks are generated with bounded out of orderness

d.   You use session windows with a specific gap

e.   In your experiment you ever only send 1000 events and then stop 
producing incoming events

Now to your questions:

  *   For processing time session windows, watermarks play no role whatsoever, 
you simply assume that you’ve seen all events belonging to a single transaction 
id if the last such event for a specific transaction id was processed 
sessionWindowGap milliseconds ago
  *   Therefore you see all enriched incoming events the latest 
sessionWindowGap ms after the last incoming event (+ some latency)
  *   In event time mode and resp event time session windows the situation is 
exactly the same, only that processing time plays no role
  *   A watermark means (ideally) that no event older than the watermark time 
ever follows the watermark (which itself is a meta-event that flows with the 
proper events on the same channels)
  *   In order for a session gap window to forward the enriched events the 
window operator needs to receive a watermark that is sessionWindowGap 
milliseconds beyond the latest incoming event (in terms of the respective event 
time)
  *   The watermark generator in order to generate a new watermark that 
triggers this last session window above needs to encounter an (any) event that 
has a timestamp of ( + outOfOrderness + 
sessionWindowGap + 1ms)
  *   Remember, the watermark generator never generated watermarks based on 
processing time, but only based on the timestamps it has seen in events 
actually encountered
  *   Coming back to your idleness configuration: it only means that the 
incoming stream becomes idle == timeless after a while, i.e. watermarks won't make progress from this stream

Pyflink elastic search connectors

2022-03-29 Thread Sandeep Sharat
Hello Everyone,

I have been working on a streaming application using elasticsearch as the
sink. I had achieved it using the java api quite easily. But due to a
recent policy change we are moving towards the python api for flink,
however we were unable to find any python elastic search connectors for
flink. We were able to find support for the kafka connectors in python.
Does it mean that we have to write our own connectors in python  to
make use of the flink-elasticsearch connector jar?

Thanks in advance
-- 
Thanks & Regards
Sandeep Sharat Kumar


Parallel processing in a 2 node cluster apparently not working

2022-03-29 Thread HG
Hi,

I have a 2 node cluster just for testing.
When I start the cluster and the job I see that the parallelism is 2 as
expected.
But they are both on the same node.
When I stop the taskmanager on that node it switches to the other one.
But I expected both nodes to have a subtask.

See below.

Any clues?

Regards Hans-Peter

[image: image.png]
[image: image.png]


Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread huweihua
Hi, Jin

Can you provide more information about Flink cluster deployment modes? Is
it running in Kubernetes/YARN or standalone mode?
Maybe you can use application mode to keep the environment (network
accessibility) always the same. Application mode will run the user-main
method in the JobManager.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
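
For reference, a native-Kubernetes application-mode submission looks roughly like this (cluster id, image and jar path are placeholders; a standalone Kubernetes setup would instead use a dedicated application-cluster yaml):

./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-flink-app \
    -Dkubernetes.container.image=my-custom-flink-image \
    local:///opt/flink/usrlib/my-job.jar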



On Tue, Mar 29, 2022 at 11:23, Jin Yi wrote:

> i have a flink job that uses redis as a sink.  i optionally do some wiping
> and metadata writing from the job submitting flink program before it
> actually executes/submits the job to the job manager.  when i don't do this
> redis preparation, the redis sink works completely fine.  that is, the
> redis commands work fine from the taskmanager tasks.  however, if i enable
> the option redis preparation from within the flink job program, it fails to
> contact redis and hits a timeout exception.
>
> previously, we were using 1.12.3, and this behavior worked fine.  is the
> 1.14.4 flink docker image pretty restrictive when it comes to network
> access at the job submission client layer?
>
> thanks.
>


Re: Watermarks event time vs processing time

2022-03-29 Thread HG
Hello Matthias,

When I remove all the watermark strategies it does not process anything.
For example when I use WatermarkStrategy.noWatermarks() instead of the one
I build nothing seems to happen at all.

 Also when I skip the part where I add wmStrategy  to create tuple4dswm:
 DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

Nothing is processed.

Regards Hans-Peter

On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Hanspeter,
>
>
>
> Let me relate some hints that might help you getting concepts clearer.
>
>
>
> From your description I make the following assumptions where you are not
> specific enough (please confirm or correct in your answer):
>
>1. You store incoming events in state per transaction_id to be
>sorted/aggregated(min/max time) by event time later on
>2. So far you used a session window to determine the point in time
>when to emit the stored/enriched/sorted events
>3. Watermarks are generated with bounded out of orderness
>4. You use session windows with a specific gap
>5. In your experiment you ever only send 1000 events and then stop
>producing incoming events
>
>
>
> Now to your questions:
>
>- For processing time session windows, watermarks play no role
>whatsoever, you simply assume that you’ve seen all events belonging to a
>single transaction id if the last such event for a specific transaction id
>was processed sessionWindowGap milliseconds ago
>- Therefore you see all enriched incoming events the latest
>sessionWindowGap ms after the last incoming event (+ some latency)
>- In event time mode and resp event time session windows the situation
>is exactly the same, only that processing time plays no role
>- A watermark means (ideally) that no event older than the watermark
>time ever follows the watermark (which itself is a meta-event that flows
>with the proper events on the same channels)
>- In order for a session gap window to forward the enriched events the
>window operator needs to receive a watermark that is sessionWindowGap
>milliseconds beyond the latest incoming event (in terms of the respective
>event time)
>- The watermark generator in order to generate a new watermark that
>triggers this last session window above needs to encounter an (any) event
>that has a timestamp of (<timestamp of the latest event> + outOfOrderness
>+ sessionWindowGap + 1ms)
>- Remember, the watermark generator never generated watermarks based
>on processing time, but only based on the timestamps it has seen in events
>actually encountered
>- Coming back to your idleness configuration: it only means that the
>incoming stream becomes idle == timeless after a while … i.e. watermarks
>won’t make progress from this stream, and it tells all downstream operators
>- Idleness specification is only useful if a respective operator has
>another source of valid watermarks (i.e. after a union of two streams, one
>active/one idle ….). this is not your case
>
>
>
> I hope this clarifies your situation.
>
>
>
> Cheers
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* HG 
> *Sent:* Wednesday, 16 March 2022 10:06
> *To:* user 
> *Subject:* Watermarks event time vs processing time
>
>
>
>
>
> Hi,
>
>
>
> I read from a Kafka topic events that are in JSON format
>
> These events contain a handling time (aka event time) in epoch
> milliseconds, a transaction_id and a large nested JSON structure.
>
> I need to group the events by transaction_id, order them by handling time
> and calculate the differences in handling time.
>
> The events are updated with this calculated elapsed time and pushed
> further.
>
> So all events that go in should come out with the elapsed time added.
>
>
>
> For testing I use events that are old (so handling time is not nearly the
> wall clock time)
>
> Initially I used EventTimeSessionWindows but somehow the processing did
> not run as expected.
>
> When I pushed 1000 events eventually 800 or so would appear at the output.
> This was resolved by switching to ProcessingTimeSessionWindows .
> My thought was then that I could remove the watermarkstrategies with
> watermarks with timestamp assigners (handling time) for the Kafka input
> stream and the data stream.
>
> However this was not the case.
>
>
>
> Can anyone enlighten me as to why the watermark strategies are still
> needed?
>
>
>
> Below the code
>
>
>
> KafkaSource source = KafkaSource.builder()
> .setProperties(kafkaProps)
> .setProperty("ssl.truststore.type", trustStoreType)
> .setProperty("ssl.truststore.password", trustStorePassword)
> .setProperty("ssl.truststore.location", trustStoreLocation)
> .setProperty("security.protocol", securityProtocol)
> .setProperty("partition.discovery.interval.ms",
> partitionDis

Re: Exactly-once sink sync checkpoint stacking time effect

2022-03-29 Thread Arvid Heise
Hi Filip,

two things will impact sync time for Kafka:
1. Flushing all old data [1], in particular flushing all in-flight
partitions [2]. However, that shouldn't cause a stacking effect except when
the brokers are overloaded on checkpoint.
2. Opening a new transaction [3]. Since all transactions are linearized on
the Kafka brokers, this is the most likely root cause. Note that aborted
checkpoints may require multiple transactions to be opened. So you could
check if you have them quite often aborted.

If you want to know more, I suggest you attach a profiler and find the
specific culprit and report back [4]. There is a low probability that the
sink framework has a bug that causes this behavior. In that case, we can
fix it more easily than if it's a fundamental issue with Kafka. In general,
exactly-once and low latency are somewhat contradicting requirements, so
there is only so much you can do.

Not knowing your topology but maybe you can reduce the number of sinks?
With the KafkaRecordSerializationSchema you can set different topics for
different ProducerRecords of the same DataStream.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
[3]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
[4]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/
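
As a rough sketch of that last point (assuming a hypothetical event type MyEvent that carries its target topic and payload), one serialization schema can route records of a single stream to several topics:

import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicRoutingSerializationSchema
        implements KafkaRecordSerializationSchema<MyEvent> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            MyEvent element, KafkaSinkContext context, Long timestamp) {
        // Each record picks its own target topic, so one KafkaSink
        // (and therefore one transaction per checkpoint) can serve
        // what used to be several per-topic sinks.
        return new ProducerRecord<>(
                element.getTargetTopic(),
                element.getPayload().getBytes(StandardCharsets.UTF_8));
    }
}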


On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki 
wrote:

> Hi, I noticed that with each added (kafka) sink with exactly-once
> guarantees, there looks to be a penalty of ~100ms in terms of sync
> checkpointing time.
>
> Would anyone be able to explain and/or point me in the right direction in
> the source code so that I could understand why that is? Specifically, why
> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
> all sinks, potentially pointing to a sequential set of IO calls (wld
> guess)
>
> I would be keen to understand if there's anything I could do (incl.
> contributing code) that would parallelise this penalty in terms of sync
> checkpointing time.
>
> Alternatively, is there any setting that would help me bring the sync
> checkpointing time down (and still get exactly-once guarantees)?
>
> Many thanks,
> Fil
>


Re: Wrong format when passing arguments with space

2022-03-29 Thread David Morávek
cc Kevin

On Tue, Mar 29, 2022 at 9:15 AM David Morávek  wrote:

> Hi Kevin,
>
> -dev@f.a.o +user@f.a.o
>
> Thanks for the report! I've run some experiments and unfortunately I'm not
> able to reproduce the behavior you're describing. The bash "$@" expansion
> seems to work as expected (always receiving correctly expanded unquoted
> strings in the main class). Can you maybe elaborate about your environment
> and submit a minimal reproducer?
>
> Best,
> D.
>
> On Mon, Mar 28, 2022 at 6:00 AM Kevin Lee 
> wrote:
>
>> Flink version : 1.13
>>
>> Bug:
>> When I pass an argument with a space using single quotes,
>> the main function gets this argument with double quotes
>>
>> example:
>> ./bin/flink run -c com.lmk.QuotaTest --rate 10 --time ''2022-03-28
>> 11:53:21"
>>
>> The main function get parameters:
>>
>> 1-rate
>> 2---10
>> 3-time
>> 4---"2022-03-28 11:53:21"
>>
>>
>> I think the flink shell should remove the double quotes in "2022-03-28
>> 11:53:21"
>>
>>
>> Hope to get your reply asap
>>
>


Re: Wrong format when passing arguments with space

2022-03-29 Thread David Morávek
Hi Kevin,

-dev@f.a.o +user@f.a.o

Thanks for the report! I've run some experiments and unfortunately I'm not
able to reproduce the behavior you're describing. The bash "$@" expansion
seems to work as expected (always receiving correctly expanded unquoted
strings in the main class). Can you maybe elaborate about your environment
and submit a minimal reproducer?
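
For comparison, an invocation along these lines (the jar path is a placeholder) should reach the main method with the --time value as a single argument and without surrounding quotes:

./bin/flink run -c com.lmk.QuotaTest ./quota-test.jar --rate 10 --time "2022-03-28 11:53:21"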

Best,
D.

On Mon, Mar 28, 2022 at 6:00 AM Kevin Lee  wrote:

> Flink version : 1.13
>
> Bug:
> When I pass an argument with a space using single quotes,
> the main function gets this argument with double quotes
>
> example:
> ./bin/flink run -c com.lmk.QuotaTest --rate 10 --time ''2022-03-28
> 11:53:21"
>
> The main function get parameters:
>
> 1-rate
> 2---10
> 3-time
> 4---"2022-03-28 11:53:21"
>
>
> I think the flink shell should remove the double quotes in "2022-03-28 11:53:21"
>
>
> Hope to get your reply asap
>