Re: flink how to access remote hdfs using namenode nameservice

2020-05-07 Thread Yang Wang
Do you mean to use the HDFS nameservice? You could find it with the config key
"dfs.nameservices" in hdfs-site.xml. For example,
hdfs://myhdfs/flink/recovery.

Please keep in mind that you need to set the HADOOP_CONF_DIR environment
variable beforehand.
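
For illustration, a minimal sketch of this setup. The nameservice name "myhdfs" and the paths are placeholders, and the hdfs-site.xml/core-site.xml under HADOOP_CONF_DIR are assumed to already define the nameservice and its namenodes:

# flink-conf.yaml (sketch)
high-availability.storageDir: hdfs://myhdfs/flink/recovery

# shell, before starting the cluster, so Flink can resolve "myhdfs"
export HADOOP_CONF_DIR=/path/to/remote/hadoop/conf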


Best,
Yang

wangl...@geekplus.com.cn wrote on Thu, May 7, 2020 at 5:04 PM:

>
> According to
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
>
>
> I am deploying a standalone cluster with jobmanager HA and need the hdfs
> address:
>
> high-availability.storageDir: hdfs:///flink/recovery
>
> My Hadoop is a remote cluster. I can write it as
> hdfs://active-namenode-ip:8020, but this way loses namenode HA.
>
> Is there any method to configure it as hdfs://name-service:8020?
>
> Thanks,
> Lei
>
>
> --
> wangl...@geekplus.com.cn
>
>


What's the best practice to determine whether a job has finished or not?

2020-05-07 Thread Caizhi Weng
Hi dear Flink community,

I would like to determine whether a job has finished (no matter
successfully or exceptionally) in my code.

I used to think that JobClient#getJobStatus is a good idea, but I found
that it behaves quite differently under different execution environments.
For example, under a standalone session cluster it will return the FINISHED
status for a finished job, while under a YARN per-job cluster it will throw
an ApplicationNotFound exception. I'm afraid that there might be other
behaviors for other environments.

So what's the best practice to determine whether a job has finished or not?
Note that I'm not waiting for the job to finish. If the job hasn't finished
I would like to know it and do something else.
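
For reference, a minimal sketch of the kind of check being discussed. The wrapper class and the choice to treat a failed status lookup as "finished" are assumptions made for illustration (that fallback is exactly the open question of this thread), not an official recommendation:

import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;

public final class JobStatusProbe {

    // Returns true if the job reached a globally terminal state
    // (FINISHED, FAILED or CANCELED).
    public static boolean hasFinished(JobClient client) throws InterruptedException {
        try {
            JobStatus status = client.getJobStatus().get();
            return status.isGloballyTerminalState();
        } catch (ExecutionException e) {
            // On a YARN per-job cluster the cluster may already be gone, so the
            // lookup can fail for a finished job (ApplicationNotFound). Whether
            // that should count as "finished" is the open question here.
            return true;
        }
    }
}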


Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Trystan
Aha, so incremental checkpointing *does* rely on infinitely-previous
checkpoint state, regardless of the incremental retention number. The
documentation wasn't entirely clear about this. One would assume that if
you retain 3 checkpoints, anything older than the 3rd is irrelevant, but
that's evidently not true. So it is never safe to delete any files in
/shared, because we can't know which files belong to the current job (and
may have lived on from checkpoint 1 even though we're on checkpoint 10 and
only "retain" 3) and which ones have been abandoned altogether (due to a
previous run of the job where we didn't restore state).

This is really unfortunate - it can lead to a case where you accumulate a
huge number of files in S3 and you can't know which ones to delete,
especially if the job id remains the same (in per-job mode, they're all
zeros). So this shared state lives on forever and there is no way to ever
clean it up, at all. I am surprised that this hasn't been a problem for
anyone else. Maybe I should just file a feature request for this, at least
to find some solution for ways to clean up these directories.

I appreciate your patience and help, thank you so much!

Trystan

On Thu, May 7, 2020 at 7:15 PM Congxian Qiu  wrote:

> Hi
>
> Yes, there should only be files used by checkpoints 8, 9 and 10 in the
> checkpoint directory, but you cannot delete files created more than 3
> minutes ago (because checkpoints 8, 9 and 10 may reuse files created by
> previous checkpoints; this is how incremental checkpointing works [1]).
>
> You can also check the directory structure of checkpoint files [2] for more
> information; copied from the website here:
> > The SHARED directory is for state that is possibly part of multiple
> checkpoints, TASKOWNED is for state that must never be dropped by the
> JobManager, and EXCLUSIVE is for state that belongs to one checkpoint
> only.
>
> For the entropy injection, you can enable it as the documentation says; it
> will replace the entropy key with a random string of the
> specified length so that the files are not all in the same directory.
>
> [1]
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
> Best,
> Congxian
>
>
Trystan wrote on Thu, May 7, 2020 at 12:54 PM:
>
>> Thanks Congxian! To make sure I'm understanding correctly, if I retain 3
>> incremental checkpoints (say every minute), and I've just completed
>> checkpoint 10, then anything in shared is from checkpoint 8 and 9 only. So
>> anything older than ~3 minutes can safely be deleted? The state from
>> checkpoint 5 doesn't live on in the shared directory - at all?
>>
>> I ask because we have run into cases where we end up abandoning the
>> state, and Flink does not clean up state from, say, a previous iteration of
>> the job if you don't restore state. We need to remove these files
>> automatically, but I want to be sure that I don't blow away older files in
>> the shared dir from earlier, subsumed checkpoints - but you are saying that
>> isn't possible, and that all subsumed checkpoints will have their /shared
>> state rewritten or cleaned up as needed, correct?
>>
>> As for entropy, where would you suggest to use it? My understanding is
>> that I don't control anything beyond the checkpoint directory, and since
>> shared is in that directory I can't put entropy inside the shared directory
>> itself (which is what I would need).
>>
>> Thanks,
>> Trystan
>>
>> On Wed, May 6, 2020 at 7:31 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>> For the rate limit, could you please try entropy injection[1].
>>> For checkpoints, Flink will handle the file lifecycle (it will delete a
>>> file once it will never be used again). A file in the checkpoint directory
>>> will be there as long as the corresponding checkpoint is still valid.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#entropy-injection-for-s3-file-systems
>>> Best,
>>> Congxian
>>>
>>>
Trystan wrote on Thu, May 7, 2020 at 2:46 AM:
>>>
 Hello!

 Recently we ran into an issue when checkpointing to S3. Because S3
 rate-limits based on prefix, the /shared directory would get slammed and
 cause S3 throttling. There is no solution for this, because
 /job/checkpoint/:id/shared is all part of the prefix, and is limited to 3,500
 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix.

 (source:
 https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html
 )

 Jobs sometimes also completely crash, and they leave state lying
 around when we bring the job up fresh.

 Our solutions have been to 1) reduce the number of taskmanagers, 2)
 reduce state.backend.rocksdb.checkpoint.transfer.thread.num back to 1
 (we had increased it to speed up checkpointing/savepoints) and 3) manually
 delete tons of old files in the shared directory.

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-07 Thread Congxian Qiu
Hi

Yes, there should only be files used by checkpoints 8, 9 and 10 in the
checkpoint directory, but you cannot delete files created more than 3
minutes ago (because checkpoints 8, 9 and 10 may reuse files created by
previous checkpoints; this is how incremental checkpointing works [1]).

You can also check the directory structure of checkpoint files [2] for more
information; copied from the website here:
> The SHARED directory is for state that is possibly part of multiple
checkpoints, TASKOWNED is for state that must never be dropped by the
JobManager, and EXCLUSIVE is for state that belongs to one checkpoint only.

For the entropy injection, you can enable it as the documentation says; it
will replace the entropy key with a random string of the
specified length so that the files are not all in the same directory.

[1]
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
Best,
Congxian
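
As an illustration, a minimal sketch of what that entropy-injection setup might look like in flink-conf.yaml. The key token, length and bucket path are placeholders, not values from this thread:

# flink-conf.yaml (sketch)
s3.entropy.key: _entropy_
s3.entropy.length: 4
state.checkpoints.dir: s3://my-bucket/checkpoints/_entropy_/

With this, the _entropy_ token in the checkpoint path is replaced by a random string for data files (and removed for the metadata file), spreading the files across S3 prefixes.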


Trystan wrote on Thu, May 7, 2020 at 12:54 PM:

> Thanks Congxian! To make sure I'm understanding correctly, if I retain 3
> incremental checkpoints (say every minute), and I've just completed
> checkpoint 10, then anything in shared is from checkpoint 8 and 9 only. So
> anything older than ~3 minutes can safely be deleted? The state from
> checkpoint 5 doesn't live on in the shared directory - at all?
>
> I ask because we have run into cases where we end up abandoning the state,
> and Flink does not clean up state from, say, a previous iteration of the
> job if you don't restore state. We need to remove these files
> automatically, but I want to be sure that I don't blow away older files in
> the shared dir from earlier, subsumed checkpoints - but you are saying that
> isn't possible, and that all subsumed checkpoints will have their /shared
> state rewritten or cleaned up as needed, correct?
>
> As for entropy, where would you suggest to use it? My understanding is
> that I don't control anything beyond the checkpoint directory, and since
> shared is in that directory I can't put entropy inside the shared directory
> itself (which is what I would need).
>
> Thanks,
> Trystan
>
> On Wed, May 6, 2020 at 7:31 PM Congxian Qiu 
> wrote:
>
>> Hi
>> For the rate limit, could you please try entropy injection[1].
>> For checkpoints, Flink will handle the file lifecycle (it will delete a
>> file once it will never be used again). A file in the checkpoint directory
>> will be there as long as the corresponding checkpoint is still valid.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#entropy-injection-for-s3-file-systems
>> Best,
>> Congxian
>>
>>
Trystan wrote on Thu, May 7, 2020 at 2:46 AM:
>>
>>> Hello!
>>>
>>> Recently we ran into an issue when checkpointing to S3. Because S3
>>> rate-limits based on prefix, the /shared directory would get slammed and
>>> cause S3 throttling. There is no solution for this, because
>>> /job/checkpoint/:id/shared is all part of the prefix, and is limited to 3,500
>>> PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix.
>>>
>>> (source:
>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html
>>> )
>>>
>>> Jobs sometimes also completely crash, and they leave state lying around
>>> when we bring the job up fresh.
>>>
>>> Our solutions have been to 1) reduce the number of taskmanagers, 2)
>>> reduce state.backend.rocksdb.checkpoint.transfer.thread.num back to 1
>>> (we had increased it to speed up checkpointing/savepoints) and 3) manually
>>> delete tons of old files in the shared directory.
>>>
>>> My question:
>>> Can we safely apply a Lifecycle Policy to the directory/bucket to remove
>>> things? How long is stuff under /shared retained? Is it only for the
>>> duration of the oldest checkpoint, or could it carry forward, untouched,
>>> from the very first checkpoint to the very last? This shared checkpoint
>>> dir/prefix is currently limiting some scalability of our jobs. I don't
>>> believe the _entropy_ trick would help this, because the issue is
>>> ultimately that there's a single shared directory.
>>>
>>> Thank you!
>>> Trystan
>>>
>>


Re: Correctly implementing of SourceFunction.run()

2020-05-07 Thread Jingsong Li
Hi,

Some suggestions from my side:
- Wrap the work and ctx.collect in synchronized (checkpointLock)?
- Move Thread.sleep(interval) out of the try/catch? You probably should not
swallow the InterruptedException (e.g. when the job is cancelled); see the
sketch below.

Best,
Jingsong Lee
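
A rough sketch of how those two suggestions could look in the run() method quoted below. The class and field names are made up for illustration, checkpointLock stands for ctx.getCheckpointLock(), and fetchFromS3() is a placeholder for the real work:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class S3PollingSource extends RichSourceFunction<String> {

    private volatile boolean isRunning = true;
    private final long sleepIntervalMillis;

    public S3PollingSource(long sleepIntervalHours) {
        this.sleepIntervalMillis = sleepIntervalHours * 60 * 60 * 1000;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        final Object checkpointLock = ctx.getCheckpointLock();
        while (isRunning) {
            try {
                String out = fetchFromS3();
                // emit under the checkpoint lock so a snapshot never interleaves with collect()
                synchronized (checkpointLock) {
                    ctx.collect(out);
                }
            } catch (Exception e) {
                // log and decide per exception type instead of silently swallowing everything
            }
            // outside the try: cancel() plus an interrupt ends the sleep and the source
            Thread.sleep(sleepIntervalMillis);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    private String fetchFromS3() {
        return "...";   // placeholder for the periodic S3 read
    }
}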

On Fri, May 8, 2020 at 2:52 AM Senthil Kumar  wrote:

> I am implementing a source function which periodically wakes up and
> consumes data from S3.
>
>
>
> My current implementation is like so.
>
> Following: 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>
>
>
> Is it safe to simply swallow any and all exceptions in the run method and
> just rely on this.isRunning variable to quit the run() method?
>
>
>
> Cheers
>
> Kumar
>
>
>
> ---
>
>
>
> @Override
> public void cancel() {
>     this.isRunning = false;   // Set volatile state variable, initially set to true on the class
> }
>
> @Override
> public void run(SourceFunction.SourceContext ctx) {
>     while (this.isRunning) {
>         try {
>             OUT out = /* Do some work */
>             ctx.collect(out);
>
>             Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milliseconds
>         } catch (Throwable t) {
>             // Simply swallow
>         }
>     }
> }
>
>
>


-- 
Best, Jingsong Lee


Correctly implementing of SourceFunction.run()

2020-05-07 Thread Senthil Kumar
I am implementing a source function which periodically wakes up and consumes 
data from S3.


My current implementation is like so.

Following: 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction

Is it safe to simply swallow any and all exceptions in the run method and just 
rely on this.isRunning variable to quit the run() method?

Cheers
Kumar

---


@Override
public void cancel() {
    this.isRunning = false;   // Set volatile state variable, initially set to true on the class
}

@Override
public void run(SourceFunction.SourceContext ctx) {
    while (this.isRunning) {
        try {
            OUT out = /* Do some work */
            ctx.collect(out);

            Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milliseconds
        } catch (Throwable t) {
            // Simply swallow
        }
    }
}



Re: Rich Function Thread Safety

2020-05-07 Thread tao xiao
As the Javadoc suggests, it seems the operator method and the checkpoint
snapshot can be accessed by two different threads:

https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java#L39-L62
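
For regular (non-source) operators, the answers quoted below state that the snapshot happens on the same task thread as the user function. A minimal sketch of what that permits (plain fields shared by flatMap and snapshotState without any locking); the class and state names are made up for illustration:

import java.util.Collections;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class CountingFlatMap extends RichFlatMapFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;
    private long count; // plain field, no synchronization needed

    @Override
    public void flatMap(String value, Collector<String> out) {
        count++;            // mutated by the task thread
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // called by the same task thread when the checkpoint barrier arrives,
        // so it always sees a consistent value of `count`
        checkpointedCount.update(Collections.singletonList(count));
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedCount = ctx.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long c : checkpointedCount.get()) {
            count = c;
        }
    }
}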

On Thu, May 7, 2020 at 1:22 AM Joey Echeverria 
wrote:

> I’ve seen a few mailing list posts (including this one) that say Flink
> guarantees there is no concurrent access to operator methods (e.g. flatMap,
> snapshotState, etc.) and thus synchronization isn’t needed when writing
> operators that support checkpointing. I was trying to find a place in the
> official docs where this was called out, but was coming up empty.
>
> Is there a section of the docs that covers this topic?
>
> Thanks!
>
> -Joey
>
> On Dec 18, 2019, at 9:38 PM, Zhu Zhu  wrote:
>
> Hi Aaron,
>
> It is thread safe since the state snapshot happens in the same thread as
> the user function.
>
> Thanks,
> Zhu Zhu
>
Aaron Langford wrote on Thu, Dec 19, 2019 at 11:25 AM:
>
>> Hello Flink Community,
>>
>> I'm hoping to verify some understanding:
>>
>> If I have a function with managed state, I'm wondering if a
>> checkpoint will ever be taken while a function is mutating state. I'll try
>> to illustrate the situation I'm hoping to be safe from:
>>
>> Happy Path:
>> t0 -> processFunction invoked with el1
>> t1 -> set A to 5
>> t2 -> set B to 10
>> t3 -> function returns
>>
>> Unhappy path:
>> t0 -> processFunction invoked with el1
>> t1 -> set A to 5
>> t2 -> function interrupted, checkpoint taken (A = 5, B = 1)
>> t3 -> set B to 10
>> t4 -> function returns
>> ...
>> tn -> flink application fails, restart from prev checkpoint (A=5, B=1)
>> tn+1 -> recovery begins somewhere, but state is torn anyway, so we're
>> going to have a bad time
>>
>> I don't think this could happen given that checkpoints effectively are
>> messages in the pipeline, and the checkpoint is only taken when an operator
>> sees the checkpoint barrier.
>>
>> Hoping to make sure this is correct!
>>
>> Aaron
>>
>
>

-- 
Regards,
Tao


Re: Statefun 2.0 questions

2020-05-07 Thread Wouter Zorgdrager
Hi Igal,

Thanks for your quick reply. Getting back to point 2, I was wondering if
you could indeed trigger a stateful function directly from Flask and also
get the reply there instead of using Kafka in between. We want to
experiment with running stateful functions behind a front-end (which should be
able to trigger a function), but we're a bit afraid that using Kafka
doesn't scale well if on the frontend side a user has to consume all Kafka
messages to find the correct reply/output for a certain request/input. Any
thoughts?

Thanks in advance,
Wouter

On Thu, May 7, 2020 at 10:51 AM Igal Shilman wrote:

> Hi Wouter!
>
> Glad to read that you are using Flink for quite some time, and also
> exploring with StateFun!
>
> 1) yes it is correct and you can follow the Dockerhub contribution PR at
> [1]
>
> 2) I’m not sure I understand what you mean by triggering from the browser.
> If you mean, for testing / illustration purposes, triggering the function
> independently of StateFun, you would need to write some JavaScript and
> perform the POST (assuming CORS is enabled).
> Let me know if you’d like further information on how to do it.
> Broadly speaking, GET is traditionally used to get data from a resource
> and POST to send data (the data is the invocation batch in our case).
>
> One easier workaround for you would be to expose another endpoint in your
> Flask application, and call your stateful function directly from there
> (possibly populating the function argument with values taken from the query
> params).
>
> 3) I would expect a performance loss when going from the embedded SDK to
> the remote one, simply because the remote function is at a different
> process, and a round trip is required. There are different ways of
> deployment even for remote functions.
> For example, they can be co-located with the task managers and communicate
> via the loopback device / Unix domain sockets, or they can be deployed
> behind a load balancer with an auto-scaler, and thus reacting to higher
> request rate/latency increases by spinning new instances (something that is
> not yet supported with the embedded API)
>
> Good luck,
> Igal.
>
>
>
>
>
> [1] https://github.com/docker-library/official-images/pull/7749
>
>
> On Wednesday, May 6, 2020, Wouter Zorgdrager 
> wrote:
>
>> Hi all,
>>
>> I've been using Flink for quite some time now and for a university
>> project I'm planning to experiment with statefun. During the walkthrough
>> I've run into some issues, I hope you can help me with.
>>
>> 1) Is it correct that the Docker image of statefun is not yet published?
>> I couldn't find it anywhere, but was able to run it by building the image
>> myself.
>> 2) In the example project using the Python SDK, it uses Flask to expose a
>> function using POST. Is there also a way to serve GET request so that you
>> can trigger a stateful function by for instance using your browser?
>> 3) Do you expect a lot of performance loss when using the Python SDK over
>> Java?
>>
>> Thanks in advance!
>>
>> Regards,
>> Wouter
>>
>


Re: Window processing in Stateful Functions

2020-05-07 Thread Igal Shilman
Hi all,

Data stream windows are not yet supported in statefun, but it seems like
the main motivation here is to purge old edges?
If this is the case, perhaps we need to integrate state TTL [1] into
persisted values / persisted tables.

An alternative approach would be to implement a tumbling window per vertex
(a stateful function instance) by sending itself a delayed message [2].
When that specific delayed message arrives, you would have to purge the
oldest edges by examining the edges in state; see the sketch after the
references below.

I hope it helps,
Igal.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/sdk/java.html#sending-delayed-messages
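
A rough sketch of that idea with the embedded Java SDK. The edge handling, the Prune marker message, the one-hour window and the isTooOld predicate are all made up for illustration and are not part of this thread:

import java.time.Duration;
import java.util.ArrayList;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class VertexFunction implements StatefulFunction {

    @Persisted
    private final PersistedValue<ArrayList> edges =
        PersistedValue.of("edges", ArrayList.class);

    @Override
    public void invoke(Context context, Object input) {
        ArrayList current = edges.get();
        if (current == null) {
            current = new ArrayList();
        }
        if (input instanceof Prune) {
            // the "window" fired: drop edges that are too old
            current.removeIf(this::isTooOld);
            edges.set(current);
            if (!current.isEmpty()) {
                context.sendAfter(Duration.ofHours(1), context.self(), new Prune());
            }
        } else {
            boolean firstEdge = current.isEmpty();
            current.add(input);   // treat any other message as a new edge in this sketch
            edges.set(current);
            if (firstEdge) {
                // schedule the next purge to ourselves
                context.sendAfter(Duration.ofHours(1), context.self(), new Prune());
            }
        }
    }

    private boolean isTooOld(Object edge) {
        return false;   // placeholder: compare the edge timestamp with the window start
    }

    static class Prune {}   // hypothetical marker message for this sketch
}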



On Wednesday, May 6, 2020, Oytun Tez  wrote:

> Oops – will follow the thread 🙊
>
>
>  --
>
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>
>
> On Wed, May 6, 2020 at 5:37 PM m@xi  wrote:
>
>> Hello Tez,
>>
>> With all due respect, I doubt your answer is related to the question.
>>
>> *Just to re-phrase a bit*: Assuming we use SF for our application, how can
>> we apply window logic when a function does some processing? *Is there a
>> proper way?*
>>
>> @*Igal*: we would very much appreciate your answer. :)
>>
>> Best,
>> Max
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: checkpointing opening too many file

2020-05-07 Thread David Anderson
With the FsStateBackend you could also try increasing the value
of state.backend.fs.memory-threshold [1]. Only those state chunks that are
larger than this value are stored in separate files; smaller chunks go into
the checkpoint metadata file. The default is 1KB; increasing it
should reduce filesystem stress for small state.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-backend-fs-memory-threshold

Best,
David
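
For illustration, a minimal sketch of the kind of flink-conf.yaml change being suggested. The threshold value and checkpoint path are placeholders, not recommendations:

# flink-conf.yaml (sketch)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
# in bytes; default is 1024 (1KB), larger values keep more small state inline in the metadata file
state.backend.fs.memory-threshold: 262144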

On Wed, May 6, 2020 at 12:36 PM Congxian Qiu  wrote:

> Hi
>
> Yes, for your use case, if you do not have large state size, you can try
> to use FsStateBackend.
> Best,
> Congxian
>
>
ysnakie wrote on Mon, Apr 27, 2020 at 3:42 PM:
>
>> Hi
>> If I use FsStateBackend instead of RocksdbFsStateBackend, will the open
>> files decrease significantly? I don't have a large state size.
>>
>> thanks
>> On 4/25/2020 13:48,Congxian Qiu
>>  wrote:
>>
>> Hi
>> If there are indeed so many files that need to be uploaded to HDFS, then
>> currently we do not have any solution to limit the open files. There is an
>> issue [1] that aims to fix this problem, and a PR for it; maybe you can try
>> the attached PR to see whether it solves your problem.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11937
>> Best,
>> Congxian
>>
>>
ysnakie wrote on Fri, Apr 24, 2020 at 11:30 PM:
>>
>>> Hi everyone
>>> We have a Flink job that writes files to different directories on HDFS. It
>>> will open many files due to its high parallelism. I also found that when
>>> using the RocksDB state backend, it has even more files open during
>>> checkpointing. We use YARN to schedule the Flink job. However, YARN always
>>> schedules the taskmanagers to the same machine and I cannot control it! So the
>>> datanode gets very high pressure and always throws a "bad link"
>>> error. We have already increased the xcievers limit of HDFS to 16384.
>>>
>>> Any idea how to solve this problem? Reduce the number of open files, or
>>> control the YARN scheduling to put taskmanagers on different machines?
>>>
>>> Thank you very much!
>>> regards
>>>
>>> Shengnan
>>>
>>>


flink how to access remote hdfs using namenode nameservice

2020-05-07 Thread wangl...@geekplus.com.cn

According to 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
 

I am deploying a standalone cluster with jobmanager HA and need the hdfs
address:

high-availability.storageDir: hdfs:///flink/recovery  

My Hadoop is a remote cluster. I can write it as
hdfs://active-namenode-ip:8020, but this way loses namenode HA.

Is there any method to configure it as hdfs://name-service:8020?

Thanks,
Lei




wangl...@geekplus.com.cn 



Re: Statefun 2.0 questions

2020-05-07 Thread Igal Shilman
Hi Wouter!

Glad to read that you are using Flink for quite some time, and also
exploring with StateFun!

1) yes it is correct and you can follow the Dockerhub contribution PR at [1]

2) I’m not sure I understand what you mean by triggering from the browser.
If you mean, for testing / illustration purposes, triggering the function
independently of StateFun, you would need to write some JavaScript and
perform the POST (assuming CORS is enabled).
Let me know if you’d like further information on how to do it.
Broadly speaking, GET is traditionally used to get data from a resource and
POST to send data (the data is the invocation batch in our case).

One easier workaround for you would be to expose another endpoint in your
Flask application, and call your stateful function directly from there
(possibly populating the function argument with values taken from the query
params).

3) I would expect a performance loss when going from the embedded SDK to
the remote one, simply because the remote function is at a different
process, and a round trip is required. There are different ways of
deployment even for remote functions.
For example, they can be co-located with the task managers and communicate
via the loopback device / Unix domain sockets, or they can be deployed
behind a load balancer with an auto-scaler, and thus reacting to higher
request rate/latency increases by spinning new instances (something that is
not yet supported with the embedded API)

Good luck,
Igal.





[1] https://github.com/docker-library/official-images/pull/7749


On Wednesday, May 6, 2020, Wouter Zorgdrager  wrote:

> Hi all,
>
> I've been using Flink for quite some time now and for a university project
> I'm planning to experiment with statefun. During the walkthrough I've run
> into some issues, I hope you can help me with.
>
> 1) Is it correct that the Docker image of statefun is not yet published? I
> couldn't find it anywhere, but was able to run it by building the image
> myself.
> 2) In the example project using the Python SDK, it uses Flask to expose a
> function using POST. Is there also a way to serve GET request so that you
> can trigger a stateful function by for instance using your browser?
> 3) Do you expect a lot of performance loss when using the Python SDK over
> Java?
>
> Thanks in advance!
>
> Regards,
> Wouter
>


Re: async IO in UDFs

2020-05-07 Thread Benchao Li
Hi,

AFAIK, there is no way to do this for now.
This needs the operators running UDFs to support async IO.

lec ssmi wrote on Thu, May 7, 2020 at 3:23 PM:

> Hi:
> Is there any way to implement async IO in UDFs (scalar function,
> table function, aggregate function)?
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


async IO in UDFs

2020-05-07 Thread lec ssmi
Hi:
Is there any way to implement async IO in UDFs (scalar function,
table function, aggregate function)?


Re: Autoscaling vs backpressure

2020-05-07 Thread Arvid Heise
Hi Manish,

while you could use backpressure and the resulting consumer lag to throttle
the source and keep processing lag to a minimum, I'd personally see only
very limited value. It assumes that you have an architecture where you can
influence the input rate, which is probably only true if you generate data
or you have some kind of sampling.

You can use backpressure to build your own autoscaling though if you need
some solution right away. That would involve full restarts (from
savepoints) when resources are added/removed. There have been users that
implemented it in a general way, but it's quite an effort. See also some
talks from the virtual Flink Forward by Netflix [1] and AWS [2].
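
For reference, a rough sketch of the manual stop-and-rescale cycle such a setup would automate. The job id, savepoint location, parallelism and jar name are placeholders:

# take a savepoint and stop the running job
./bin/flink stop -p s3://bucket/savepoints <jobId>

# resubmit the same jar from that savepoint with a new parallelism
./bin/flink run -d -s s3://bucket/savepoints/savepoint-xxxx -p 8 my-job.jar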

There are currently different efforts in the community to introduce
autoscaling at various conceptual levels. Afaik none of them will make it into the
upcoming 1.11 release, so there will be some Flink solution in fall, I
guess.

To work around that, you could also define some alerts (backpressure over X%
for the last Y min) and scale out manually. Depending on how many Flink
clusters you have, I'd combine that with some oversizing to avoid doing the
scale out too often (or during nighttime). It just may be cheaper to let
some more resources run (costing let's say $50/day) than spend a month on
implementing some custom autoscaling when the community will provide a
solution in 4-5 months. I know, it's not an ideal situation but probably
the more pragmatic way.

[1]
https://www.youtube.com/watch?v=NV0jvA5ZDNc&list=PLDX4T_cnKjD0ngnBSU-bYGfgVv17MiwA7&index=44
[2]
https://www.youtube.com/watch?v=pIVmw1HyUqU&list=PLDX4T_cnKjD0ngnBSU-bYGfgVv17MiwA7&index=31

On Wed, May 6, 2020 at 4:46 PM Aljoscha Krettek  wrote:

> I'd say the two can't be considered equivalent because the back pressure
> does not "reach" back into the source system. It only goes as far back
> as the Flink source. So if the outside system produces data too fast into
> the queue from which Flink is reading, this input would keep piling up.
>
> Best,
> Aljoscha
>
> On 06.05.20 07:05, Manish G wrote:
> > Hi,
> >
> > As Flink doesn't provide out-of-the-box support for autoscaling, can
> > backpressure be considered as an alternative to it?
> > Autoscaling allows us to add/remove nodes as load goes up/down.
> > With backpressure, if load goes up the system would signal upstream to
> > release data more slowly. So we don't need to add more hardware horizontally.
> > Is it correct conceptually and practically?
> >
> > Manish
> >
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng