Re: Custom(application) Metrics - Piggyback on Flink's metrics infra or not?

2016-09-21 Thread Eswar Reddy
Thank you Chesnay. Good to know there are a few wrappers available to get
the best of both worlds. I will mostly go without piggybacking for now, to
keep more control and to learn, but I will keep an eye out for new benefits
that piggybacking brings in the future.
The UDF point looks like a deal breaker, so I will spend some more time
understanding it. (We can get Flink's runtime context using this inside the
UDF, so by 'variables' you must have meant the metrics object that gets
passed around.)
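
For my own reference, this is roughly the wrapper usage I have in mind - a
minimal sketch assuming the flink-metrics-dropwizard module is on the
classpath; the class name and metric name below are just placeholders:

    import com.codahale.metrics.SlidingWindowReservoir
    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
    import org.apache.flink.metrics.Histogram
    import org.apache.flink.util.Collector

    class LatencyTrackingFlatMap extends RichFlatMapFunction[String, String] {
      @transient private var latency: Histogram = _

      override def open(parameters: Configuration): Unit = {
        // Wrap a plain DropWizard histogram and register it on the operator's
        // metric group, which is reachable from any rich UDF.
        val dropwizardHistogram =
          new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
        latency = getRuntimeContext
          .getMetricGroup
          .histogram("processingLatencyMs", new DropwizardHistogramWrapper(dropwizardHistogram))
      }

      override def flatMap(value: String, out: Collector[String]): Unit = {
        val start = System.currentTimeMillis()
        out.collect(value.toUpperCase)
        // Record per-record processing time in the wrapped histogram.
        latency.update(System.currentTimeMillis() - start)
      }
    }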

@Sumit, adding backend writers (reporters) is just as simple in DropWizard
as well. Thanks for bringing it up though.
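
On the Flink side, if I do end up piggybacking, I understand multiple
reporters can be enabled side by side purely through configuration. A sketch
of what I think flink-conf.yaml would look like - the keys are my reading of
the 1.1-era metrics documentation, so please correct me if they are off:

    metrics.reporters: graphite,jmx
    metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
    metrics.reporter.graphite.host: graphite.example.com
    metrics.reporter.graphite.port: 2003
    metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter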

Thanks,
Eswar.

On Tue, Sep 20, 2016 at 11:33 PM, Chawla,Sumit 
wrote:

> In addition, it supports enabling multiple reporters, so you can have the
> same data pushed to multiple systems. Plus, it's very easy to write a new
> reporter for any customization.
>
>
> Regards
> Sumit Chawla
>
>
> On Tue, Sep 20, 2016 at 2:10 AM, Chesnay Schepler 
> wrote:
>
>> Hello Eswar,
>>
>> as far as I'm aware, the general structure of Flink's metric system is
>> rather similar to DropWizard's. You can use DropWizard metrics by creating
>> a simple wrapper; we even ship one for Histograms. Furthermore, you can
>> also use DropWizard reporters: you only have to extend the
>> DropWizardReporter class, essentially providing a factory method for your
>> reporter.
>>
>> Using Flink's infrastructure provides the following benefits:
>> * better resource usage, as only a single reporter instance exists per
>> taskmanager
>> * access to system metrics
>> * namespace handling; you cannot access all variables yourself from a UDF
>> without modifying the source of Flink, so whether this is an advantage of
>> course depends on what you are interested in
>>
>> Regards,
>> Chesnay
>>
>>
>> On 20.09.2016 08:29, Eswar Reddy wrote:
>>
>> Hi,
>>
>> I see Flink supports built-in metrics to monitor various components of
>> Flink. In addition, one can register application-specific (custom) metrics
>> with Flink's built-in metrics infra. The problem with this is that the user
>> has to develop the custom metrics using Flink's metrics framework/API
>> rather than a generic framework such as DropWizard. Alternatively, the user
>> can follow this approach, where the DropWizard metrics push code is
>> co-located with the actual app code within each Task and metrics are pushed
>> directly to a backend writer (say, Graphite) from each Task.
>>
>> In this alternative, I am aware of having to map the spatial granularity of
>> Flink's runtime onto the metrics namespace, but doing that myself should
>> not be a big effort. Fault tolerance comes automatically, since the app
>> code and the metrics push code are co-located in the Task. Is there
>> anything else Flink's metrics infra handles automatically? Based on this
>> I'd weigh good old DropWizard against the Flink-specific metrics framework.
>>
>> Finally, I guess the feasibility of an automatic DropWizard-to-Flink-metrics
>> translation utility could be checked out, but I would first like to
>> understand the additional benefits of using Flink's infra for custom
>> metrics.
>>
>> Thanks,
>> Eswar.
>>
>>
>>
>
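
For completeness, the direct-push alternative I mentioned would look roughly
like the sketch below: a DropWizard registry plus GraphiteReporter started
inside the UDF's open() and stopped in close(). The host, port, prefix and
metric names are placeholders, and this is untested - just the shape of the
idea:

    import java.net.InetSocketAddress
    import java.util.concurrent.TimeUnit

    import com.codahale.metrics.MetricRegistry
    import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    class DirectGraphiteMap extends RichMapFunction[String, String] {
      @transient private var registry: MetricRegistry = _
      @transient private var reporter: GraphiteReporter = _

      override def open(parameters: Configuration): Unit = {
        // One registry and reporter per parallel task instance; the subtask
        // index is folded into the Graphite prefix to keep namespaces apart.
        registry = new MetricRegistry
        val graphite = new Graphite(new InetSocketAddress("graphite.example.com", 2003))
        reporter = GraphiteReporter.forRegistry(registry)
          .prefixedWith("myjob.subtask-" + getRuntimeContext.getIndexOfThisSubtask)
          .build(graphite)
        reporter.start(10, TimeUnit.SECONDS)
      }

      override def map(value: String): String = {
        // Metrics push code co-located with the actual application code.
        registry.counter("records-processed").inc()
        value
      }

      override def close(): Unit = {
        if (reporter != null) reporter.stop()
      }
    }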


How can I prove ....

2016-09-21 Thread amir bahmanyari
That all nodes in a Flink cluster are involved simultaneously in processing
the data? Programmatically, graphically... I need to stress CPU, memory and
all resources to their max. How can I guarantee this is happening in the
Flink cluster? Out of 4 nodes, this is the highest resource usage I see from
"top". Everything else is not even close:

top - 22:22:45 up 41 days,  2:39,  1 user,  load average: 1.76, 1.55, 1.28
Tasks: 344 total,   1 running, 343 sleeping,   0 stopped,   0 zombie
%Cpu(s):  5.4 us,  1.0 sy,  0.0 ni, 93.5 id,  0.1 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  11551564+total, 65702020 used, 49813632 free,   115072 buffers
KiB Swap:        0 total,        0 used,        0 free.  3148420 cached Mem

I am pretty sure I can push the FlinkRunner much further than this, and that
is where true, realistic performance numbers start showing up.

Thanks + regards,
Amir-

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

2016-09-21 Thread Josh
Hi Stephan,

Thanks for the reply. I should have been a bit clearer: I was actually not
trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from
scratch (starting with no state), then took a savepoint and tried to
restart it from that savepoint - and that's when I get this exception. If I
do this with the same job using an older version of Flink (a 1.1-SNAPSHOT
taken in June), the savepoint and restore work fine.

I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use
(improvements to the Kinesis connector + the bucketing sink). Anyway, for
now I have things working with an older version of Flink, but it would be
good to know what has changed recently that causes the restore to break,
and whether my job is going to be incompatible with future releases of Flink.

Best,
Josh

On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen  wrote:

> Hi Josh!
>
> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
> now, in order to add the elasticity feature (changing the parallelism of
> running jobs while still maintaining exactly-once guarantees).
> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
> try to add compatibility with 1.1 savepoints before the release of
> version 1.2.
>
> I think the exception is probably caused by the fact that the old savepoint
> stored some serialized user code (the new one is not expected to), which
> cannot be loaded.
>
> Adding Aljoscha and Stefan to this, to see if they can add anything.
> In any case, this should have a much better error message.
>
> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
> so not really recommended for general use.
>
> Does version 1.1 not work for you?
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 21, 2016 at 7:44 PM, Josh  wrote:
>
>> Hi,
>>
>> I have a Flink job which uses the RocksDBStateBackend, which has been
>> running on a Flink 1.0 cluster.
>>
>> The job is written in Scala, and I previously made some changes to the
>> job to ensure that state could be restored. For example, whenever I call
>> `map` or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
>> MyCustomFlatMapper())` instead of an anonymous function.
>>
>> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
>> to restore state. I'm seeing exceptions which look like this when trying to
>> restore from a savepoint:
>>
>> java.lang.RuntimeException: Could not initialize keyed state backend.
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
>> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
>>
>> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
>> DataStreams, so it looks like this exception is caused just by using
>> Scala functions like `filter`, `map`, `flatMap` on standard Scala
>> collections, within my class `MyCustomFlatMapper`.
>>
>> Are there any changes to the way Flink state is restored or to
>> RocksDBStateBackend, in the last 2-3 months, which could cause this to
>> happen?
>>
>> If so, any advice on fixing it?
>>
>> I'm hoping there's a better solution to this than rewriting the Flink job
>> in Java.
>>
>> Thanks,
>>
>> Josh
>>
>
>


Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

2016-09-21 Thread Stephan Ewen
Hi Josh!

The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
now, in order to add the elasticity feature (changing the parallelism of
running jobs while still maintaining exactly-once guarantees).
At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
try to add compatibility with 1.1 savepoints before the release of
version 1.2.

I think the exception is probably caused by the fact that the old savepoint
stored some serialized user code (the new one is not expected to), which
cannot be loaded.

Adding Aljoscha and Stefan to this, to see if they can add anything.
In any case, this should have a much better error message.

I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
so not really recommended for general use.

Does version 1.1 not work for you?

Greetings,
Stephan


On Wed, Sep 21, 2016 at 7:44 PM, Josh  wrote:

> Hi,
>
> I have a Flink job which uses the RocksDBStateBackend, which has been
> running on a Flink 1.0 cluster.
>
> The job is written in Scala, and I previously made some changes to the job
> to ensure that state could be restored. For example, whenever I call `map`
> or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
> MyCustomFlatMapper())` instead of an anonymous function.
>
> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
> to restore state. I'm seeing exceptions which look like this when trying to
> restore from a savepoint:
>
> java.lang.RuntimeException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
>
> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
> DataStreams, so it looks like this exception is caused just by using
> Scala functions like `filter`, `map`, `flatMap` on standard Scala
> collections, within my class `MyCustomFlatMapper`.
>
> Are there any changes to the way Flink state is restored or to
> RocksDBStateBackend, in the last 2-3 months, which could cause this to
> happen?
>
> If so, any advice on fixing it?
>
> I'm hoping there's a better solution to this than rewriting the Flink job
> in Java.
>
> Thanks,
>
> Josh
>


Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

2016-09-21 Thread Josh
Hi,

I have a Flink job which uses the RocksDBStateBackend, which has been
running on a Flink 1.0 cluster.

The job is written in Scala, and I previously made some changes to the job
to ensure that state could be restored. For example, whenever I call `map`
or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
MyCustomFlatMapper())` instead of an anonymous function.
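
To illustrate the pattern I mean (this is just a simplified stand-in, not the
real implementation of MyCustomFlatMapper, and the String types are
placeholders):

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.util.Collector

    // A named, top-level class rather than an anonymous function; the idea
    // being that the class name recorded with the state stays stable.
    class MyCustomFlatMapper extends FlatMapFunction[String, String] {
      override def flatMap(value: String, out: Collector[String]): Unit = {
        value.split(",").foreach(out.collect)
      }
    }

    // Usage on the DataStream:
    //   stream.flatMap(new MyCustomFlatMapper())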

I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to
restore state. I'm seeing exceptions which look like this when trying to
restore from a savepoint:

java.lang.RuntimeException: Could not initialize keyed state backend.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
Caused by: java.lang.ClassNotFoundException:
com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)

I'm not passing any anonymous functions to `map` or `flatMap` on Flink
DataStreams, so it looks like this exception is caused just by using
Scala functions like `filter`, `map`, `flatMap` on standard Scala
collections, within my class `MyCustomFlatMapper`.

Are there any changes to the way Flink state is restored or to
RocksDBStateBackend, in the last 2-3 months, which could cause this to
happen?

If so, any advice on fixing it?

I'm hoping there's a better solution to this than rewriting the Flink job
in Java.

Thanks,

Josh


Re: Parallelism vs task manager allocation

2016-09-21 Thread Greg Hogan
Is the query stream also a Flink job? Is this use case not supported by
keeping state within a single Flink job?
  https://ci.apache.org/projects/flink/flink-docs-master/dev/state.html

FLINK-3779 recently added "queryable state" to allow external processes
access to operator state.
  https://issues.apache.org/jira/browse/FLINK-3779
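
As a rough illustration of keeping the state inside the job instead of an
external hashmap - an untested sketch against the 1.1-era keyed-state API,
with placeholder names and types:

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    // Used after a keyBy, so each key's running total lives in Flink's own
    // (checkpointed) state backend rather than in a shared HashMap sink.
    class RunningCount extends RichFlatMapFunction[(String, Long), (String, Long)] {
      @transient private var count: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        count = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long](
            "count", classOf[java.lang.Long], java.lang.Long.valueOf(0L)))
      }

      override def flatMap(value: (String, Long), out: Collector[(String, Long)]): Unit = {
        val updated = count.value() + value._2
        count.update(updated)
        out.collect((value._1, updated))
      }
    }

The query side could then read that state through the job itself (or, going
forward, via the queryable state in FLINK-3779) instead of a shared
in-memory hashmap.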

On Wed, Sep 21, 2016 at 12:19 AM, pushpendra.jaiswal <
pushpendra.jaiswa...@gmail.com> wrote:

> Hi Greg
> My sink is a hashmap (acting as a DB store), and I am reading from this
> hashmap from another query stream. I need one sink instance/slot per task
> manager, so that everyone refers to the same instance/slot.
> So, is there any way to restrict it to 1 slot per task manager?
>
> Thanks
> Pushpendra Jaiswal
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Parallelism-vs-
> task-manager-allocation-tp9084p9105.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: How to stop job through java API

2016-09-21 Thread Aljoscha Krettek
Hi,
right now this is not possible, I'm afraid.

I'm looping in Max who has done some work in that direction. Maybe he's got
something to say.

Cheers,
Aljoscha

On Wed, 14 Sep 2016 at 03:54 Will Du  wrote:

> Hi folks,
> How can I stop a Flink job, given its job_id, through the Java API?
> Thanks,
> Will
>


Re: Discard message LeaderSessionMessage(null,ConnectionTimeout)

2016-09-21 Thread Aljoscha Krettek
Hi,
the log message about the leader session should be unrelated to Kafka. What
exactly do you mean by "fails to read"? You don't get elements? Or it fails
with some message?

Cheers,
Aljoscha

On Tue, 13 Sep 2016 at 17:10 Simone Robutti 
wrote:

> Hello,
>
> while running a job on Flink 1.1.2 on a cluster of 3 nodes using the
> KafkaProducer010, I encounter this error:
>
> WARN  org.apache.flink.runtime.client.JobClientActor-
> Discard message LeaderSessionMessage(null,ConnectionTimeout) because the
> expected leader session ID 4a1c16fe-d015-4351-81ea-814796e2167f did not
> equal the received leader session ID null.
>
> The KafkaProducer010 works locally, but on the cluster it fails to read
> from the topic. I can correctly read and write using the console producer
> and consumer, so I don't think it's misconfigured.
>
> I would like to know what could be the cause of the error, whether it is
> related to the issue I'm facing with the flink-kafka connector, and maybe
> whether it's due to the upgrade to version 0.10. I know the producer is
> still in a PR, but it has been shown to work properly in a local
> environment.
>
> Thank you,
>
> Simone
>