Hundreds of parallel jobs on Flink Cluster

2018-06-07 Thread Chirag Dewan
Hi,
I am coming across a use case where I may have to run more than 100 parallel 
jobs (which may have different processing needs) on a Flink cluster. 
My Flink cluster currently has 1 Job Manager and 4-5 Task Managers (depending 
on the processing needed) and runs on a Kubernetes cluster with 3 worker 
nodes.
I have a few questions around this before I start looking at how I can execute 
this use case on such a cluster configuration: 
1) Is there any limitation w.r.t. the Job Manager while running 100 parallel 
jobs on a similar cluster?
2) Is there any limitation on the number of jobs that can be run on a cluster 
with 1 Job Manager?
3) Are there any recommendations around having hundreds of jobs on a single 
cluster?
4) Would I need multiple clusters for such a use case, or are there any tuning 
parameters which can help, e.g. Akka tuning parameters? 
Thanks in advance,
Chirag  

Having a backoff while experiencing checkpointing failures

2018-06-07 Thread vipul singh
Hello all,

Are there any recommendations on using a backoff when experiencing
checkpointing failures?
What we have seen is that when a checkpoint starts to expire, the next
checkpoint doesn't care about the previous failure and starts soon after.
We experimented with *min_pause_between_checkpoints*, however that seems
to work only for successful checkpoints (the same is discussed on this
thread).

Are there any recommendations on how to have a backoff, or is there
something in the works to add a backoff in case of checkpointing failures? This
seems very valuable in case of checkpointing to an external location like
S3, where one can potentially be throttled or get errors like
TooBusyException from S3 (for example as in this JIRA).

Please let us know!
Thanks,
Vipul
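
For reference, a minimal sketch (class name and values are placeholders) of the checkpoint pacing knobs that exist today; as noted above, the min-pause is only enforced after successful checkpoints, so none of these acts as a backoff for failed or expired ones:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointPacingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000);                              // trigger a checkpoint every 60 s
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);  // expire checkpoints after 10 min
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Enforced only between successful checkpoints; an expired checkpoint is
        // followed immediately by the next trigger, which is the gap raised in this thread.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60_000);
    }
}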


Stopping of a streaming job empties state store on HDFS

2018-06-07 Thread Peter Zende
Hi all,

We have a streaming pipeline (Flink 1.4.2) for which we implemented
stoppable sources to be able to gracefully exit from the job with Yarn
state "finished/succeeded".
This works fine; however, after creating a savepoint, stopping the job (stop
event) and restarting it, we noticed that the RocksDB state hadn't been
recovered. It looks like this is because the state directory on HDFS was
emptied after issuing a stop event. This isn't the case when we cancel the
job, but we'd like to distinguish between job failures and stop events.
After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still
not clear why this is the intended behavior.
Should we use cancel instead?

When we back up the local state directory, stop the job, copy back the
directory and start a new job from the savepoint, then it works fine.
Another issue is that when we restart the job with a different source (1st
job: HDFS and Kafka, 2nd job: Kafka), each having uids set, the recovery
from the savepoint doesn't fail but the local state isn't restored. Is there
any trick besides setting allowNonRestoredState?

Many thanks,
Peter


Re: IoT Use Case, Problem and Thoughts

2018-06-07 Thread Fabian Hueske
Hi Ashish,

Thanks for the great write up.
If I understood you correctly, there are two different issues that are
caused by the disabled checkpointing.

1) Recovery from a failure without restarting all operators, to preserve the
state in the running tasks
2) Planned restarts of an application without losing all state (even with
checkpointing disabled).

Ad 1) The community is constantly working on reducing the time for
checkpointing and recovery.
For 1.5, local task recovery was added, which basically stores a state copy
on the local disk that is read in case of a recovery. So, tasks are
restarted but read the state to restore from the local disk instead of from
distributed storage.
AFAIK, this can only be used together with remote checkpoints. I think this
might be an interesting option for you if it would be possible to write
checkpoints only to local disk and not remote storage. AFAIK, there are
also other efforts to reduce the number of restarted tasks in case of a
failure. I guess, you've played with other features such as
RocksDBStateBackend, incremental and async checkpoints already.
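
For illustration, a minimal sketch of that setup (RocksDB with incremental, asynchronous checkpoints plus an externalized checkpoint retained for planned restarts); the class name, checkpoint URI and interval are placeholders, and the API names are the Flink 1.5-era ones:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB with incremental checkpoints: only the delta since the last
        // checkpoint is uploaded, which keeps the per-checkpoint cost low.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        env.enableCheckpointing(60_000); // RocksDB snapshots are taken asynchronously by default

        // Retain the last completed checkpoint so a planned restart can resume from it.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}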

Ad 2) It sounds as if savepoints are exactly the feature you are looking
for. It would be good to know what exactly did not work for you. The
MemoryStateBackend is not suitable for large state sizes because it backs
state up into the heap memory of the JobManager.

Best, Fabian

2018-06-05 21:57 GMT+02:00 ashish pok :

> Fabian, Stephan, All,
>
> I started a discussion a while back around having a form of event-based
> checkpointing policy that will help us in some of our high-volume data
> pipelines. Here is an effort to put this in front of the community and
> understand what capabilities can support these types of use cases, how much
> others feel the same need, and potentially turn this into a feature that
> makes it to a user story.
>
> *Use Case Summary:*
> - Extremely high volume of data (events from consumer devices with
> customer base of over 100M)
> - Multiple events need to be combined using a windowing streaming app
> grouped by keys (something like 5 min floor of timestamp and unique
> identifiers for customer devices)
> - "Most" events by a group/key arrive in few seconds if not milliseconds
> however events can sometimes delay or get lost in transport (so delayed
> event handling and timeouts will be needed)
> - Extremely low (pretty vague but hopefully details below clarify it more)
> data loss is acceptable
> - Because of the volume and transient nature of source, checkpointing is
> turned off (saves on writes to persistence as states/sessions are active
> for only few seconds during processing)
>
> *Problem Summary:*
> Of course, none of the above is out of the norm for Flink and as a matter
> of fact we already have a Flink app doing this. The issue arises when it
> comes to graceful shutdowns and operator failures (e.g. Kafka timeouts
> etc.). On operator failures, the entire job graph restarts, which essentially
> flushes out in-memory states/sessions. I think there is a feature in the works
> (not sure if it made it to 1.5) to perform selective restarts, which will
> contain the damage but still result in data loss. Also, it doesn't
> help when application restarts are needed. We did try going the savepoint route
> for explicit restart needs, but I think MemoryBackedState ran into issues
> for larger states or something along those lines (not certain). We obviously
> cannot recover an operator that actually fails, because its own state could
> be unrecoverable. However, it feels like Flink already has a lot of
> plumbing to help with the overall problem of allowing some sort of recoverable
> state to handle graceful shutdowns and restarts with minimal data loss.
>
> *Solutions:*
> Some in the community commented on my last email with decent ideas, like having
> an event-based checkpointing trigger (on shutdown, on restart, etc.) or
> life-cycle hooks (onCancel, onRestart, etc.) in Functions that can be
> implemented if this type of behavior is needed.
>
> I would appreciate feedback from the community on how useful this might be for
> others, and from core contributors on their thoughts as well.
>
> Thanks in advance, Ashish
>
>


Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

So my question is: is it feasible to migrate state from `ProcessFunction`
to my own operator and then use `getKeyedStateBackend()` to migrate the states?
If yes, is there anything I need to be careful with? If not, why, and could it
be made available in the future? Thank you.

Best Regards,
Tony Wei

2018-06-07 21:43 GMT+08:00 Piotr Nowojski :

> Hi,
>
> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
> function and you cannot migrate your state that way.
>
> As far as I know, yes, at the moment in order to convert everything at once
> (without getKeys you can still implement a lazy conversion) you would have
> to write your own operator.
>
> Piotrek
>
>
> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
>
> Hi Piotrek,
>
> I used `ProcessFunction` to implement it, but it seems that I can't call
> `getKeyedStateBackend()` like `WindowOperator` did.
> I found that `getKeyedStateBackend()` is a method of
> `AbstractStreamOperator`, which the `ProcessFunction` API doesn't extend.
> Does that mean I can't look up all keys and migrate the entire previous
> states to the new states in `ProcessFunction#open()`?
> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator`
> to migrate state in the manner shown in `WindowOperator`?
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
>
>> What function are you implementing and how are you using it?
>>
>> Usually it’s enough if your function implements RichFunction (or rather
>> extends AbstractRichFunction) and then you could use RichFunction#open
>> in a similar manner as in the code that I posted in the previous message.
>> Flink in many places performs instanceof checks like:
>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>
>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>    if (function instanceof RichFunction) {
>>       RichFunction richFunction = (RichFunction) function;
>>       richFunction.open(parameters);
>>    }
>> }
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>>
>> Hi Piotrek,
>>
>> It seems that this was implemented with the `Operator` API, which is a
>> lower-level API compared to the `Function` API.
>> Since at the `Function` API level we can only migrate state when an event is
>> triggered, it is more convenient this way to migrate state by iterating over
>> all keys in the `open()` method.
>> If I implemented the state operator with the `ProcessFunction` API, is it
>> possible to port it to `KeyedProcessOperator` and do the state migration
>> that you mentioned?
>> And are there any concerns or difficulties that could lead to restored-state
>> failures or other problems? Thank you!
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>>
>>> Hi,
>>>
>>> A general solution for state/schema migration is under development and it
>>> might be released with Flink 1.6.0.
>>>
>>> Before that, you need to manually handle the state migration in your
>>> operator’s open method. Let's assume that your OperatorV1 has a state field
>>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
>>> with the previous version. What you can do is add logic in the open method
>>> to check:
>>> 1. If “stateV2” is non-empty, do nothing
>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>> migrate “stateV1” to “stateV2”
>>>
>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>
>>> I have once implemented something like that here:
>>>
>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>
>>> Hope that helps!
>>>
>>> Piotrek
>>>
>>>
>>> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>>>
>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>> question.
>>>
>>> When a job is modified and we want to deploy the new version, what is
>>> the preferred method?  Our jobs have a lot of keyed state.
>>>
>>> If we use snapshots we have old state that may no longer apply to the
>>> new pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but
>>> that can be very resource heavy for a while.
>>>
>>> Is there an option I am missing?  Are there facilities to “patch” or
>>> “purge” selectively the keyed state?
>>>
>>> Michael
>>>
>>>
>>>
>>
>>
>
>


Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the 
function and you cannot migrate your state that way.

As far as I know, yes, at the moment in order to convert everything at once 
(without getKeys you can still implement a lazy conversion) you would have to 
write your own operator.

Piotrek

> On 7 Jun 2018, at 15:26, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> I used `ProcessFunction` to implement it, but it seems that I can't call 
> `getKeyedStateBackend()` like `WindowOperator` did.
> I found that `getKeyedStateBackend()` is a method of 
> `AbstractStreamOperator`, which the `ProcessFunction` API doesn't extend.
> Does that mean I can't look up all keys and migrate the entire previous 
> states to the new states in `ProcessFunction#open()`?
> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to 
> migrate state in the manner shown in `WindowOperator`? 
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski :
> What function are you implementing and how are you using it?
> 
> Usually it’s enough if your function implements RichFunction (or rather 
> extends AbstractRichFunction) and then you could use RichFunction#open in 
> a similar manner as in the code that I posted in the previous message. Flink in 
> many places performs instanceof checks like: 
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
> 
> public static void openFunction(Function function, Configuration parameters) throws Exception {
>    if (function instanceof RichFunction) {
>       RichFunction richFunction = (RichFunction) function;
>       richFunction.open(parameters);
>    }
> }
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>> 
>> Hi Piotrek,
>> 
>> It seems that this was implemented with the `Operator` API, which is a 
>> lower-level API compared to the `Function` API.
>> Since at the `Function` API level we can only migrate state when an event is 
>> triggered, it is more convenient this way to migrate state by iterating over 
>> all keys in the `open()` method.
>> If I implemented the state operator with the `ProcessFunction` API, is it possible 
>> to port it to `KeyedProcessOperator` and do the state migration that you 
>> mentioned?
>> And are there any concerns or difficulties that could lead to restored-state 
>> failures or other problems? Thank you!
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>> Hi,
>> 
>> A general solution for state/schema migration is under development and it 
>> might be released with Flink 1.6.0.
>> 
>> Before that, you need to manually handle the state migration in your 
>> operator’s open method. Let's assume that your OperatorV1 has a state field 
>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
>> with the previous version. What you can do is add logic in the open method to 
>> check:
>> 1. If “stateV2” is non-empty, do nothing
>> 2. If there is no “stateV2”, iterate over all of the keys and manually 
>> migrate “stateV1” to “stateV2”
>> 
>> In your OperatorV3 you could drop the support for “stateV1”.
>> 
>> I have once implemented something like that here:
>> 
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>  
>> 
>> 
>> Hope that helps!
>> 
>> Piotrek
>> 
>> 
>>> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>>> 
>>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>> 
>>> When a job is modified and we want to deploy the new version, what is the 
>>> preferred method?  Our jobs have a lot of keyed state.
>>> 
>>> If we use snapshots we have old state that may no longer apply to the new 
>>> pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but that 
>>> can be very resource heavy for a while.
>>> 
>>> Is there an option I am missing?  Are there facilities to “patch” or 
>>> “purge” selectively the keyed state?
>>> 
>>> Michael
>> 
>> 
> 
> 



Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

I used `ProcessFunction` to implement it, but it seems that I can't call
`getKeyedStateBackend()` like `WindowOperator` did.
I found that `getKeyedStateBackend()` is a method of
`AbstractStreamOperator`, which the `ProcessFunction` API doesn't extend.
Does that mean I can't look up all keys and migrate the entire previous
states to the new states in `ProcessFunction#open()`?
As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to
migrate state in the manner shown in `WindowOperator`?

Best Regards,
Tony Wei

2018-06-07 20:28 GMT+08:00 Piotr Nowojski :

> What function are you implementing and how are you using it?
>
> Usually it’s enough if your function implements RichFunction (or rather
> extends AbstractRichFunction) and then you could use RichFunction#open
> in a similar manner as in the code that I posted in the previous message.
> Flink in many places performs instanceof checks like:
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>
> public static void openFunction(Function function, Configuration parameters) throws Exception {
>    if (function instanceof RichFunction) {
>       RichFunction richFunction = (RichFunction) function;
>       richFunction.open(parameters);
>    }
> }
>
> Piotrek
>
>
> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
>
> Hi Piotrek,
>
> It seems that this was implemented with the `Operator` API, which is a
> lower-level API compared to the `Function` API.
> Since at the `Function` API level we can only migrate state when an event is
> triggered, it is more convenient this way to migrate state by iterating over
> all keys in the `open()` method.
> If I implemented the state operator with the `ProcessFunction` API, is it
> possible to port it to `KeyedProcessOperator` and do the state migration
> that you mentioned?
> And are there any concerns or difficulties that could lead to restored-state
> failures or other problems? Thank you!
>
> Best Regards,
> Tony Wei
>
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
>
>> Hi,
>>
>> A general solution for state/schema migration is under development and it
>> might be released with Flink 1.6.0.
>>
>> Before that, you need to manually handle the state migration in your
>> operator’s open method. Let's assume that your OperatorV1 has a state field
>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
>> with the previous version. What you can do is add logic in the open method
>> to check:
>> 1. If “stateV2” is non-empty, do nothing
>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>> migrate “stateV1” to “stateV2”
>>
>> In your OperatorV3 you could drop the support for “stateV1”.
>>
>> I have once implemented something like that here:
>>
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>
>> Hope that helps!
>>
>> Piotrek
>>
>>
>> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>>
>> We are still pretty new to Flink and I have a conceptual / DevOps
>> question.
>>
>> When a job is modified and we want to deploy the new version, what is the
>> preferred method?  Our jobs have a lot of keyed state.
>>
>> If we use snapshots we have old state that may no longer apply to the new
>> pipeline.
>> If we start a new job we can reprocess historical data from Kafka, but
>> that can be very resource heavy for a while.
>>
>> Is there an option I am missing?  Are there facilities to “patch” or
>> “purge” selectively the keyed state?
>>
>> Michael
>>
>>
>>
>
>


Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
What function are you implementing and how are you using it?

Usually it’s enough if your function implements RichFunction (or rather extends 
AbstractRichFunction) and then you could use RichFunction#open in a 
similar manner as in the code that I posted in the previous message. Flink in many 
places performs instanceof checks like: 
org.apache.flink.api.common.functions.util.FunctionUtils#openFunction

public static void openFunction(Function function, Configuration parameters) throws Exception {
   if (function instanceof RichFunction) {
      RichFunction richFunction = (RichFunction) function;
      richFunction.open(parameters);
   }
}

Piotrek

> On 7 Jun 2018, at 11:07, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> It seems that this was implemented with the `Operator` API, which is a 
> lower-level API compared to the `Function` API.
> Since at the `Function` API level we can only migrate state when an event is 
> triggered, it is more convenient this way to migrate state by iterating over 
> all keys in the `open()` method.
> If I implemented the state operator with the `ProcessFunction` API, is it possible 
> to port it to `KeyedProcessOperator` and do the state migration that you 
> mentioned?
> And are there any concerns or difficulties that could lead to restored-state 
> failures or other problems? Thank you!
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski :
> Hi,
> 
> A general solution for state/schema migration is under development and it might 
> be released with Flink 1.6.0.
> 
> Before that, you need to manually handle the state migration in your 
> operator’s open method. Let's assume that your OperatorV1 has a state field 
> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
> with the previous version. What you can do is add logic in the open method to 
> check:
> 1. If “stateV2” is non-empty, do nothing
> 2. If there is no “stateV2”, iterate over all of the keys and manually 
> migrate “stateV1” to “stateV2”
> 
> In your OperatorV3 you could drop the support for “stateV1”.
> 
> I have once implemented something like that here:
> 
> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>  
> 
> 
> Hope that helps!
> 
> Piotrek
> 
> 
>> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>> 
>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>> 
>> When a job is modified and we want to deploy the new version, what is the 
>> preferred method?  Our jobs have a lot of keyed state.
>> 
>> If we use snapshots we have old state that may no longer apply to the new 
>> pipeline.
>> If we start a new job we can reprocess historical data from Kafka, but that 
>> can be very resource heavy for a while.
>> 
>> Is there an option I am missing?  Are there facilities to “patch” or “purge” 
>> selectively the keyed state?
>> 
>> Michael
> 
> 



[flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-07 Thread Rinat
Hi mates, we have some Flink jobs that are writing data from Kafka into HDFS, 
using the BucketingSink.
For some reasons, those jobs are running without checkpointing. For now, it is not 
a big problem for us if some files remain open in case of a job 
reload.

Periodically, those jobs fail with an OutOfMemory exception, and it seems that I 
found a strange thing in the implementation of BucketingSink.

During the sink lifecycle, we have a state object, implemented as a map, where 
the key is a bucket path and the value is a state that contains information about 
opened files and a list of pending files.
After examining the heap dump, I found that this state stores 
information about ~1,000 buckets and their state; all this stuff weighs ~120 
MB.

I’ve looked through the code and found that we only remove buckets from the 
state in the notifyCheckpointComplete method. 

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
      state.bucketStates.entrySet().iterator();
  while (bucketStatesIt.hasNext()) {
    BucketState<T> bucketState = bucketStatesIt.next().getValue();
    if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {

      // We've dealt with all the pending files and the writer for this bucket
      // is not currently open.
      // Therefore this bucket is currently inactive and we can remove it from
      // our state.
      bucketStatesIt.remove();
    }
  }
}

So, this looks like an issue when you use this sink in a checkpointless 
environment, because data is always added to the state but never removed.
Of course, we could enable checkpointing and use one of the available backends, 
but to me it seems like unexpected behaviour: I have the option 
to run the job without checkpointing, but if I really do so,
I get an exception in the sink component.

What do you think about this? Has anyone had the same problem, and how have you 
solved it?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Conceptual question

2018-06-07 Thread Tony Wei
Hi Piotrek,

It seems that this was implemented with the `Operator` API, which is a
lower-level API compared to the `Function` API.
Since at the `Function` API level we can only migrate state when an event is
triggered, it is more convenient this way to migrate state by iterating over
all keys in the `open()` method.
If I implemented the state operator with the `ProcessFunction` API, is it
possible to port it to `KeyedProcessOperator` and do the state migration
that you mentioned?
And are there any concerns or difficulties that could lead to restored-state
failures or other problems? Thank you!

Best Regards,
Tony Wei

2018-06-07 16:10 GMT+08:00 Piotr Nowojski :

> Hi,
>
> A general solution for state/schema migration is under development and it
> might be released with Flink 1.6.0.
>
> Before that, you need to manually handle the state migration in your
> operator’s open method. Let's assume that your OperatorV1 has a state field
> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
> with the previous version. What you can do is add logic in the open method
> to check:
> 1. If “stateV2” is non-empty, do nothing
> 2. If there is no “stateV2”, iterate over all of the keys and manually
> migrate “stateV1” to “stateV2”
>
> In your OperatorV3 you could drop the support for “stateV1”.
>
> I have once implemented something like that here:
>
> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>
> Hope that helps!
>
> Piotrek
>
>
> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
>
> We are still pretty new to Flink and I have a conceptual / DevOps question.
>
> When a job is modified and we want to deploy the new version, what is the
> preferred method?  Our jobs have a lot of keyed state.
>
> If we use snapshots we have old state that may no longer apply to the new
> pipeline.
> If we start a new job we can reprocess historical data from Kafka, but
> that can be very resource heavy for a while.
>
> Is there an option I am missing?  Are there facilities to “patch” or
> “purge” selectively the keyed state?
>
> Michael
>
>
>


Re: Conceptual question

2018-06-07 Thread Piotr Nowojski
Hi,

A general solution for state/schema migration is under development and it might 
be released with Flink 1.6.0.

Before that, you need to manually handle the state migration in your operator’s 
open method. Let's assume that your OperatorV1 has a state field “stateV1”. Your 
OperatorV2 defines field “stateV2”, which is incompatible with the previous 
version. What you can do is add logic in the open method to check:
1. If “stateV2” is non-empty, do nothing
2. If there is no “stateV2”, iterate over all of the keys and manually migrate 
“stateV1” to “stateV2”

In your OperatorV3 you could drop the support for “stateV1”.
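
For illustration, a rough sketch of what such an open-method migration could look like in a custom operator. The class, key type, state names and the Long-to-String conversion are made up for the example, and it assumes the Flink 1.5-era KeyedProcessOperator (which wraps a ProcessFunction); the exact backend methods may differ between versions:

import java.util.stream.Stream;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;

// Wraps the existing ProcessFunction and, before any element is processed,
// copies every key's "stateV1" (a Long here) into "stateV2" (a String here).
public class StateMigratingOperator extends KeyedProcessOperator<String, String, String> {

    public StateMigratingOperator(ProcessFunction<String, String> function) {
        super(function);
    }

    @Override
    public void open() throws Exception {
        super.open();

        ValueStateDescriptor<Long> oldDescriptor = new ValueStateDescriptor<>("stateV1", Long.class);
        ValueStateDescriptor<String> newDescriptor = new ValueStateDescriptor<>("stateV2", String.class);

        KeyedStateBackend<String> backend = getKeyedStateBackend();
        try (Stream<String> keys = backend.getKeys(oldDescriptor.getName(), VoidNamespace.INSTANCE)) {
            for (String key : (Iterable<String>) keys::iterator) {
                backend.setCurrentKey(key);
                ValueState<Long> oldState = getPartitionedState(oldDescriptor);
                ValueState<String> newState = getPartitionedState(newDescriptor);
                if (newState.value() == null && oldState.value() != null) {
                    // The actual migration logic goes here.
                    newState.update(String.valueOf(oldState.value()));
                    oldState.clear();
                }
            }
        }
    }
}

Such an operator would be wired in with keyedStream.transform(...) instead of keyedStream.process(...).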

I have once implemented something like that here:

https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
 


Hope that helps!

Piotrek

> On 6 Jun 2018, at 17:04, TechnoMage  wrote:
> 
> We are still pretty new to Flink and I have a conceptual / DevOps question.
> 
> When a job is modified and we want to deploy the new version, what is the 
> preferred method?  Our jobs have a lot of keyed state.
> 
> If we use snapshots we have old state that may no longer apply to the new 
> pipeline.
> If we start a new job we can reprocess historical data from Kafka, but that 
> can be very resource heavy for a while.
> 
> Is there an option I am missing?  Are there facilities to “patch” or “purge” 
> selectively the keyed state?
> 
> Michael



Re: Extending stream events with an aggregate value

2018-06-07 Thread Piotr Nowojski
Hi,

No worries :) You probably need to write your own process function to do 
exactly that, maybe something like this:

DataStream<Tuple2<Integer, Double>> test;

DataStream<Tuple3<Integer, Double, Double>> max = test.keyBy(0)
  .process(new KeyedProcessFunction<Tuple, Tuple2<Integer, Double>,
      Tuple3<Integer, Double, Double>>() {
     public ValueState<Double> max;

     @Override
     public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Double> descriptor =
          new ValueStateDescriptor<>("max", TypeInformation.of(new TypeHint<Double>() {
          }));
        max = getRuntimeContext().getState(descriptor);
     }

     @Override
     public void processElement(Tuple2<Integer, Double> value, Context ctx,
        Collector<Tuple3<Integer, Double, Double>> out) throws Exception {
        // ...
     }
  });

You need to store max in state if you care about recovering from 
failures/restarts without losing the previous max value. Please check the online 
documentation for ProcessFunction and handling state in Flink :)
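
For completeness, a possible filling-in of that sketch as a standalone class (the class name is invented, and it assumes a Flink version where KeyedProcessFunction is available; a ProcessFunction on the keyed stream works the same way):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits (key, value, runningMax) for every incoming (key, value).
public class RunningMaxFunction
        extends KeyedProcessFunction<Tuple, Tuple2<Integer, Double>, Tuple3<Integer, Double, Double>> {

    private transient ValueState<Double> max;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Double> descriptor =
                new ValueStateDescriptor<>("max", TypeInformation.of(new TypeHint<Double>() {}));
        max = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Tuple2<Integer, Double> value, Context ctx,
                               Collector<Tuple3<Integer, Double, Double>> out) throws Exception {
        Double currentMax = max.value();
        if (currentMax == null || value.f1 > currentMax) {
            currentMax = value.f1;
            max.update(currentMax);
        }
        // Forward the original element together with the running maximum for its key.
        out.collect(Tuple3.of(value.f0, value.f1, currentMax));
    }
}

It would be attached with test.keyBy(0).process(new RunningMaxFunction()).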

Piotrek

> On 6 Jun 2018, at 15:55, Nicholas Walton  wrote:
> 
> I’m sure I’m being a complete idiot, since this seems so trivial but if 
> someone could point me in the right direction I’d be very grateful.
> 
> I have a simple data stream [(Int, Double)] keyed on the Int. I can calculate 
> the running max of the stream no problem using “.max(2)”. But I want to 
> output the original input together with the running max value as [(Int, 
> Double, Double)]. I’ve hunted high and low for a means to do something so 
> trivial.
> 
> Nick Walton



Re: Question about JVM exit caused by timeout exception with the asynchronous IO of flink 1.4.2

2018-06-07 Thread Piotr Nowojski
Hi,

You can increase the timeout; that’s one way to tackle it.

In Flink 1.6.0 there will be the possibility to override Flink’s default behaviour 
regarding handling timeouts:
https://issues.apache.org/jira/browse/FLINK-7789
so you can handle them instead of outright failing.

Also, if you cannot wait for the new release, you could always copy 
AsyncWaitOperator and AsyncFunction into your code base and apply the changes 
from the above-mentioned ticket 
(https://github.com/apache/flink/pull/6091/files).

Piotrek
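
For illustration, a rough sketch of that override (it assumes Flink 1.6+, where AsyncFunction#timeout from the ticket above can be overridden; the class name, lookup logic and fallback value are invented):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class FallbackOnTimeoutFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> lookup(input)) // the external call happens here
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // Instead of the default behaviour (failing the job with a TimeoutException),
        // emit a fallback record so the job keeps running.
        resultFuture.complete(Collections.singleton(input + ",TIMEOUT"));
    }

    private String lookup(String input) {
        return input + ",ok"; // placeholder for the real external lookup
    }
}

It would be applied with AsyncDataStream.unorderedWait(stream, new FallbackOnTimeoutFunction(), 1, TimeUnit.SECONDS).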

> On 6 Jun 2018, at 10:39, 陈卓  wrote:
> 
> Hi,
> The asynchronous IO of Flink 1.4.2 will throw a timeout exception when the 
> timeout setting is one second and the invoke time is greater than 
> twenty seconds. Unfortunately the timeout exception cannot be caught, which 
> leads to an abnormal exit of the process. So my question is how to deal with 
> this situation and keep the JVM running.
>  
> the exception info: [screenshot omitted in the archive]
> 
> code as follows: [screenshot omitted in the archive]
> 
> -- 
> Thanks
> zhuo chen



Re: Datastream[Row] convert to table exception

2018-06-07 Thread Timo Walther
Sorry, I didn't see your last mail. The code actually looks good. What is 
the result of `inputStream.getType` if you print it to the console?


Timo

On 07.06.18 at 08:24, Timo Walther wrote:

Hi,

Row is a very special datatype for which Flink cannot generate serializers 
based on the generics. By default, DeserializationSchema uses 
reflection-based type analysis, so you need to override the 
getResultType() method in WormholeDeserializationSchema and specify 
the type information manually there.


Hope this helps.

Regards,
Timo

On 06.06.18 at 13:22, 孙森 wrote:

Hi ,

I've tried to specify such a schema when I read from Kafka and 
convert the input stream to a table, but I got this exception:


  Exception in thread "main" org.apache.flink.table.api.TableException:
  An input of GenericTypeInfo cannot be converted to Table.
  Please specify the type of the input with a RowTypeInfo.

And the code here:


private def getSchemaMap(jsonSchema: String) = {
  val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
  val fields = umsSchema.fields_get
  val fieldNameList = ListBuffer.empty[String]
  val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
  fields.foreach { field =>
    fieldNameList.append(field.name)
    fieldTypeList.append(fieldTypeMatch(field.`type`))
  }
  println(fieldNameList)
  println(fieldTypeList)
  (fieldNameList.toArray, fieldTypeList.toArray)
}

private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
  umsFieldType match {
    case STRING => Types.STRING
    case INT => Types.INT
    case LONG => Types.LONG
    case FLOAT => Types.FLOAT
    case DOUBLE => Types.DOUBLE
    case BOOLEAN => Types.BOOLEAN
    case DATE => Types.SQL_DATE
    case DATETIME => Types.SQL_TIMESTAMP
    case DECIMAL => Types.DECIMAL
  }
}

val myConsumer: FlinkKafkaConsumer010[Row] =
  new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
val inputStream: DataStream[Row] = env.addSource(myConsumer)
val tableEnv = TableEnvironment.getTableEnvironment(env)  // <<— exception here




Thanks !
sen







Re: Datastream[Row] convert to table exception

2018-06-07 Thread Timo Walther

Hi,

Row is a very special datatype for which Flink cannot generate serializers 
based on the generics. By default, DeserializationSchema uses 
reflection-based type analysis, so you need to override the getResultType() 
method in WormholeDeserializationSchema and specify the type 
information manually there.


Hope this helps.

Regards,
Timo
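
For illustration, a minimal Java sketch of a deserialization schema that declares an explicit Row type. The class, field names and field types are invented; on the stock DeserializationSchema interface the corresponding hook is getProducedType(), inherited from ResultTypeQueryable:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class ExplicitRowDeserializationSchema implements DeserializationSchema<Row> {

    private final RowTypeInfo typeInfo = new RowTypeInfo(
            new TypeInformation<?>[] {Types.STRING, Types.INT, Types.LONG},
            new String[] {"name", "age", "ts"});

    @Override
    public Row deserialize(byte[] message) {
        // Real parsing of the payload is omitted; only the declared type matters here.
        return Row.of("example", 0, 0L);
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // Returning a RowTypeInfo (instead of the default GenericTypeInfo) is what
        // allows the DataStream[Row] to be converted to a Table.
        return typeInfo;
    }
}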

On 06.06.18 at 13:22, 孙森 wrote:

Hi ,

I've tried to specify such a schema when I read from Kafka and 
convert the input stream to a table, but I got this exception:


  Exception in thread "main" org.apache.flink.table.api.TableException:
  An input of GenericTypeInfo cannot be converted to Table.
  Please specify the type of the input with a RowTypeInfo.

And the code here:


private def getSchemaMap(jsonSchema: String) = {
  val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
  val fields = umsSchema.fields_get
  val fieldNameList = ListBuffer.empty[String]
  val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
  fields.foreach { field =>
    fieldNameList.append(field.name)
    fieldTypeList.append(fieldTypeMatch(field.`type`))
  }
  println(fieldNameList)
  println(fieldTypeList)
  (fieldNameList.toArray, fieldTypeList.toArray)
}

private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
  umsFieldType match {
    case STRING => Types.STRING
    case INT => Types.INT
    case LONG => Types.LONG
    case FLOAT => Types.FLOAT
    case DOUBLE => Types.DOUBLE
    case BOOLEAN => Types.BOOLEAN
    case DATE => Types.SQL_DATE
    case DATETIME => Types.SQL_TIMESTAMP
    case DECIMAL => Types.DECIMAL
  }
}

val myConsumer: FlinkKafkaConsumer010[Row] =
  new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
val inputStream: DataStream[Row] = env.addSource(myConsumer)
val tableEnv = TableEnvironment.getTableEnvironment(env)  // <<— exception here




Thanks !
sen