Re:are there any ways to test the performance of rocksdb state backend?

2018-05-17 Thread sihua zhou


Hi makeyang,
there are some cases under 
_org.apache.flink.contrib.streaming.state.benchmark.*_ that you can refer to. 
But I'm not sure whether it's possible to upgrade RocksDB to a higher 
version because of the regression of the merge operator; the comments in this PR 
https://github.com/apache/flink/pull/5937 may also give you some help.
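If you just need a very rough end-to-end number, you can also hack together a
small throughput job and run it once per RocksDB version. The following is only
a minimal sketch (the class name, record counts and the local checkpoint path
are made up for illustration), not a replacement for the dedicated benchmarks
above:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.Collector;

public class RocksDbStateThroughput {

    public static void main(String[] args) throws Exception {
        final int numRecords = 5_000_000;
        final int numKeys = 100_000;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A local checkpoint URI is enough for a micro-benchmark run.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/rocksdb-bench"));

        env.generateSequence(0, numRecords)
                .keyBy(new KeySelector<Long, Long>() {
                    @Override
                    public Long getKey(Long value) {
                        return value % numKeys;
                    }
                })
                .flatMap(new CountPerKey())
                .addSink(new DiscardingSink<Long>());

        long start = System.currentTimeMillis();
        env.execute("rocksdb-state-throughput");
        double seconds = (System.currentTimeMillis() - start) / 1000.0;
        System.out.printf("%.0f records/s (including job setup)%n", numRecords / seconds);
    }

    /** Reads and updates one RocksDB-backed ValueState entry per record. */
    private static class CountPerKey extends RichFlatMapFunction<Long, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Long value, Collector<Long> out) throws Exception {
            Long current = count.value();
            long updated = current == null ? 1L : current + 1L;
            count.update(updated);
            out.collect(updated);
        }
    }
}

Running the same job against each RocksDB version gives a coarse records-per-second
comparison; for anything more precise, the benchmark classes above are the better tool.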


Best, Sihua


On 05/18/2018 11:05, makeyang wrote:
I'd like to integrate a newer version of RocksDB with Flink. I'd like to know
if there are existing tools/ways to benchmark the performance of the RocksDB
state backend, to see whether there is a performance improvement or drop.


MaKeyang
TIG.JD.COM




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


are there any ways to test the performance of rocksdb state backend?

2018-05-17 Thread makeyang
I'd like to integrate a newer version of RocksDB with Flink. I'd like to know
if there are existing tools/ways to benchmark the performance of the RocksDB
state backend, to see whether there is a performance improvement or drop.


MaKeyang
TIG.JD.COM




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink list -r shows CANCELED jobs - Flink 1.5

2018-05-17 Thread Rong Rong
Hi Edward,

I dug a little into the CLIFrontend code and it seems like there are some
discrepancies between the description and the result returned from
flink list -r.
I have documented the issue in [1]. Please feel free to comment if I missed
anything.

Thanks,
Rong

Reference:
[1] https://issues.apache.org/jira/browse/FLINK-9398

On Thu, May 17, 2018 at 4:48 PM, Rong Rong  wrote:

> This sounds like a bug to me; I can reproduce it with the latest RC#3 of
> Flink 1.5 with:
>
> bin/start-cluster.sh
>> bin/flink run examples/streaming/WordCount.jar
>> bin/flink list -r
>>
>
> Would you please file a JIRA bug report? I will look into it.
>
> --
> Rong
>
> On Thu, May 17, 2018 at 10:50 AM, Edward Rojas 
> wrote:
>
>> I forgot to add an example of the execution:
>>
>> $ ./bin/flink list -r
>> Waiting for response...
>> -- Running/Restarting Jobs ---
>> 17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob
>> (CANCELED)
>> 17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob
>> (RUNNING)
>>
>> Note that from the title it's supposed to return only Running or
>> Restarting
>> Jobs.
>>
>> I'm using this response in a script that updates a job by canceling with a
>> savepoint... I just want to know if I have to update my script :)
>>
>> Thanks in advance
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: flink list -r shows CANCELED jobs - Flink 1.5

2018-05-17 Thread Rong Rong
This sounds like a bug to me; I can reproduce it with the latest RC#3 of Flink
1.5 with:

bin/start-cluster.sh
> bin/flink run examples/streaming/WordCount.jar
> bin/flink list -r
>

Would you please file a JIRA bug report? I will look into it.

--
Rong

On Thu, May 17, 2018 at 10:50 AM, Edward Rojas 
wrote:

> I forgot to add an example of the execution:
>
> $ ./bin/flink list -r
> Waiting for response...
> -- Running/Restarting Jobs ---
> 17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob
> (CANCELED)
> 17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob
> (RUNNING)
>
> Note that from the title it's supposed to return only Running or Restarting
> Jobs.
>
> I'm using this response in a script that updates a job by canceling with a
> savepoint... I just want to know if I have to update my script :)
>
> Thanks in advance
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Message guarantees with S3 Sink

2018-05-17 Thread Gary Yao
Hi Amit,

The BucketingSink doesn't have well-defined semantics when used with S3. Data
loss is possible, but I am not sure whether it is the only problem. There are
plans to rewrite the BucketingSink in Flink 1.6 to support eventually consistent
file systems [1][2].

Best,
Gary


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/sink-with-BucketingSink-to-S3-files-override-td18433.html
[2] https://issues.apache.org/jira/browse/FLINK-6306

On Thu, May 17, 2018 at 11:57 AM, Amit Jain  wrote:

> Hi,
>
> We are using Flink to process clickstream data from Kafka and push it
> into 128MB files in S3.
>
> What are the message processing guarantees with the S3 sink? In my
> understanding, the S3A client buffers the data in memory/on disk. In a failure
> scenario on a particular node, the TM would not trigger Writer#close, hence the
> buffered data can be lost entirely, assuming this buffer contains data from the
> last successful checkpoint.
>
> --
> Thanks,
> Amit
>


Re: flink list -r shows CANCELED jobs - Flink 1.5

2018-05-17 Thread Edward Rojas
I forgot to add an example of the execution:

$ ./bin/flink list -r
Waiting for response...
-- Running/Restarting Jobs ---
17.05.2018 19:34:31 : edec969d6f9609455f9c42443b26d688 : FlinkAvgJob
(CANCELED)
17.05.2018 19:36:01 : bd87ffc35e1521806928d6251990d715 : FlinkAvgJob
(RUNNING)

Note that from the title it's supposed to return only Running or Restarting
Jobs.

I'm using this response in a script that updates a job by canceling with a
savepoint... I just want to know if I have to update my script :)

Thanks in advance



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Message guarantees with S3 Sink

2018-05-17 Thread Amit Jain
Hi Rong,

We are using BucketingSink only. I'm looking at the case where the TM
does not get the chance to call Writer#flush, e.g. when YARN kills the TM
because of OOM. We have configured fs.s3.impl to
com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so
BucketingSink is using the S3 client internally.

When we write data using the S3A client, it buffers the data in memory
or on disk until it hits the multipart file size or the OutputStream is
closed. Now suppose the S3A client has buffered 40MB of data on the TM's
local disk, and at the same time a checkpoint barrier arrives at the sink
and that checkpoint completes successfully. The write process in the sink
then resumes, the buffered data grows to 60MB, and YARN kills the TM. What
would happen to the original 40MB of data?

--
Thanks,
Amit




On Thu, May 17, 2018 at 10:28 PM, Rong Rong  wrote:
> Hi Amit,
>
> Can you elaborate on how you write using the "S3 sink" and which version of Flink
> you are using?
>
> If you are using BucketingSink[1], you can check out the API doc and
> configure it to flush before closing your sink.
> This way your sink is "integrated with the checkpointing mechanism to
> provide exactly once semantics"[2]
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>
> On Thu, May 17, 2018 at 2:57 AM, Amit Jain  wrote:
>>
>> Hi,
>>
>> We are using Flink to process clickstream data from Kafka and push it
>> into 128MB files in S3.
>>
>> What are the message processing guarantees with the S3 sink? In my
>> understanding, the S3A client buffers the data in memory/on disk. In a failure
>> scenario on a particular node, the TM would not trigger Writer#close, hence the
>> buffered data can be lost entirely, assuming this buffer contains data from the
>> last successful checkpoint.
>>
>> --
>> Thanks,
>> Amit
>
>


flink list -r shows CANCELED jobs - Flink 1.5

2018-05-17 Thread Edward Rojas
Hello all,

On Flink 1.5, the CLI returns CANCELED jobs when requesting only the
running jobs by using the -r flag...
Is this intended behavior?

On 1.4, CANCELED jobs do not appear when running this command.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Message guarantees with S3 Sink

2018-05-17 Thread Rong Rong
Hi Amit,

Can you elaborate on how you write using the "S3 sink" and which version of Flink
you are using?

If you are using BucketingSink[1], you can check out the API doc and
configure it to flush before closing your sink.
This way your sink is "integrated with the checkpointing mechanism to
provide exactly once semantics"[2]

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
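
For example, a minimal sketch along the lines of [1] and [2]; the source, bucket
path, sizes and intervals below are placeholders, and the S3 caveats discussed
elsewhere in this thread still apply:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class ClickStreamToS3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The sink finalizes pending part files when checkpoints complete, so enable them.
        env.enableCheckpointing(60_000);

        // Placeholder source; in the real job this would be the Kafka consumer.
        DataStream<String> clicks = env.socketTextStream("localhost", 9999);

        BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/clicks");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        sink.setBatchSize(128L * 1024 * 1024);       // roll part files at ~128 MB
        sink.setInactiveBucketThreshold(60_000);     // close buckets idle for one minute
        sink.setInactiveBucketCheckInterval(60_000);

        clicks.addSink(sink);
        env.execute("click-stream-to-s3");
    }
}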

On Thu, May 17, 2018 at 2:57 AM, Amit Jain  wrote:

> Hi,
>
> We are using Flink to process clickstream data from Kafka and push it
> into 128MB files in S3.
>
> What are the message processing guarantees with the S3 sink? In my
> understanding, the S3A client buffers the data in memory/on disk. In a failure
> scenario on a particular node, the TM would not trigger Writer#close, hence the
> buffered data can be lost entirely, assuming this buffer contains data from the
> last successful checkpoint.
>
> --
> Thanks,
> Amit
>


LocatableInputSplit flink and not Assigning properly

2018-05-17 Thread Daniel Tavares
So I have a cluster with one job manager and two task managers with the IPs
192.168.112.74 and 192.168.112.75. Each task manager has specific data I want
to read, and for that I created one LocatableInputSplit per task manager. I am
using the LocatableInputSplitAssigner; however, when I run the job, the splits
don't run on the hosts I set in the LocatableInputSplit, and sometimes both run
on the same task manager.
Should a split run on the host I specified in the LocatableInputSplit?
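For reference, here is a minimal sketch of how I understand the splits and the
assigner are wired into a custom InputFormat (the host strings and the dummy
record logic are placeholders for the actual setup):

import java.io.IOException;

import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;

/** Emits one dummy record per split; each split carries a preferred host. */
public class HostLocalInputFormat extends RichInputFormat<String, LocatableInputSplit> {

    // Placeholder host strings; these have to match the host names under which
    // the task managers register, which may differ from the raw IPs.
    private static final String[] HOSTS = {"192.168.112.74", "192.168.112.75"};

    private transient LocatableInputSplit currentSplit;
    private transient boolean consumed;

    @Override
    public void configure(Configuration parameters) {}

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics;
    }

    @Override
    public LocatableInputSplit[] createInputSplits(int minNumSplits) {
        LocatableInputSplit[] splits = new LocatableInputSplit[HOSTS.length];
        for (int i = 0; i < HOSTS.length; i++) {
            splits[i] = new LocatableInputSplit(i, HOSTS[i]);
        }
        return splits;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(LocatableInputSplit[] splits) {
        return new LocatableInputSplitAssigner(splits);
    }

    @Override
    public void open(LocatableInputSplit split) {
        this.currentSplit = split;
        this.consumed = false;
    }

    @Override
    public boolean reachedEnd() {
        return consumed;
    }

    @Override
    public String nextRecord(String reuse) {
        consumed = true;
        return "data for split " + currentSplit.getSplitNumber();
    }

    @Override
    public void close() throws IOException {}
}

As far as I can tell, the assigner treats locality only as a preference and hands
out remote splits when the requesting host has no matching local split, so the
assignment seems to be best effort rather than guaranteed.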
Thanks in advance


-- 
Daniel Tavares


Re: Async Source Function in Flink

2018-05-17 Thread Federico D'Ambrosio
I see, thank you very much for your answer! I'll look into connection pool
handling.

Alternatively, I suppose that since it is a SourceFunction, even
synchronous calls may be used without side effects in Flink?

Thank you,
Federico

On Tue, 15 May 2018 at 16:16, Timo Walther 
wrote:

> Hi Federico,
>
> Flink's AsyncFunction is meant for enriching a record with information
> that needs to be queried externally. So I guess you can't use it for your
> use case because an async call is initiated by the input. However, your
> custom SourceFunction could implement similar asynchronous logic. By
> having a pool of open connections that issue requests asynchronously and emit
> the responses to the stream once they are available, you can improve your throughput
> (see [0]).
>
> Depending on your use case, maybe the SourceFunction could only be
> responsible for determining e.g. ids, and the AsyncFunction would request
> these ids via REST. This way you could leverage the available async
> capabilities.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/asyncio.html#the-need-for-asynchronous-io-operations
>
>
> On 14.05.18 at 14:51, Federico D'Ambrosio wrote:
>
> Hello everyone,
>
> just wanted to ask a quick question: I have to retrieve data from 2 web
> services via REST calls, use them as sources and push this data to Kafka.
> So far, I implemented a SourceFunction which deals with making the calls
> with the respective clients.
>
> Now, the function uses Await.result() for each REST call. Do I
> need to use Flink's AsyncFunction instead? What are the best practices when
> it comes to AsyncSources?
>
> Thank you,
> --
> Federico D'Ambrosio
>
>
>
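
For what it's worth, here is a minimal sketch of the id-source plus AsyncFunction
split suggested above; the RestClient below is only a placeholder for whatever
non-blocking HTTP client is actually used:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncRestLookup {

    /** Placeholder for a non-blocking HTTP client (one instance per subtask). */
    static class RestClient {
        CompletableFuture<String> get(String id) {
            return CompletableFuture.completedFuture("payload-for-" + id);
        }
    }

    static class RestLookupFunction extends RichAsyncFunction<String, String> {
        private transient RestClient client;

        @Override
        public void open(Configuration parameters) {
            client = new RestClient();
        }

        @Override
        public void asyncInvoke(String id, ResultFuture<String> resultFuture) {
            // Complete Flink's future from the client's callback; no blocking here.
            client.get(id).thenAccept(response ->
                    resultFuture.complete(Collections.singleton(response)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The source only decides what to fetch; the async operator does the fetching.
        DataStream<String> ids = env.fromElements("id-1", "id-2", "id-3");

        DataStream<String> responses = AsyncDataStream.unorderedWait(
                ids, new RestLookupFunction(), 30, TimeUnit.SECONDS, 100);

        responses.print();
        env.execute("async-rest-lookup");
    }
}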

-- 
Federico D'Ambrosio


Re: Missing MapState when Timer fires after restored state

2018-05-17 Thread Stefan Richter
Hi,

> 
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my 
> job? Based on my experience it seems like rescaling sometimes works, but then 
> you can have these random errors.

If there is a problem, I would still consider it a bug because it should work 
correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if 
> it would require rescaling?

It should work properly, but I would prefer to keep this at the level of a 
"hidden feature" until it gets some more exposure and some questions about 
the future of the differences between savepoints and checkpoints are resolved. 

> - We have sometimes had Flink jobs where the state has become so heavy that 
> cancelling with a savepoint times out & fails. Incremental checkpoints are 
> still working because they don't time out as long as the state is growing 
> linearly. In that case if we want to scale up (for example to enable 
> successful savepoint creation ;) ), the only thing we can do is to restore 
> from the latest checkpoint. But then we have no way to scale up by increasing 
> the cluster size, because we can't create a savepoint with a smaller cluster 
> but on the other hand can't restore a checkpoint to a bigger cluster, if 
> rescaling from a checkpoint is not supposed to be relied on. So in this case 
> we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature 
in the near future.

Best,
Stefan



Message guarantees with S3 Sink

2018-05-17 Thread Amit Jain
Hi,

We are using Flink to process clickstream data from Kafka and push it
into 128MB files in S3.

What are the message processing guarantees with the S3 sink? In my
understanding, the S3A client buffers the data in memory/on disk. In a failure
scenario on a particular node, the TM would not trigger Writer#close, hence the
buffered data can be lost entirely, assuming this buffer contains data from the
last successful checkpoint.

--
Thanks,
Amit