Re: Exactly-once sink sync checkpoint stacking time effect

2022-03-29 Thread Filip Karnicki
Thank you very much for your answer.

I was able to reduce the number of sinks as you described. That helped a
lot, thank you.

I think you must be right with regards to (2) - opening a new transaction
being the culprit. It's unlikely to be (1) since this behaviour occurs even
when there are 0 messages going through a brand new, locally running kafka
cluster.

Kind regards,
Fil

On Tue, 29 Mar 2022 at 09:34, Arvid Heise  wrote:

> Hi Filip,
>
> two things will impact sync time for Kafka:
> 1. Flushing all old data [1], in particular flushing all in-flight
> partitions [2]. However, that shouldn't cause a stacking effect except when
> the brokers are overloaded on checkpoint.
> 2. Opening a new transaction [3]. Since all transactions are linearized on
> the Kafka brokers, this is the most likely root cause. Note that aborted
> checkpoints may require multiple transactions to be opened, so you could
> check whether your checkpoints get aborted quite often.
>
> If you want to know more, I suggest you attach a profiler and find the
> specific culprit and report back [4]. There is a low probability that the
> sink framework has a bug that causes this behavior. In that case, we can
> fix it more easily than if it's a fundamental issue with Kafka. In general,
> exactly-once and low latency are somewhat contradicting requirements, so
> there is only so much you can do.
>
> Not knowing your topology but maybe you can reduce the number of sinks?
> With the KafkaRecordSerializationSchema you can set different topics for
> different ProducerRecords of the same DataStream.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/
>
>
> On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki 
> wrote:
>
>> Hi, I noticed that with each added (kafka) sink with exactly-once
>> guarantees, there looks to be a penalty of ~100ms in terms of sync
>> checkpointing time.
>>
>> Would anyone be able to explain and/or point me in the right direction in
>> the source code so that I could understand why that is? Specifically, why
>> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
>> all sinks, potentially pointing to a sequential set of IO calls (wild
>> guess)
>>
>> I would be keen to understand if there's anything I could do (incl.
>> contributing code) that would parallelise this penalty in terms of sync
>> checkpointing time.
>>
>> Alternatively, is there any setting that would help me bring the sync
>> checkpointing time down (and still get exactly-once guarantees)?
>>
>> Many thanks,
>> Fil
>>
>


Re: Pyflink elastic search connectors

2022-03-29 Thread Xingbo Huang
Hi,

Are you using the datastream api or the table api? If you are using the table
api, you can use the connector by executing sql [1]. If you are using the
datastream api, there is no es connector api provided yet; you need to
write python wrapper code, but the wrapper code is very simple. The
underlying code makes use of py4j to call the java api of the es connector. For
details, you can refer to the wrapper code for kafka or pulsar [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
[2]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py

Best,
Xingbo

Sandeep Sharat  wrote on Tue, Mar 29, 2022 at 20:51:

> Hello Everyone,
>
> I have been working on a streaming application using elasticsearch as the
> sink. I had achieved it using the java api quite easily. But due to a
> recent policy change we are moving towards the python api for flink,
> however we were unable to find any python elastic search connectors for
> flink. We were able to find support for the kafka connectors in python.
> Does it mean that we have to write our own connectors in python  to
> make use of the flink-elasticsearch connector jar?
>
> Thanks in advance
> --
> Thanks & Regards
> Sandeep Sharat Kumar
>


How to debug Metaspace exception?

2022-03-29 Thread John Smith
Hi running 1.14.4

My task managers still fail with java.lang.OutOfMemoryError: Metaspace.
The metaspace out-of-memory error has occurred. This can mean two things:
either the job requires a larger size of JVM metaspace to load classes or
there is a class loading leak.

I have 2GB of metaspace configured: taskmanager.memory.jvm-metaspace.size:
2048m

But the task nodes still fail.

When looking at the UI metrics, the metaspace starts low. Now I see 85%
usage. It seems to be a class loading leak at this point, how can we debug
this issue?


Re: Pyflink elastic search connectors

2022-03-29 Thread Sandeep Sharat
Hi,

Thank you for the quick responses. We are using the datastream api for
pyflink. We are trying to implement a wrapper in python for the same as we
speak. Hopefully it will work out. 

On Wed, 30 Mar, 2022, 8:02 am Xingbo Huang,  wrote:

> Hi,
>
> Are you using the datastream api or the table api? If you are using the table api,
> you can use the connector by executing sql [1]. If you are using the
> datastream api, there is no es connector api provided yet; you need to
> write python wrapper code, but the wrapper code is very simple. The
> underlying code makes use of py4j to call the java api of the es connector. For
> details, you can refer to the wrapper code for kafka or pulsar [2].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
> [2]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py
>
> Best,
> Xingbo
>
Sandeep Sharat  wrote on Tue, Mar 29, 2022 at 20:51:
>
>> Hello Everyone,
>>
>> I have been working on a streaming application using elasticsearch as the
>> sink. I had achieved it using the java api quite easily. But due to a
>> recent policy change we are moving towards the python api for flink,
>> however we were unable to find any python elastic search connectors for
>> flink. We were able to find support for the kafka connectors in python.
>> Does it mean that we have to write our own connectors in python  to
>> make use of the flink-elasticsearch connector jar?
>>
>> Thanks in advance
>> --
>> Thanks & Regards
>> Sandeep Sharat Kumar
>>
>


Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread huweihua
Hi, Jin

Can you provide more information about Flink cluster deployment modes? Is
it running in Kubernetes/YARN or standalone mode?
Maybe you can use application mode to keep the environment (network
accessibility) the same. Application mode will run the user main
method in the JobManager.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode



Jin Yi  wrote on Tue, Mar 29, 2022 at 11:23:

> i have a flink job that uses redis as a sink.  i optionally do some wiping
> and metadata writing from the job submitting flink program before it
> actually executes/submits the job to the job manager.  when i don't do this
> redis preparation, the redis sink works completely fine.  that is, the
> redis commands work fine from the taskmanager tasks.  however, if i enable
> the option redis preparation from within the flink job program, it fails to
> contact redis and hits a timeout exception.
>
> previously, we were using 1.12.3, and this behavior worked fine.  is the
> 1.14.4 flink docker image pretty restrictive when it comes to network
> access at the job submission client layer?
>
> thanks.
>


RE: Watermarks event time vs processing time

2022-03-29 Thread Schwalbe Matthias
Hello Hans-Peter,

I’m a little confused which version of your code you are testing against:

  *   ProcessingTimeSessionWindows or EventTimeSessionWindows?
  *   did you keep the withIdleness() ??

As said before:

  *   for ProcessingTimeSessionWindows, watermarks play no role
  *   if you keep withIdleness(), then the respective sparse DataStream is 
event-time-less most of the time, i.e. no triggers fire to close a session 
window
  *   withIdleness() only makes sense if you merge/union/connect multiple 
DataStreams where at least one stream has its watermarks updated regularly 
(i.e. it is not withIdleness())
 *   this is not your case, your DAG is linear, no union nor connects
  *   in event-time mode processing time plays no role, watermarks exclusively 
take the role of the progress of model (event) time and hence the triggering of 
windows
  *   in order to trigger a (session-)window at time A the window operator 
needs to receive a watermark of at least time A
  *   next catch regards partitioning
 *   your first watermark strategy kafkaWmstrategy generates 
per-Kafka-partition watermarks
 *   a keyBy() reshuffles these partitions onto the number of subtasks 
according to the hash of the key
 *   this results in a per subtask calculation of the lowest watermark of 
all Kafka partitions that happen to be processed by that subtask
 *   i.e. if a single Kafka partition makes no watermark progress the 
subtask watermark makes no progress
 *   this surfaces in sparse data as in your case
  *   your second watermark strategy wmStrategy makes things worse because
 *   it discards the correct watermarks of the first watermark strategy
 *   and replaces it with something that is arbitrary (at this point it is 
hard to guess the correct max lateness that is a mixture of the events from 
multiple Kafka partitions)

Conclusion:
The only way to make the event time session windows work for you in a timely 
manner is to make sure watermarks on all involved partitions make progress, 
i.e. new events arrive on all partitions in a regular manner.
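
For illustration, here is a small, self-contained toy job (this is just a sketch, not code from the thread; the data, the 10s gap and the 5s out-of-orderness are made up) that shows the event-time session window mechanics described above. Note that fromElements() is a bounded source, so Flink emits a final watermark at end of input, which is the only reason the windows fire here without fresh events; with an unbounded Kafka source the watermark only advances as new events arrive on every partition:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionWindowToy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (transaction_id, handling_time in epoch millis) -- made-up test data
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("tx-1", 1_000L), Tuple2.of("tx-1", 4_000L),
                Tuple2.of("tx-2", 2_000L), Tuple2.of("tx-2", 30_000L));

        events
                // event-time timestamps taken from the payload, 5s bounded out-of-orderness
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, previous) -> event.f1))
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> event) {
                        return event.f0;
                    }
                })
                // a session only closes once the watermark passes last-event-time + gap
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
                        int count = 0;
                        for (Tuple2<String, Long> ignored : elements) {
                            count++;
                        }
                        out.collect(key + ": " + count + " events in window " + ctx.window());
                    }
                })
                .print();

        env.execute("session window toy");
    }
}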

Hope this helps

Thias


From: HG 
Sent: Tuesday, March 29, 2022 1:07 PM
To: Schwalbe Matthias 
Cc: user 
Subject: Re: Watermarks event time vs processing time


Hello Matthias,

When I remove all the watermark strategies it does not process anything.
For example when I use WatermarkStrategy.noWatermarks() instead of the one I 
build nothing seems to happen at all.

 Also when I skip the part where I add wmStrategy  to create tuple4dswm:
 DataStream> tuple4dswm = 
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

Nothing is processed.

Regards Hans-Peter

On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>> wrote:
Hi Hanspeter,

Let me relate some hints that might help you getting concepts clearer.

From your description I make the following assumptions where you are not specific 
enough (please confirm or correct in your answer):

a.   You store incoming events in state per transaction_id to be 
sorted/aggregated(min/max time) by event time later on

b.   So far you used a session window to determine the point in time when 
to emit the stored/enriched/sorted events

c.Watermarks are generated with bounded out of orderness

d.   You use session windows with a specific gap

e.   In your experiment you ever only send 1000 events and then stop 
producing incoming events

Now to your questions:

  *   For processing time session windows, watermarks play no role whatsoever, 
you simply assume that you’ve seen all events belonging to a single transaction 
id if the last such event for a specific transaction id was processed 
sessionWindowGap milliseconds ago
  *   Therefore you see all enriched incoming events the latest 
sessionWindowGap ms after the last incoming event (+ some latency)
  *   In event time mode and resp event time session windows the situation is 
exactly the same, only that processing time play no role
  *   A watermark means (ideally) that no event older than the watermark time 
ever follows the watermark (which itself is a meta-event that flows with the 
proper events on the same channels)
  *   In order for a session gap window to forward the enriched events the 
window operator needs to receive a watermark that is sessionWindowGap 
milliseconds beyond the latest incoming event (in terms of the respective event 
time)
  *   The watermark generator in order to generate a new watermark that 
triggers this last session window above needs to encounter an (any) event that 
has a timestamp of (timestamp of the latest incoming event + outOfOrderness + 
sessionWindowGap + 1ms)
  *   Remember, the watermark generator never generated watermarks based on 
processing time, but only based on the timestamps it has seen in events 
actually encountered
  *   Coming back to your idleness configuration: it only means that the 
incoming stream becomes idle == timeless after a while, i.e. watermarks won’t 
make progress from this stream …

Re: Watermarks event time vs processing time

2022-03-29 Thread HG
Hello Matthias,

I am still using ProcessingTimeSessionWindows.
But it turns out I was wrong.
I tested a couple of times and it did not seem to work.
But now it does with both watermark strategies removed.

My apologies.
Regards Hans-Peter

This is the code:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(Integer.parseInt(envMaxParallelism));
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(Integer.parseInt(envEnableCheckpointing));


Properties kafkaProps  = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
inputBrokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
inputGroupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
autoCommit);

kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
autoCommitInterval);
kafkaProps.setProperty("ssl.truststore.type", inputTrustStoreType);
kafkaProps.setProperty("ssl.truststore.password",
inputTrustStorePassword);
kafkaProps.setProperty("ssl.truststore.location",
inputTrustStoreLocation);
kafkaProps.setProperty("security.protocol", inputSecurityProtocol);
kafkaProps.setProperty("ssl.enabled.protocols",
inputSslEnabledProtocols);

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setGroupId(inputGroupId)
.setClientIdPrefix(clientId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new
JSONKeyValueDeserializationSchema(fetchMetadata)))

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();


/* Use the watermark strategy to create a datastream */
DataStream<ObjectNode> ds = env.fromSource(source,
WatermarkStrategy.noWatermarks(), "Kafka Source");

/* Split the ObjectNode into a Tuple4 */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds =
ds.flatMap(new Splitter());

DataStream<String> tuple4DsWmKeyedbytr = tuple4ds
.keyBy(new KeySelector<Tuple4<Long, Long, String, String>,
String>() {
@Override
public String getKey(Tuple4<Long, Long, String, String>
value) throws Exception {
return value.f2;
}
})

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))

.allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
.process(new MyProcessWindowFunction());



Properties sinkkafkaProps  = new Properties();
sinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
outputBrokers);
sinkkafkaProps.setProperty("ssl.truststore.type",
outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location",
outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password",
outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol",
outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("ssl.enabled.protocols",
outputSslEnabledProtocols);


KafkaSink<String> kSink = KafkaSink.<String>builder()
.setBootstrapServers(outputBrokers)
.setKafkaProducerConfig(sinkkafkaProps)

.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new
SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);


// Splits the Object node into a Tuple4
private static class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
@Override
public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out) throws Exception {
// retrieved handling_time twice intentionally: one of which
// will be used for the watermark strategy and the other for the calculation
// of the elapsed time
out.collect(new Tuple4<>(json.get("value").get("handling_time").asLong(),
json.get("value").get("handling_time").asLong(),
json.get("value").get("transaction_id").asText(),
json.get("value").get("original_event").toPrettyString()));
}
}

// Class to sort the events that belong to the same transactions
public static class SortEventsHandlingTime implements
Comparator<Tuple4<Long, Long, String, String>> {

// Let's compare 2 Tuple4 objects
public int compare(Tuple4<Long, Long, String, String> o1,
Tuple4<Long, Long, String, String> o2) {
int result =
Long.compare(Long.parseLong(o1.getField(0).toString()),
Long.parseLong(o2.getField(0).toString()));
if (result > 0) {
return 1;
} 

Re: Watermarks event time vs processing time

2022-03-29 Thread HG
Hello Matthias,

When I remove all the watermark strategies it does not process anything.
For example when I use WatermarkStrategy.noWatermarks() instead of the one
I build nothing seems to happen at all.

 Also when I skip the part where I add wmStrategy  to create tuple4dswm:
 DataStream> tuple4dswm =
tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

Nothing is processed.

Regards Hans-Peter

On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Hanspeter,
>
>
>
> Let me relate some hints that might help you getting concepts clearer.
>
>
>
> From your description I make the following assumptions where you are not
> specific enough (please confirm or correct in your answer):
>
>1. You store incoming events in state per transaction_id to be
>sorted/aggregated(min/max time) by event time later on
>2. So far you used a session window to determine the point in time
>when to emit the stored/enriched/sorted events
>3. Watermarks are generated with bounded out of orderness
>4. You use session windows with a specific gap
>5. In your experiment you ever only send 1000 events and then stop
>producing incoming events
>
>
>
> Now to your questions:
>
>- For processing time session windows, watermarks play no role
>whatsoever, you simply assume that you’ve seen all events belonging to a
>single transaction id if the last such event for a specific transaction id
>was processed sessionWindowGap milliseconds ago
>- Therefore you see all enriched incoming events the latest
>sessionWindowGap ms after the last incoming event (+ some latency)
>- In event time mode and resp event time session windows the situation
>is exactly the same, only that processing time play no role
>- A watermark means (ideally) that no event older than the watermark
>time ever follows the watermark (which itself is a meta-event that flows
>with the proper events on the same channels)
>- In order for a session gap window to forward the enriched events the
>window operator needs to receive a watermark that is sessionWindowGap
>milliseconds beyond the latest incoming event (in terms of the respective
>event time)
>- The watermark generator in order to generate a new watermark that
>triggers this last session window above needs to encounter an (any) event
>that has a timestamp of (timestamp of the latest incoming event + outOfOrderness
>+ sessionWindowGap + 1ms)
>- Remember, the watermark generator never generated watermarks based
>on processing time, but only based on the timestamps it has seen in events
>actually encountered
>- Coming back to your idleness configuration: it only means that the
>incoming stream becomes idle == timeless after a while … i.e. watermarks
>won’t make progress from this stream, and it tells all downstream operators
>- Idleness specification is only useful if a respective operator has
>another source of valid watermarks (i.e. after a union of two streams, one
>active/one idle ….). this is not your case
>
>
>
> I hope this clarifies your situation.
>
>
>
> Cheers
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* HG 
> *Sent:* Mittwoch, 16. März 2022 10:06
> *To:* user 
> *Subject:* Watermarks event time vs processing time
>
>
>
>
> Hi,
>
>
>
> I read from a Kafka topic events that are in JSON format
>
> These events contain a handling time (aka event time) in epoch
> milliseconds, a transaction_id and a large nested JSON structure.
>
> I need to group the events by transaction_id, order them by handling time
> and calculate the differences in handling time.
>
> The events are updated with this calculated elapsed time and pushed
> further.
>
> So all events that go in should come out with the elapsed time added.
>
>
>
> For testing I use events that are old (so handling time is not nearly the
> wall clock time)
>
> Initially I used EventTimeSessionWindows but somehow the processing did
> not run as expected.
>
> When I pushed 1000 events eventually 800 or so would appear at the output.
> This was resolved by switching to ProcessingTimeSessionWindows .
> My thought was then that I could remove the watermarkstrategies with
> watermarks with timestamp assigners (handling time) for the Kafka input
> stream and the data stream.
>
> However this was not the case.
>
>
>
> Can anyone enlighten me as to why the watermark strategies are still
> needed?
>
>
>
> Below the code
>
>
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> .setProperties(kafkaProps)
> .setProperty("ssl.truststore.type", trustStoreType)
> .setProperty("ssl.truststore.password", trustStorePassword)
> .setProperty("ssl.truststore.location", trustStoreLocation)
> .setProperty("security.protocol", securityProtocol)
> .setProperty("partition.discovery.interval.ms",
> 

Parallel processing in a 2 node cluster apparently not working

2022-03-29 Thread HG
Hi,

I have a 2 node cluster just for testing.
When I start the cluster and the job I see that the parallelism is 2 as
expected.
But they are both on the same node.
When I stop the taskmanager on that node it switches to the other one.
But I expected both nodes to have a subtask.

See below.

Any clues?

Regards Hans-Peter

[image: image.png]
[image: image.png]


Pyflink elastic search connectors

2022-03-29 Thread Sandeep Sharat
Hello Everyone,

I have been working on a streaming application using elasticsearch as the
sink. I had achieved it using the java api quite easily. But due to a
recent policy change we are moving towards the python api for flink,
however we were unable to find any python elastic search connectors for
flink. We were able to find support for the kafka connectors in python.
Does it mean that we have to write our own connectors in python  to
make use of the flink-elasticsearch connector jar?

Thanks in advance
-- 
Thanks & Regards
Sandeep Sharat Kumar


Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread 胡伟华
I see, can you provide the startup command for 1.12.3 and 1.14.4?
Are these startup commands running on the same node?

> On Mar 29, 2022, at 10:32 PM, Jin Yi  wrote:
> 
> it's running in k8s.  we're not running in app mode b/c we have many jobs 
> running in the same flink cluster.
> 
> On Tue, Mar 29, 2022 at 4:29 AM huweihua  > wrote:
> Hi, Jin
> 
> Can you provide more information about Flink cluster deployment modes? Is it 
> running in Kubernetes/YARN or standalone mode?
> Maybe you can use application mode to keep the environment (network 
> accessibility) the same. Application mode will run the user main 
> method in the JobManager.
> 
> [1]https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>  
> 
> 
> 
> 
> Jin Yi mailto:j...@promoted.ai>> wrote on Tue, Mar 29, 2022 at 11:23:
> i have a flink job that uses redis as a sink.  i optionally do some wiping 
> and metadata writing from the job submitting flink program before it actually 
> executes/submits the job to the job manager.  when i don't do this redis 
> preparation, the redis sink works completely fine.  that is, the redis 
> commands work fine from the taskmanager tasks.  however, if i enable the 
> option redis preparation from within the flink job program, it fails to 
> contact redis and hits a timeout exception.
> 
> previously, we were using 1.12.3, and this behavior worked fine.  is the 
> 1.14.4 flink docker image pretty restrictive when it comes to network access 
> at the job submission client layer?
> 
> thanks.



Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread 胡伟华
Are you referring to creating the Flink cluster on Kubernetes via a yaml file?

How did you submit the job to the Flink cluster? Not via the command line (flink 
run xxx)?

> On Mar 29, 2022, at 10:38 PM, Jin Yi  wrote:
> 
> no they are not.  b/c we are using k8s, we use kubectl apply commands with a 
> yaml file to specify the startup.
> 
> On Tue, Mar 29, 2022 at 7:37 AM 胡伟华  > wrote:
> I see, can you provide the startup command for 1.12.3 and 1.14.4?
> Are these startup commands running on the same node?
> 
>> On Mar 29, 2022, at 10:32 PM, Jin Yi mailto:j...@promoted.ai>> wrote:
>> 
>> it's running in k8s.  we're not running in app mode b/c we have many jobs 
>> running in the same flink cluster.
>> 
>> On Tue, Mar 29, 2022 at 4:29 AM huweihua > > wrote:
>> Hi, Jin
>> 
>> Can you provide more information about Flink cluster deployment modes? Is it 
>> running in Kubernetes/YARN or standalone mode?
>> Maybe you can use application mode to keep the environment (network 
>> accessibility) the same. Application mode will run the user main 
>> method in the JobManager.
>> 
>> [1]https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>>  
>> 
>> 
>> 
>> 
>> Jin Yi mailto:j...@promoted.ai>> wrote on Tue, Mar 29, 2022 at 11:23:
>> i have a flink job that uses redis as a sink.  i optionally do some wiping 
>> and metadata writing from the job submitting flink program before it 
>> actually executes/submits the job to the job manager.  when i don't do this 
>> redis preparation, the redis sink works completely fine.  that is, the redis 
>> commands work fine from the taskmanager tasks.  however, if i enable the 
>> option redis preparation from within the flink job program, it fails to 
>> contact redis and hits a timeout exception.
>> 
>> previously, we were using 1.12.3, and this behavior worked fine.  is the 
>> 1.14.4 flink docker image pretty restrictive when it comes to network access 
>> at the job submission client layer?
>> 
>> thanks.
> 



Re: SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-29 Thread Georg Heiler
I got it working now: the schema registry URL needs to be specified for both the key
and the value format.



thanks

Am Mo., 28. März 2022 um 13:33 Uhr schrieb Ingo Bürk :

> Hi Georg,
>
> which Flink version are you using? The missing property is for the
> avro-confluent format, and if I recall correctly, how these are passed
> has changed in recent versions, so it'd be good to double check you are
> using the documentation for the version you are running on.
>
>
> Best
> Ingo
>
> On 24.03.22 11:57, Georg Heiler wrote:
> > Hi,
> >
> > how can I get Flinks SQL client to nicely sink some data to either the
> > regular kafka or the kafka-upsert connector?
> >
> > I have a table/ topic with dummy data:
> > CREATE TABLE metrics_brand_stream (
> >  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> >  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
> >`partition` BIGINT METADATA VIRTUAL,
> >`offset` BIGINT METADATA VIRTUAL,
> >  brand string,
> >  duration int,
> >  rating int
> >
> > ) WITH (
> >  'connector' = 'kafka',
> >  'topic' = 'commercials_avro',
> >  'scan.startup.mode' = 'earliest-offset',
> >  'format' = 'avro-confluent',
> >  'avro-confluent.schema-registry.url' = 'http://localhost:8081/
> > ',
> >  'properties.group.id ' =
> 'flink-test-001',
> >  'properties.bootstrap.servers' = 'localhost:9092'
> > );
> >
> > And the following aggregation:
> >
> > SELECT brand,
> >   COUNT(*) AS cnt,
> >   AVG(duration) AS  duration_mean,
> >   AVG(rating) AS rating_mean
> >FROM metrics_brand_stream
> >GROUP BY brand;
> >
> > When trying to define an output table:
> >
> > CREATE TABLE metrics_per_brand (
> >  brand string,
> >  cnt BIGINT,
> >  duration_mean DOUBLE,
> >  rating_mean DOUBLE
> >
> > ) WITH (
> >  'connector' = 'upsert-kafka',
> >  'topic' = 'metrics_per_brand',
> >  'avro-confluent.schema-registry.url' = 'http://localhost:8081/
> > ',
> >  'properties.group.id ' =
> 'flink-test-001',
> >  'properties.bootstrap.servers' = 'localhost:9092',
> >  'key.format' = 'avro-confluent',
> >  'value.format' = 'avro-confluent'
> > );
> >
> > And trying to INSERT some result data:
> >
> > INSERT INTO metrics_per_brand
> >SELECT brand,
> >   COUNT(*) AS cnt,
> >   AVG(duration) AS  duration_mean,
> >   AVG(rating) AS rating_mean
> >FROM metrics_brand_stream
> >GROUP BY brand;
> >
> > The query fails with:
> >
> > org.apache.flink.table.api.ValidationException: One or more required
> > options are missing.
> >
> > Missing required options are:
> >
> > url
> >
> > But neither:
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
> > <
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/>
>
> > nor
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
> > <
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/>
>
> > nor
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
> > <
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/>
>
> > seems to list the right configuration (or I am misreading the
> > documentation).
> >
> >
> > How can I sink data to kafka after some arbitrary computation using the
> > flink-sql client using either the kafka or upsert-kafka connector where
> > the input is AVRO with a schema from the confluent schema registry and
> > the output should store its schema there as well (and serialize using
> AVRO).
> >
> >
> > Best,
> > Georg
>


Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread Jin Yi
it's running in k8s.  we're not running in app mode b/c we have many jobs
running in the same flink cluster.

On Tue, Mar 29, 2022 at 4:29 AM huweihua  wrote:

> Hi, Jin
>
> Can you provide more information about Flink cluster deployment modes? Is
> it running in Kubernetes/YARN or standalone mode?
> Maybe you can use application mode to keep the environment (network
> accessibility) the same. Application mode will run the user main
> method in the JobManager.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>
>
>
> Jin Yi  wrote on Tue, Mar 29, 2022 at 11:23:
>
>> i have a flink job that uses redis as a sink.  i optionally do some
>> wiping and metadata writing from the job submitting flink program before it
>> actually executes/submits the job to the job manager.  when i don't do this
>> redis preparation, the redis sink works completely fine.  that is, the
>> redis commands work fine from the taskmanager tasks.  however, if i enable
>> the option redis preparation from within the flink job program, it fails to
>> contact redis and hits a timeout exception.
>>
>> previously, we were using 1.12.3, and this behavior worked fine.  is the
>> 1.14.4 flink docker image pretty restrictive when it comes to network
>> access at the job submission client layer?
>>
>> thanks.
>>
>


Re: flink docker image (1.14.4) unable to access other pods from flink program (job and task manager access is fine)

2022-03-29 Thread Jin Yi
no they are not.  b/c we are using k8s, we use kubectl apply commands with
a yaml file to specify the startup.

On Tue, Mar 29, 2022 at 7:37 AM 胡伟华  wrote:

> I see, can you provide the startup command for 1.12.3 and 1.14.4?
> Are these startup commands running on the same node?
>
> On Mar 29, 2022, at 10:32 PM, Jin Yi  wrote:
>
> it's running in k8s.  we're not running in app mode b/c we have many jobs
> running in the same flink cluster.
>
> On Tue, Mar 29, 2022 at 4:29 AM huweihua  wrote:
>
>> Hi, Jin
>>
>> Can you provide more information about Flink cluster deployment modes? Is
>> it running in Kubernetes/YARN or standalone mode?
>> Maybe you can use application mode to keep the environment (network
>> accessibility) the same. Application mode will run the user main
>> method in the JobManager.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode
>>
>>
>>
>> Jin Yi  wrote on Tue, Mar 29, 2022 at 11:23:
>>
>>> i have a flink job that uses redis as a sink.  i optionally do some
>>> wiping and metadata writing from the job submitting flink program before it
>>> actually executes/submits the job to the job manager.  when i don't do this
>>> redis preparation, the redis sink works completely fine.  that is, the
>>> redis commands work fine from the taskmanager tasks.  however, if i enable
>>> the option redis preparation from within the flink job program, it fails to
>>> contact redis and hits a timeout exception.
>>>
>>> previously, we were using 1.12.3, and this behavior worked fine.  is the
>>> 1.14.4 flink docker image pretty restrictive when it comes to network
>>> access at the job submission client layer?
>>>
>>> thanks.
>>>
>>
>


Re: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

2022-03-29 Thread Yang Wang
By default, an idle TaskManager will be released after 30s (configured via
"resourcemanager.taskmanager-timeout").
If it is not removed, you need to check the JobManager logs for the
root cause. Maybe it does not have enough permissions, or something else.

Best,
Yang

Burcu Gul POLAT EGRI  wrote on Tue, Mar 29, 2022 at 13:15:

> Thank you, I have tried the first suggestion and the sample job executed
> successfully (last executed command is like below).
>
>
>
> But I have another question. After executing the below command, a new task
> manager pod is created as expected but it is not removed automatically
> after the execution completed. Actually, for native kubernetes, I expect
> that the task manager pod should disappear after job completion.
>
> Do you have any comment for this? Are there any other configuration for
> task manager pod removal?
>
>
>
>
>
> ./bin/flink run --target kubernetes-session
> -Dkubernetes.service-account=flink-service-account
> -Dkubernetes.rest-service.exposed.type=NodePort
> -Dkubernetes.cluster-id=dproc-example-flink-cluster-id
> -Dkubernetes.namespace=sdt-dproc-flink-test
> -Dkubernetes.config.file=/home/devuser/.kube/config
> examples/batch/WordCount.jar
>
>
>
> Best regards,
>
> Burcu
>
>
>
> *From:* Yang Wang [mailto:danrtsey...@gmail.com]
> *Sent:* Saturday, March 26, 2022 7:48 AM
> *To:* Burcu Gul POLAT EGRI 
> *Cc:* user@flink.apache.org
> *Subject:* Re: "Native Kubernetes" sample in Flink documentation fails.
> JobManager Web Interface is wrongly generated. [Flink 1.14.4]
>
>
>
> The root cause might be the LoadBalancer could not really work in your
> environment. We already have a ticket to track this[1] and will try to get
> it resolved in the next release.
>
>
>
> For now, could you please have a try by adding
> "-Dkubernetes.rest-service.exposed.type=NodePort" to your session and
> submission commands?
>
>
>
> Maybe you are also interested in the new flink-kubernetes-operator
> project[2]. It should make it easier to run a Flink application on the K8s.
>
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17231
>
> [2]. https://github.com/apache/flink-kubernetes-operator
>
>
>
> Best,
>
> Yang
>
>
>
> Burcu Gul POLAT EGRI  wrote on Fri, Mar 25, 2022 at 21:39:
>
> I am getting the following error when I try to execute the sample in the Flink
> documentation - Native Kubernetes
> 
> .
>
> I have succeeded in executing the first command in the documentation by adding
> some extra parameters with the help of this post
> 
> .
>
> user@local:~/flink-1.14.4$ ./bin/kubernetes-session.sh \
>
> -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
>
> -Dtaskmanager.memory.process.size=4096m \
>
> -Dkubernetes.taskmanager.cpu=2 \
>
> -Dtaskmanager.numberOfTaskSlots=4 \
>
> -Dresourcemanager.taskmanager-timeout=360 \
>
> -Dkubernetes.namespace=sdt-dproc-flink-test \
>
> -Dkubernetes.config.file=/home/devuser/.kube/config \
>
> -Dkubernetes.jobmanager.service-account=flink-service-account
>
> After executing above command, I have listed the new pod like below.
>
> user@local:~/flink-1.14.4$ kubectl get pods
>
> NAME READY   STATUSRESTARTS   
> AGE
>
> dproc-example-flink-cluster-id-68c79bf67-mwh52   1/1 Running   0  
> 1m
>
> Then, I have executed the below command to submit example job.
>
> user@local:~/flink-1.14.4$ ./bin/flink run --target kubernetes-session \
>
> -Dkubernetes.service-account=flink-service-account \
>
> -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
>
> -Dkubernetes.namespace=sdt-dproc-flink-test \
>
> -Dkubernetes.config.file=/home/devuser/.kube/config
>
> examples/batch/WordCount.jar --input /home/user/sometexts.txt --output 
> /tmp/flinksample
>
> After a while, I received below logs:
>
> 2022-03-25 12:38:00,538 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
> flink cluster dproc-example-flink-cluster-id successfully, JobManager Web 
> Interface: http://10.150.140.248:8081
>
>
>
> 
>
>  The program finished with the following exception:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
>
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
> at 

Re: Parallel processing in a 2 node cluster apparently not working

2022-03-29 Thread David Anderson
In this situation, changing your configuration [1] to include

cluster.evenly-spread-out-slots: true

should change the scheduling behavior to what you are looking for.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#cluster-evenly-spread-out-slots

Regards,
David

On Tue, Mar 29, 2022 at 1:30 PM HG  wrote:

>
> Hi,
>
> I have a 2 node cluster just for testing.
> When I start the cluster and the job I see that the parallelism is 2 as
> expected.
> But they are both on the same node.
> When I stop the taskmanager on that node it switches to the other one.
> But I expected both nodes to have a subtask.
>
> See below.
>
> Any clues?
>
> Regards Hans-Peter
>
> [image: image.png]
> [image: image.png]
>


Re: Wrong format when passing arguments with space

2022-03-29 Thread David Morávek
Hi Kevin,

-dev@f.a.o +user@f.a.o

Thanks for the report! I've run some experiments and unfortunately I'm not
able to reproduce the behavior you're describing. The bash "$@" expansion
seems to work as expected (always receiving correctly expanded unquoted
strings in the main class). Can you maybe elaborate about your environment
and submit a minimal reproducer?
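
For anyone building such a reproducer: a tiny main class along the lines below (the class name is made up) prints exactly what arrives in args[], which makes it easy to see whether the quotes are added by the shell, by the Flink CLI, or by the program itself.

public class ArgsEcho {
    public static void main(String[] args) {
        // print each argument with visible delimiters so stray quotes and spaces stand out
        for (int i = 0; i < args.length; i++) {
            System.out.println(i + " -> [" + args[i] + "]");
        }
    }
}

Submitting it with ./bin/flink run using the same quoting as in the report should then show whether the double quotes really reach the main method.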

Best,
D.

On Mon, Mar 28, 2022 at 6:00 AM Kevin Lee  wrote:

> Flink version : 1.13
>
> Bug:
> When I pass an argument with a space using single quotes,
> the main function gets this argument with double quotes.
>
> example:
> ./bin/flink run -c com.lmk.QuotaTest --rate 10 --time ''2022-03-28
> 11:53:21"
>
> The main function get parameters:
>
> 1-rate
> 2---10
> 3-time
> 4---"2022-03-28 11:53:21"
>
>
> I think flink shell should remove the double quotes in "2022-03-28 11:53:21"
>
>
> Hope to get your reply asap
>


Re: Wrong format when passing arguments with space

2022-03-29 Thread David Morávek
cc Kevin

On Tue, Mar 29, 2022 at 9:15 AM David Morávek  wrote:

> Hi Kevin,
>
> -dev@f.a.o +user@f.a.o
>
> Thanks for the report! I've run some experiments and unfortunately I'm not
> able to reproduce the behavior you're describing. The bash "$@" expansion
> seems to work as expected (always receiving correctly expanded unquoted
> strings in the main class). Can you maybe elaborate about your environment
> and submit a minimal reproducer?
>
> Best,
> D.
>
> On Mon, Mar 28, 2022 at 6:00 AM Kevin Lee 
> wrote:
>
>> Flink version : 1.13
>>
>> Bug:
>> When I pass an argument with a space using single quotes,
>> the main function gets this argument with double quotes.
>>
>> example:
>> ./bin/flink run -c com.lmk.QuotaTest --rate 10 --time ''2022-03-28
>> 11:53:21"
>>
>> The main function get parameters:
>>
>> 1-rate
>> 2---10
>> 3-time
>> 4---"2022-03-28 11:53:21"
>>
>>
>> I think flink shell should remove the double quotes in "2022-03-28
>> 11:53:21"
>>
>>
>> Hope to get your reply asap
>>
>


Re: Exactly-once sink sync checkpoint stacking time effect

2022-03-29 Thread Arvid Heise
Hi Filip,

two things will impact sync time for Kafka:
1. Flushing all old data [1], in particular flushing all in-flight
partitions [2]. However, that shouldn't cause a stacking effect except when
the brokers are overloaded on checkpoint.
2. Opening a new transaction [3]. Since all transactions are linearized on
the Kafka brokers, this is the most likely root cause. Note that aborted
checkpoints may require multiple transactions to be opened, so you could
check whether your checkpoints get aborted quite often.

If you want to know more, I suggest you attach a profiler and find the
specific culprit and report back [4]. There is a low probability that the
sink framework has a bug that causes this behavior. In that case, we can
fix it more easily than if it's a fundamental issue with Kafka. In general,
exactly-once and low latency are somewhat contradicting requirements, so
there is only so much you can do.

Not knowing your topology but maybe you can reduce the number of sinks?
With the KafkaRecordSerializationSchema you can set different topics for
different ProducerRecords of the same DataStream.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
[3]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
[4]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/
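
To make the last suggestion concrete, a record serializer that routes each element of one stream to a per-record topic could look roughly like the sketch below (the element type is simply (topic, payload) here for brevity; in a real job the topic would be derived from the record's fields):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Routes each element of a single DataStream to a per-record topic, so one
// exactly-once KafkaSink can replace several per-topic sinks.
public class TopicRoutingSchema
        implements KafkaRecordSerializationSchema<Tuple2<String, String>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            Tuple2<String, String> element, KafkaSinkContext context, Long timestamp) {
        String topic = element.f0;                                  // chosen per record
        byte[] value = element.f1.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, value);
    }
}

A single KafkaSink built with setRecordSerializer(new TopicRoutingSchema()) then writes to whatever topic each ProducerRecord names, so one sink — and its transactions — replaces several per-topic sinks.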


On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki 
wrote:

> Hi, I noticed that with each added (kafka) sink with exactly-once
> guarantees, there looks to be a penalty of ~100ms in terms of sync
> checkpointing time.
>
> Would anyone be able to explain and/or point me in the right direction in
> the source code so that I could understand why that is? Specifically, why
> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
> all sinks, potentially pointing to a sequential set of IO calls (wild
> guess)
>
> I would be keen to understand if there's anything I could do (incl.
> contributing code) that would parallelise this penalty in terms of sync
> checkpointing time.
>
> Alternatively, is there any setting that would help me bring the sync
> checkpointing time down (and still get exactly-once guarantees)?
>
> Many thanks,
> Fil
>


Flink on k8s: how do people generally solve the problem of accessing HDFS?

2022-03-29 Thread yidan zhao
As the subject says: do we need to package the hadoop client into the image?


Unsubscribe

2022-03-29 Thread 袁超
Unsubscribe


Re: How to filter out intermediate states when writing real-time data to the database and guarantee the final state

2022-03-29 Thread Guo Thompson
You can borrow the approach the jdbc-connector uses to write to MySQL: buffer the data in a HashMap in Java, keyed by
order_id, and periodically flush the map to MySQL.
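
A minimal sketch of that idea (the class name, table name and JDBC URL below are made up for illustration, and error handling is simplified) could look like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Buffers the latest status per order_id and flushes the buffer to MySQL
// periodically and on every checkpoint, so intermediate states are collapsed
// while the most recent status still gets written.
public class BufferingReplaceSink extends RichSinkFunction<Tuple2<String, String>>
        implements CheckpointedFunction {

    private final String jdbcUrl;        // e.g. "jdbc:mysql://host:3306/db?user=...&password=..."
    private final long flushIntervalMs;

    private transient Map<String, String> buffer;   // order_id -> latest status
    private transient Connection connection;
    private transient Timer timer;

    public BufferingReplaceSink(String jdbcUrl, long flushIntervalMs) {
        this.jdbcUrl = jdbcUrl;
        this.flushIntervalMs = flushIntervalMs;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        buffer = new HashMap<>();
        connection = DriverManager.getConnection(jdbcUrl);
        timer = new Timer("replace-sink-flusher", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    flush();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }, flushIntervalMs, flushIntervalMs);
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        synchronized (this) {
            buffer.put(value.f0, value.f1);          // later statuses overwrite earlier ones
        }
    }

    private synchronized void flush() throws Exception {
        if (buffer.isEmpty()) {
            return;
        }
        try (PreparedStatement ps = connection.prepareStatement(
                "REPLACE INTO orders (order_id, status) VALUES (?, ?)")) {
            for (Map.Entry<String, String> e : buffer.entrySet()) {
                ps.setString(1, e.getKey());
                ps.setString(2, e.getValue());
                ps.addBatch();
            }
            ps.executeBatch();
        }
        buffer.clear();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        flush();   // also flush on checkpoint so the buffered updates are written at least once per checkpoint
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // nothing to restore; the buffer only holds not-yet-flushed updates
    }

    @Override
    public void close() throws Exception {
        if (timer != null) {
            timer.cancel();
        }
        flush();
        if (connection != null) {
            connection.close();
        }
    }
}

Flushing in snapshotState() means the buffered updates are written out at least once per checkpoint, which addresses the original concern that the final status of an order_id could be skipped.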

18703416...@163.com <18703416...@163.com> wrote on Tue, Mar 1, 2022 at 14:40:

> First make sure the source events carry an eventTime, e.g. the source returns a type like MySource.
> Sample code follows:
> static class MySource {
> Long ts;
> String key;
> Object object;
> }
> env.addSource(new SourceFunction<MySource>() {
> @Override
> public void run(SourceContext<MySource> ctx) throws Exception {
> ctx.collect(new MySource());
> }
> @Override
> public void cancel() {
> }
> }).keyBy(new KeySelector<MySource, String>() {
> @Override
> public String getKey(MySource value) throws Exception {
> return value.key;
> }
> }).timeWindow(Time.seconds(10)).process(new
> ProcessWindowFunction<MySource, Object, String, TimeWindow>() {
> @Override
> public void process(String s, Context context, Iterable<MySource>
> elements, Collector<Object> out) throws Exception {
> List<MySource> collect =
> Lists.newArrayList(elements).stream().sorted(new Comparator<MySource>() {
> @Override
> public int compare(MySource o1, MySource o2) {
> return o2.ts.compareTo(o1.ts);
> }
> }).collect(Collectors.toList());
> if (collect.size() > 0){
> out.collect(collect.get(0).object);
> }
> }
> }).addSink(new SinkFunction<Object>() {
> @Override
> public void invoke(Object value, Context context) throws Exception {
> System.out.println(value);
> }
> });
>
>
>
>
>
> > On Mar 1, 2022, at 11:35 AM, Lei Wang  wrote:
> >
> > Thanks, this approach works.
> >
> > What is the right way to take only the latest record within the window? I tried the following directly and it did not behave as expected:
> >
> > env.addSource(consumer).keyBy(new KeySelector<String, String>() {
> >@Override
> >public String getKey(String value) throws Exception {
> >return value;
> >}
> > }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()
> >
> > In fact it output all records in the window in reverse order.
> >
> > Thanks,
> >
> > 王磊
> >
> >
> >
> > On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com <18703416...@163.com
> >
> > wrote:
> >
> >> After the keyBy operator, attach a time window; if a window contains several records, keep only the latest one. As for the pressure on the database, that depends on the size of the window.
> >>
> >>> On Feb 25, 2022, at 6:45 PM, Lei Wang  wrote:
> >>>
> >>> Scenario:
> >>> Data from Kafka is written directly into a MySQL database, with records of the form:
> >>> order_id   status
> >>> There are only two fields; order_id is the primary key, and rows are written with REPLACE (overwrite) semantics.
> >>>
> >>> For the same order_id the status changes very frequently. To avoid putting pressure on the database we do not write every record, but we must make sure the final status of each order_id
> >>> is not lost, and we do not know in advance what that final status will be.
> >>>
> >>> My approach is to keyBy orderId and check the time gap between two consecutive records; if the gap is too small, skip the write. But if the last two records of an order_id
> >>> arrive too close together, the final status is lost.
> >>>
> >>> Is there any other way to solve this?
> >>>
> >>> Thanks,
> >>> 王磊
> >>
> >>
>
>


Re: Flink on k8s: how do people generally solve the problem of accessing HDFS?

2022-03-29 Thread casel.chen
We use cloud storage directly, such as Alibaba Cloud OSS, and did not set up a Hadoop cluster. If Flink on
k8s really does need to access Hadoop, you do need to package the Hadoop distribution into the image and configure core-site.xml, hdfs-site.xml, etc.

On 2022-03-30 12:01:54, "yidan zhao"  wrote:
>As the subject says: do we need to package the hadoop client into the image?


Re: Is there a replacement for the yarn.ship-files parameter on Flink on k8s?

2022-03-29 Thread shimin huang
OK, I'll look into it. Thanks!

yu'an huang  wrote on Mon, Mar 28, 2022 at 22:12:

> Hi,
>
>
> You can take a look at the introduction of usrlib in this link (the Application Mode section).
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/standalone/docker/#docker-hub-flink-images
>
> Kubernetes does not provide a ship-files feature the way YARN does. For Kubernetes application mode, the user program runs inside the Job
>
> Manager, which requires all artifacts to already be present in the image. Flink automatically puts every file under the $FLINK_HOME/usrlib directory on the user program's classpath, so you need to build an image as described in the link and place the artifacts you need into it in advance. Then just specify the main class and the JAR it uses in the submission command.
>
>
>
>
> On Mon, 28 Mar 2022 at 8:26 PM, shimin huang 
> wrote:
>
> > I did not find a related configuration in 1.12.0; for now I am considering testing whether specifying the corresponding jar paths via pipeline.classpaths takes effect.
> >
> > Geng Biao  wrote on Mon, Mar 28, 2022 at 20:18:
> >
> > > Hi shimin,
> > > For external jar dependencies, you can look at how usrlib is used for flink on k8s in the documentation.
> > >
> > > Best,
> > > Biao
> > >
> > > Get Outlook for iOS
> > > 
> > > From: shimin huang 
> > > Sent: Monday, March 28, 2022 8:14:28 PM
> > > To: user-zh@flink.apache.org 
> > > Subject: Is there a replacement for the yarn.ship-files parameter on Flink on k8s?
> > >
> > > flink version 1.12.0
> > >
> > > We are in the process of migrating from Flink on YARN to Flink on
> > > k8s. Previously, external jars and configuration were loaded via the yarn.ship-files parameter. Does k8s have a similar parameter? So far I have not found anything like it in the 1.12.0 documentation; there is an
> > > external-resource..yarn.config-key option, but without a concrete usage example. Could anyone advise on a good way to handle this?
> > >
> >
>


Re: elasticsearch+hbase

2022-03-29 Thread Guo Thompson
How is the data kept in sync between HBase and ES in real time?

潘明文  wrote on Tue, Mar 1, 2022 at 19:07:

> HI,
>The current environment is a 6-node CDH
> cluster, with elasticsearch serving as a secondary index for hbase. How can the code be optimized so that looking up hbase data through the elasticsearch secondary index takes less than 0.1 seconds? Thanks.
>
>
>
>
>


Re: How to tune RocksDB reads at 100% CPU

2022-03-29 Thread Guo Thompson
What if the RocksDB state is very large, e.g. 200G? In that case a flame graph often shows the bottleneck is still in RocksDB's get(). Any ideas for optimizing this?

Yun Tang  wrote on Mon, Mar 21, 2022 at 14:42:

> Hi,
>
> If the RocksDB CPU stack is stuck at 100%, it is very likely caused by heavy decompression of index/filter blocks; you can enable partitioned
> index/filters [1] and see whether that solves the problem.
> For related content you can also refer to the talk I gave offline earlier [2]
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-memory-partitioned-index-filters
>
> [2] https://developer.aliyun.com/article/784995 "Flink 1.13, State Backend
> Optimization and Production Practice"
>
> Best regards,
> 唐云
>
> 
> From: Peihui He 
> Sent: Friday, March 18, 2022 20:16
> To: user-zh@flink.apache.org 
> Subject: Re: RocksDB 读 cpu 100% 如何调优
>
> OK, I will add a metric on my side and observe first.
>
> yue ma  wrote on Fri, Mar 18, 2022 at 12:23:
>
> > hi
> > I think there are two things worth paying attention to here.
> > First, you can check what the task throughput is at that point; if the qps is very high, e.g. the job is consuming from the earliest offset, then I think 100% cpu
> > is expected at that point.
> > Second, you can add some in-memory caching logic in the code, similar to mini-batch, to reduce the frequency of interaction with state; that may alleviate part of the problem.
> >
> > deng xuezhao  wrote on Fri, Mar 18, 2022 at 11:19:
> >
> > > Unsubscribe
> > >
> > >
> > >
> > > On Mar 18, 2022, at 11:18 AM, Peihui He wrote:
> > >
> > > Hi, all
> > >
> > > As in the subject, the flink job uses rocksdb as the state backend, and the job logic is roughly:
> > > for each incoming record, first check whether its key is already in the MapState, and then write the key into the MapState.
> > >
> > > The problem is that after the job has been running on the data for a while, the thread doing the contains check is constantly at 100% cpu, with the following stack:
> > >
> > > "process (6/18)#0" Id=80 RUNNABLE (in native)
> > > at org.rocksdb.RocksDB.get(Native Method)
> > > at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> > > at
> > >
> > >
> >
> org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:173)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
> > > at
> > >
> > >
> >
> com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:156)
> > > at
> > >
> > >
> >
> com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:145)
> > > at
> > >
> > >
> >
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> > > at
> > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> > > at
> > > org.apache.flink.streaming.runtime.io
> > >
> >
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> > > at
> > > org.apache.flink.streaming.runtime.io
> > >
> >
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> > > at
> > > org.apache.flink.streaming.runtime.io
> > > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > > at
> > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> > > at
> > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$624/715942770.runDefaultAction(Unknown
> > > Source)
> > > at
> > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> > > at
> > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> > > at
> > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> > > at
> > >
> >
> org.apache.flink.runtime.taskmanager.Task$$Lambda$773/520411616.run(Unknown
> > > Source)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> > > at
> > >
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> > > at java.lang.Thread.run(Thread.java:748)
> > >
> > > But looking at the checkpoint data, it is only about 100m.
> > >
> > > What performance bottleneck is rocksdb hitting here? How should it be tuned?
> > >
> >
>