Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-26 Thread Jamie Grier
This is awesome, Stephan!  Thanks for doing this.

-Jamie


On Tue, Feb 26, 2019 at 9:29 AM Stephan Ewen  wrote:

> Here is the pull request with a draft of the roadmap:
> https://github.com/apache/flink-web/pull/178
>
> Best,
> Stephan
>
> On Fri, Feb 22, 2019 at 5:18 AM Hequn Cheng  wrote:
>
>> Hi Stephan,
>>
>> Thanks for summarizing the great roadmap! It is very helpful for users
>> and developers to track the direction of Flink.
>> +1 for putting the roadmap on the website and updating it per release.
>>
>> Besides, it would be great if the roadmap could add the UpsertSource
>> feature (maybe put it under 'Batch Streaming Unification').
>> It has been discussed a long time ago[1,2] and is moving forward step by
>> step.
>> Currently, Flink can only emit upsert results. With the UpsertSource, we
>> can make our system a more complete one.
>>
>> Best, Hequn
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-TABLE-How-to-handle-empty-delete-for-UpsertSource-td23856.html#a23874
>> [2] https://issues.apache.org/jira/browse/FLINK-8545
>> 
>>
>>
>>
>> On Fri, Feb 22, 2019 at 3:31 AM Rong Rong  wrote:
>>
>>> Hi Stephan,
>>>
>>> Yes. I completely agree. Jincheng & Jark gave some very valuable
>>> feedbacks and suggestions and I think we can definitely move the
>>> conversation forward to reach a more concrete doc first before we put in to
>>> the roadmap. Thanks for reviewing it and driving the roadmap effort!
>>>
>>> --
>>> Rong
>>>
>>> On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:
>>>
 Hi Rong Rong!

 I would add the security / kerberos threads to the roadmap. They seem
 to be advanced enough in the discussions so that there is clarity what will
 come.

 For the window operator with slicing, I would personally like to see
 the discussion advance and have some more clarity and consensus on the
 feature before adding it to the roadmap. Not having that in the first
 version of the roadmap does not mean there will be no activity. And when
 the discussion advances well in the next weeks, we can update the roadmap
 soon.

 What do you think?

 Best,
 Stephan


 On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:

> Hi Stephan,
>
> Thanks for the clarification, yes I think these issues have already
> been discussed in previous mailing list threads [1,2,3].
>
> I also agree that updating the "official" roadmap every release is a
> very good idea to avoid frequent update.
> One question I might've been a bit confused about is: are we suggesting to
> keep one roadmap on the documentation site (e.g. [4]) per release, or
> simply just one most up-to-date roadmap in the main website [5] ?
> Just like the release notes in every release, the former will probably
> provide a good tracker for users to look back at previous roadmaps as well
> I am assuming.
>
> Thanks,
> Rong
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
> [5] https://flink.apache.org/
>
> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:
>
>> I think the website is better as well.
>>
>> I agree with Fabian that the wiki is not so visible, and visibility
>> is the main motivation.
>> This type of roadmap overview would not be updated by everyone -
>> letting committers update the roadmap means the listed threads are
>> actually happening at the moment.
>>
>>
>> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
>> wrote:
>>
>>> Hi,
>>>
>>> I like the idea of putting the roadmap on the website because it is
>>> much more visible (and IMO more credible, obligatory) there.
>>> However, I share the concerns about frequent updates.
>>>
>>> I think it would be great to update the "official" roadmap on the
>>> website once per release (excluding bugfix releases), i.e., every three months.
>>> We can use the wiki to collect and draft the roadmap for the next
>>> update.
>>>
>>> Best, Fabian
>>>
>>>
>>> On Thu, Feb 14, 2019 at 11:03 AM Jeff Zhang <
>>> zjf...@gmail.com> wrote:
>>>
 Hi Stephan,

 Thanks for this proposal. It is a good idea to track the roadmap.
 One suggestion is that it might be better to put it into wiki page 
 first.
 Because it is easier to update the roadmap on wiki compared to on 
 flink web
 site. 

Re: About KafkaConsumer and WM'ing and EventTime charactersitics

2019-01-30 Thread Jamie Grier
Vishal, the answer to your question about IngestionTime is "no".
Ingestion time in this context means the time the data was read by Flink,
not the time it was written to Kafka.

To get the effect you're looking for you have to set
TimeCharacteristic.EventTime and follow the instructions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

You still need the code you provided in your original email above and you
also have to do:

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
    FlinkKafkaProducer010.writeToKafkaWithTimestamps(
        streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
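
For reference, the consumer side would look roughly like this (a sketch only --
the topic name and properties are placeholders, and MyPunctuatedAssigner stands
in for the timestamp/watermark assigner from your original email):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaEventTimeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        // With EventTime set, the 0.10 consumer hands the Kafka record timestamp
        // to the assigner as previousElementTimestamp.
        consumer.assignTimestampsAndWatermarks(new MyPunctuatedAssigner());

        env.addSource(consumer).print();
        env.execute("kafka-event-time");
    }
}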




On Wed, Jan 30, 2019 at 2:45 AM Vishal Santoshi 
wrote:

> Thank you. This though is a little different.
>
> The producer of the kafka message attaches a time stamp
> https://issues.apache.org/jira/browse/KAFKA-2511.  I do not see how I can
> get to that timestamp through a any stream abstraction over
> FlinkKafkaConsumer  API even though it is available here
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
> being used here
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>
> All I want to do is this
>
> * Pull from a kafka topic. This topic is being written to with a time stamp
> on each kafka record.
> * Write to hdfs using StreamingSink BUT make buckets that honor the
> ingestion time's watermark.
>
> Question is,
>
> If we have TimeCharacteristic as IngestionTime, does the context's
> watermark in getBucketId(KafkaRecord element, Context context) in
> BucketAssigner reflect the kafka record time stamp in
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
> given that "automatic timestamp assignment and automatic watermark
> generation" is done if TimeCharacteristic is IngestionTime?
>
>
> Regards.
>
> On Tue, Jan 29, 2019 at 8:42 PM Congxian Qiu 
> wrote:
>
>> Hi Vishal
>>  May this doc[1] be helpful for you.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>> Best,
>> Congxian
>>
>>
>> On Wed, Jan 30, 2019 at 4:36 AM, Vishal Santoshi wrote:
>>
>>> It seems from
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
>>> that TimeCharacteristic.IngestionTime should do the trick.
>>>
>>> Just wanted to confirm that the ingestion time is the event time
>>> provided by the kafka producer.
>>>
>>> On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 In case one needs to use kafka event time (ingestion time)
 for watermark generation and timestamp extraction, is setting
 the TimeCharacteristic to EventTime enough?

 Or is this  explicit code required ?

 consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks() {
     @Nullable
     @Override
     public Watermark checkAndGetNextWatermark(KafkaRecord lastElement,
                                               long extractedTimestamp) {
         return new Watermark(extractedTimestamp);
     }

     @Override
     public long extractTimestamp(KafkaRecord element, long previousElementTimestamp) {
         return previousElementTimestamp;
     }
 });








Re: Flink Yarn Cluster - Jobs Isolation

2019-01-29 Thread Jamie Grier
Run each job individually as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn

Yes they will run concurrently and be completely isolated from each other.

-Jamie


On Sun, Jan 27, 2019 at 6:08 AM Eran Twili 
wrote:

> Hi,
>
>
>
> In my company we are interested in running Flink jobs on a Yarn cluster
> (on AWS EMR).
>
> We understand that there are 2 ways ('modes') to execute Flink jobs on a
> yarn cluster.
>
> We *must have the jobs run concurrently!*
>
> From what we understand so far those are the options:
>
>1. Start a long running yarn session, to which we'll send jobs.
>2. Run each job as a 'single job'.
>
> We searched the web to understand the difference and consequences of each
> option
> (we read through flink-yarn-setup and FLIP-6, along with many other
> references),
> but couldn't find clear comprehensive info.
>
>
>
> In the 'session' mode:
>
>    1. Does running multiple jobs in a single session mean there's no job
>isolation?
>2. All jobs will run on the same jvm?
>3. Can we define different classpath for each job in this mode?
>
> In the 'single job' mode:
>
>1. Can we run multiple jobs concurrently?
>2. Is there a complete job isolation by default or do we need to
>configure it (different jvm/classpath)?
>
>
>
> Overall, what will be the different implications in aspects of resource
> management, security, and monitoring?
>
> Another question: what is the difference between multiple sessions of a
> single job vs multiple 'single job' executions?
>
>
>
> We'll be very thankful if someone could provide some answers or reference
> to a comprehensive documentation on those subjects.
>
>
>
> Regards,
>
> Eran
>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Hmm..  I would have to look into the code for the StreamingFileSink more
closely to understand the concern but typically you should not be concerned
at all with *when* checkpoints happen.  They are meant to be a completely
asynchronous background process that has absolutely no bearing on
application semantics.  The output should be thought of as a stream rather
than a snapshot.

Can you rework the downstream consumer of the output data such that you
don't have to worry about this?  It would just read all the files in order
and worry about which data rows are in which files.

Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
code.  I've cc'd him directly.

-Jamie


On Fri, Jan 18, 2019 at 9:44 AM Cristian C  wrote:

> Well, the problem is that, conceptually, the way I'm trying to approach
> this is ok. But in practice, it has some edge cases.
>
> So back to my original premise: if you both, trigger and checkpoint happen
> around the same time, there is a chance that the streaming file sink rolls
> the bucket BEFORE it has received all the data. In other words, it would
> create incomplete snapshots of the table.
>
> Keep in mind that every snapshot is written to a different folder. And
> they are supposed to represent the state of the whole table at a point in
> time.
>
> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier 
>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>> PURGES but only FIRES what I said is semantically true.  The window
>> contents are never cleared.
>>
>> What I missed is that in this case since you're using a function that
>> incrementally reduces on the fly rather than processing all the data when
>> it's triggered your state is always kept to one element per key.  You're
>> correct but in general with non-incremental window functions the state
>> would grow unbounded in this configuration.
>>
>> So it looks like your approach should work just fine.
>>
>> -Jamie
>>
>>
>>
>> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>>
>>> Hello Jamie.
>>>
>>> Thanks for taking a look at this. So, yes, I want to write only the last
>>> data for each key every X minutes. In other words, I want a snapshot of
>>> the
>>> whole database every X minutes.
>>>
>>> >  The issue is that the window never gets PURGED so the data just
>>> > continues to accumulate in the window.  This will grow without bound.
>>>
>>> The window not being purged does not necessarily mean that the data will
>>> be
>>> accumulated indefinitely. How so? Well, Flink has two mechanisms to
>>> remove
>>> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>>>
>>> The reduce function has an implicit evictor that automatically removes
>>> events from the window pane that are no longer needed. i.e. it keeps in
>>> state only the element that was reduced. Here is an example:
>>>
>>> env.socketTextStream("localhost", )
>>>   .keyBy { it.first().toString() }
>>>   .window(GlobalWindows.create())
>>>
>>> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>>   .reduce { left, right ->
>>> println("left: $left, right: $right")
>>> if (left.length > right.length) {
>>>   left
>>> } else {
>>>   right
>>> }
>>>   }
>>>   .printToErr()
>>>
>>> For your claim to hold true, every time the trigger fires one would
>>> expect
>>> to see ALL the elements by a key being printed over and over again in the
>>> reduce function. However, if you run a job similar to this one in your
>>> lang
>>> of choice, you will notice that the print statement is effectively called
>>> only once per event per key.
>>>
>>> In fact, not using purge is intentional. Because I want to hold every
>>> record
>>> (the last one by its primary key) of the database in state so that I can
>>> write a snapshot of the whole database.
>>>
>>> So for instance, let's say my table has two columns: id and time. And I
>>> have
>>> the following events:
>>>
>>> 1,January
>>> 2,February
>>> 1,March
>>>
>>> I want to write to S3 two records: "1,March", and "2,February".
>>>
>>> Now, let's say two more events come into the stream:
>>>
>>> 3,April
>>> 1,June
>>>
>>> Then I want to write to S3 three records: "1,June", "2,February" and
>>> "3,April".
>>>
>>> In other words, I can't just purge the windows, because I would lose the
>>> record with id 2.
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Sorry my earlier comment should read: "It would just read all the files in
order and NOT worry about which data rows are in which files"

On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier  wrote:

> Hmm..  I would have to look into the code for the StreamingFileSink more
> closely to understand the concern but typically you should not be concerned
> at all with *when* checkpoints happen.  They are meant to be a completely
> asynchronous background process that has absolutely no bearing on
> application semantics.  The output should be thought of as a stream rather
> than a snapshot.
>
> Can you rework the downstream consumer of the output data such that you
> don't have to worry about this?  It would just read all the files in order
> and worry about which data rows are in which files.
>
> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
> code.  I've cc'd him directly.
>
> -Jamie
>
>
> On Fri, Jan 18, 2019 at 9:44 AM Cristian C 
> wrote:
>
>> Well, the problem is that, conceptually, the way I'm trying to approach
>> this is ok. But in practice, it has some edge cases.
>>
>> So back to my original premise: if you both, trigger and checkpoint
>> happen around the same time, there is a chance that the streaming file sink
>> rolls the bucket BEFORE it has received all the data. In other words, it
>> would create incomplete snapshots of the table.
>>
>> Keep in mind that every snapshot is written to a different folder. And
>> they are supposed to represent the state of the whole table at a point in
>> time.
>>
>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier >
>>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>>> PURGES but only FIRES what I said is semantically true.  The window
>>> contents are never cleared.
>>>
>>> What I missed is that in this case since you're using a function that
>>> incrementally reduces on the fly rather than processing all the data when
>>> it's triggered your state is always kept to one element per key.  You're
>>> correct but in general with non-incremental window functions the state
>>> would grow unbounded in this configuration.
>>>
>>> So it looks like your approach should work just fine.
>>>
>>> -Jamie
>>>
>>>
>>>
>>> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>>>
>>>> Hello Jamie.
>>>>
>>>> Thanks for taking a look at this. So, yes, I want to write only the last
>>>> data for each key every X minutes. In other words, I want a snapshot of
>>>> the
>>>> whole database every X minutes.
>>>>
>>>> >  The issue is that the window never gets PURGED so the data just
>>>> > continues to accumulate in the window.  This will grow without bound.
>>>>
>>>> The window not being purged does not necessarily mean that the data
>>>> will be
>>>> accumulated indefinitely. How so? Well, Flink has two mechanisms to
>>>> remove
>>>> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an
>>>> evictor.
>>>>
>>>> The reduce function has an implicit evictor that automatically removes
>>>> events from the window pane that are no longer needed. i.e. it keeps in
>>>> state only the element that was reduced. Here is an example:
>>>>
>>>> env.socketTextStream("localhost", )
>>>>   .keyBy { it.first().toString() }
>>>>   .window(GlobalWindows.create())
>>>>
>>>>
>>>> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>>>   .reduce { left, right ->
>>>> println("left: $left, right: $right")
>>>> if (left.length > right.length) {
>>>>   left
>>>> } else {
>>>>   right
>>>> }
>>>>   }
>>>>   .printToErr()
>>>>
>>>> For your claim to hold true, every time the trigger fires one would
>>>> expect
>>>> to see ALL the elements by a key being printed over and over again in
>>>> the
>>>> reduce function. However, if you run a job similar to this one in your
>>>> lang
>>>> of choice, you will notice that the print statement is effectively
>>>> called
>>>> only once per event per key.
>>>>
>>>> In fact, not using purge is intentional. Because I want to hold every
>>>> record
>>>> (the last one by its primary key) of the database in state so that I can
>>>> write a snapshot of the whole database.
>>>>
>>>> So for instance, let's say my table has two columns: id and time. And I
>>>> have
>>>> the following events:
>>>>
>>>> 1,January
>>>> 2,February
>>>> 1,March
>>>>
>>>> I want to write to S3 two records: "1,March", and "2,February".
>>>>
>>>> Now, let's say two more events come into the stream:
>>>>
>>>> 3,April
>>>> 1,June
>>>>
>>>> Then I want to write to S3 three records: "1,June", "2,February" and
>>>> "3,April".
>>>>
>>>> In other words, I can't just purge the windows, because I would lose the
>>>> record with id 2.
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
PURGES but only FIRES what I said is semantically true.  The window
contents are never cleared.

What I missed is that in this case since you're using a function that
incrementally reduces on the fly rather than processing all the data when
it's triggered your state is always kept to one element per key.  You're
correct but in general with non-incremental window functions the state
would grow unbounded in this configuration.

So it looks like your approach should work just fine.

-Jamie



On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:

> Hello Jamie.
>
> Thanks for taking a look at this. So, yes, I want to write only the last
> data for each key every X minutes. In other words, I want a snapshot of the
> whole database every X minutes.
>
> >  The issue is that the window never gets PURGED so the data just
> > continues to accumulate in the window.  This will grow without bound.
>
> The window not being purged does not necessarily mean that the data will be
> accumulated indefinitely. How so? Well, Flink has two mechanisms to remove
> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>
> The reduce function has an implicit evictor that automatically removes
> events from the window pane that are no longer needed. i.e. it keeps in
> state only the element that was reduced. Here is an example:
>
> env.socketTextStream("localhost", )
>   .keyBy { it.first().toString() }
>   .window(GlobalWindows.create())
>
> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>   .reduce { left, right ->
> println("left: $left, right: $right")
> if (left.length > right.length) {
>   left
> } else {
>   right
> }
>   }
>   .printToErr()
>
> For your claim to hold true, every time the trigger fires one would expect
> to see ALL the elements by a key being printed over and over again in the
> reduce function. However, if you run a job similar to this one in your lang
> of choice, you will notice that the print statement is effectively called
> only once per event per key.
>
> In fact, not using purge is intentional. Because I want to hold every
> record
> (the last one by its primary key) of the database in state so that I can
> write a snapshot of the whole database.
>
> So for instance, let's say my table has two columns: id and time. And I
> have
> the following events:
>
> 1,January
> 2,February
> 1,March
>
> I want to write to S3 two records: "1,March", and "2,February".
>
> Now, let's say two more events come into the stream:
>
> 3,April
> 1,June
>
> Then I want to write to S3 three records: "1,June", "2,February" and
> "3,April".
>
> In other words, I can't just purge the windows, because I would lose the
> record with id 2.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Any advice on how to replay an event-timed stream?

2019-01-18 Thread Jamie Grier
So, do you mean to have your application running in real-time and use the
same instance of it to also process historical data at the same time?

If that's the case then I would advise not to try to do it that way.  What
I would recommend instead is to process that historical data with another
instance of the application.

If this isn't what you're trying to accomplish please be more thorough in
your explanation..  Thanks.

-Jamie


On Thu, Jan 17, 2019 at 10:34 PM Kanstantsin Kamkou 
wrote:

> Thanks for the reply. As mentioned before the data comes from the
> database. Timestams are from one months ago. And I’m searching a way on how
> to dump this data into a working flink application which already processed
> this data (watermarks are far away from those dates).
>
> On Fri 18. Jan 2019 at 03:22, Jamie Grier  wrote:
>
>> I don't think I understood all of your question but with regard to the
>> watermarking and keys..  You are correct that watermarking (event time
>> advancement) is not per key.  Event-time is a local property of each Task
>> in an executing Flink job.  It has nothing to do with keys.  It has only to
>> do with the input data timestamps seen by each task and the watermarking
>> function (which isn't per-key).
>>
>> I hope that helps.
>>
>> With regard to how to play historical data..  Well there are many ways to
>> approach that.  Can you narrow down your constraints?  Where does the
>> historical data live?
>>
>> -Jamie
>>
>>
>>
>>
>> On Thu, Jan 17, 2019 at 4:36 PM Kanstantsin Kamkou 
>> wrote:
>>
>>> Hi guys! As I understood (I hope I'm wrong) the current design concept
>>> of the watermarking mechanism is that it is tied to the latest watermark and
>>> there is no way to separate those watermarks by key in a keyed stream (I hope
>>> at some point it'll be mentioned in the documentation as it is unfortunately
>>> misleading). Could you share your thoughts on how to replay historical data
>>> in event–time manner (i.e. from db to working application)? The solution
>>> with the processing time is not suitable here as the sessions windows are
>>> needed.
>>>
>>> Thank you!
>>>
>>
>
> --
> Best regards, Kanstantsin Kamkou
> email:  kkam...@gmail.com
> web: http://2ka.by/
> mobile: +49 172 5432334
> skype: kkamkou
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-18 Thread Jamie Grier
I'm not sure if this is required.  It's quite convenient to be able to just
grab a single tarball and you've got everything you need.

I just did this for the latest binary release and it was 273MB and took
about 25 seconds to download.  Of course I know connection speeds vary
quite a bit but I don't think 273 MB seems onerous to download and I like
the simplicity of it the way it is.



On Fri, Jan 18, 2019 at 3:34 AM Fabian Hueske  wrote:

> Hi Chesnay,
>
> Thank you for the proposal.
> I think this is a good idea.
> We follow a similar approach already for Hadoop dependencies and
> connectors (although in application space).
>
> +1
>
> Fabian
>
> On Fri, Jan 18, 2019 at 10:59 AM Chesnay Schepler <
> ches...@apache.org> wrote:
>
>> Hello,
>>
>> the binary distribution that we release by now contains quite a lot of
>> optional components, including various filesystems, metric reporters and
>> libraries. Most users will only use a fraction of these, and as such
>> pretty much only increase the size of flink-dist.
>>
>> With Flink growing more and more in scope I don't believe it to be
>> feasible to ship everything we have with every distribution, and instead
>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>> lean and additional components are downloaded separately and added by
>> the user.
>>
>> This would primarily affect the /opt directory, but could also be
>> extended to cover flink-dist. For example, the yarn and mesos code could
>> be spliced out into separate jars that could be added to lib manually.
>>
>> Let me know what you think.
>>
>> Regards,
>>
>> Chesnay
>>
>>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-17 Thread Jamie Grier
If I'm understanding you correctly you're just trying to do some data
reduction so that you write data for each key once every five minutes
rather than for every CDC update..  Is that correct?  You also want to keep
the state for most recent key you've ever seen so you don't apply writes
out of order.

The code you've provided isn't quite right AFAICT.  The issue is that the
window never get's PURGED so the data just continues to accumulate in the
window.  This will grow without bound.

My advice would be to take a look at ProcessFunction and write one that
does exactly what you want rather than messing around with windows and
triggers for this use case.  It will be much simpler in the end.
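
Roughly, I'm thinking of something like this (an untested sketch -- "Row" and
its fields are placeholders for your actual CDC record type, and the timers are
processing-time based like your current trigger):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder for your CDC record type.
class Row {
    public String id;
    public String payload;
    public long ts; // change timestamp from the CDC log
}

public class LatestRowEmitter extends KeyedProcessFunction<String, Row, Row> {

    private final long intervalMs;
    private transient ValueState<Row> latest;

    public LatestRowEmitter(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
            new ValueStateDescriptor<>("latest-row", Row.class));
    }

    @Override
    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
        Row current = latest.value();
        // Keep only the most recent change per key.
        if (current == null || value.ts >= current.ts) {
            latest.update(value);
        }
        // Align the timer to the next interval boundary; duplicate timers for the
        // same key and timestamp are deduplicated, so this is cheap per element.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer(now - (now % intervalMs) + intervalMs);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
        Row current = latest.value();
        if (current != null) {
            out.collect(current);
        }
        // Re-arm so this key keeps contributing to every snapshot interval.
        ctx.timerService().registerProcessingTimeTimer(timestamp + intervalMs);
    }
}

You'd key by the row's primary key, apply it with .process(new LatestRowEmitter(5 * 60 * 1000)),
and feed the output to the StreamingFileSink exactly as you do now.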

-Jamie




On Thu, Jan 17, 2019 at 4:32 PM knur  wrote:

> Hello there.
>
> So we have some Postgres tables that are mutable, and we want to create a
> snapshot of them in S3 every X minutes. So we plan to use Debezium to send
> a
> CDC log of every row change into a Kafka topic, and then have Flink keep
> the
> latest state of each row to save that data into S3 subsequently.
>
> Our current job looks like this and works somehow well in most cases:
>
>// checkpoint interval is set to run every 10 minutes
>
> kafkaSource
>   .keyBy { it.id }
>   .window(GlobalWindows.create())
>   .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.minutes(5)))
>   .reduce { left, right ->
> if (left.timestamp() > right.timestamp()) {
>   left
> } else {
>   right
> }
>   }
>   .addSink(StreamingFileSink
> .forBulkFormat(Path(outputDir),
> ParquetAvroWriters.forGenericRecord(avroSchema))
>
>
> .withBucketAssigner(DateTimeBucketAssigner("'date='-MM-dd/'hour='HH/'minute='mm"))
> .build())
>
> We use `GlobalWindows.create()` because we want to hold in Flink's state
> ALL
> the changes send into Kafka (the reduce function, according to the docs,
> will make sure to evict all events except the last one).
>
> This works, but we know there could be some edge cases. For instance, if
> the
> trigger fires around the same time as a checkpoint, we could get into a
> position where StreamingFileSink rolls an incomplete set of all the events
> triggered.
>
> So a couple of questions:
>
> 1. Is there a way to mark the events with the timestamp of the trigger that
> fired them?
> 2. Is the approach we took fine? (keep in mind that we will deal with giant
> tables, so a batch job that queries them every N seconds is not an option).
> 3. Do you foresee any other edge cases?
>
> Thanks for taking a look at this.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Any advice on how to replay an event-timed stream?

2019-01-17 Thread Jamie Grier
I don't think I understood all of your question but with regard to the
watermarking and keys..  You are correct that watermarking (event time
advancement) is not per key.  Event-time is a local property of each Task
in an executing Flink job.  It has nothing to do with keys.  It has only to
do with the input data timestamps seen by each task and the watermarking
function (which isn't per-key).

I hope that helps.

With regard to how to play historical data..  Well there are many ways to
approach that.  Can you narrow down your constraints?  Where does the
historical data live?

-Jamie




On Thu, Jan 17, 2019 at 4:36 PM Kanstantsin Kamkou 
wrote:

> Hi guys! As I understood (I hope I'm wrong) the current design concept of
> the watermarking mechanism is that it is tied to the latest watermark and
> there is no way to separate those watermarks by key in a keyed stream (I hope
> at some point it'll be mentioned in the documentation as it is unfortunately
> misleading). Could you share your thoughts on how to replay historical data
> in event–time manner (i.e. from db to working application)? The solution
> with the processing time is not suitable here as the sessions windows are
> needed.
>
> Thank you!
> --
> Best regards, Kanstantsin Kamkou
> email:  kkam...@gmail.com
> web: http://2ka.by/
> mobile: +49 172 5432334
> skype: kkamkou
>


Re: Issue with counter metrics for large number of keys

2019-01-17 Thread Jamie Grier
+1 to what Zhenghua said.  You're abusing the metrics system I think.
Rather just do a stream.keyBy().sum() and then write a Sink to do something
with the data -- for example push it to your metrics system if you wish.
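
Something along these lines (a rough sketch -- "keys" stands in for whatever
stream of key values you have, and the sink body is yours to fill in):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

DataStream<Tuple2<String, Long>> counts = keys
    .map(new MapFunction<String, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(String key) {
            return Tuple2.of(key, 1L);
        }
    })
    .keyBy(0)   // key by the first tuple field
    .sum(1);    // running count per key

counts.addSink(new SinkFunction<Tuple2<String, Long>>() {
    @Override
    public void invoke(Tuple2<String, Long> value) {
        // push (key, count) to your metrics system here
    }
});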

However, from experience, many metrics systems don't like that sort of
thing.  High cardinality metrics are usually a problem so I would expect to
run into issues depending on what metrics system you are trying to get the
data into.

-Jamie


On Wed, Jan 16, 2019 at 8:32 PM Zhenghua Gao  wrote:

> So what you want is the counts of every keys ?
> Why didn't you use count aggregation?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Getting RemoteTransportException

2019-01-17 Thread Jamie Grier
Avi,

The stack trace there is pretty much a red herring.  That happens whenever
a job shuts down for any reason and is not a root cause.  To diagnose this
you will want to look at all the TaskManager logs as well as the JobManager
logs.  If you have a way to easily grep these (all of them at once) I would
search for a string like "to FAILED" on the taskmanagers and look at those
error lines and stacktraces.

Don't be misled by the exception reported in the Flink UI.  It OFTEN isn't
the true root cause but it's a hard problem to solve.  You have to look at
the TaskManager logs to really be sure.

The taskmanager.network.netty.server.numThreads setting is also a red
herring.  I would leave that alone.

Finally, if you have HA and checkpointing set up correctly you will not lose
any state even in the case of losing a JobManager.  The job will
auto-recover as soon as a new JobManager becomes available.

I hope that helps.

-Jamie



On Thu, Jan 17, 2019 at 7:10 AM Dominik Wosiński  wrote:

> *Hey,*
> As for the question about taskmanager.network.netty.server.numThreads:
> It is the size of the thread pool that will be used by the netty server.
> The default value is -1, which will result in a thread pool with size
> equal to the number of task slots for your JobManager.
>
> Best Regards,
> Dom.
>
> On Thu, Jan 17, 2019 at 00:52, Avi Levi wrote:
>
>> Hi Guys,
>>
>> We did some load tests and got the exception below. I saw that the
>> JobManager was restarted. If I understood correctly, it will get a new job id
>> and the state will be lost - is that correct? How is the state handled when
>> setting up HA as described here, and what actually happens to the state if
>> one of the job managers crashes (keyed state using RocksDB)?
>>
>>
>> One of the properties that might be relevant to this exception is
>> taskmanager.network.netty.server.numThreads, with a default value of -1 -
>> what does this default value actually mean? And should it be set to a
>> different value according to the number of cores?
>>
>>
>> Thanks for your advice .
>>
>> Avi
>>
>>
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Lost connection to task manager ':1234'. This indicates that the remote
>> task manager was lost.
>>
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:160)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>
>> at
>> 

Re: One TaskManager per node or multiple TaskManager per node

2019-01-15 Thread Jamie Grier
Ethan, it depends on what you mean by easy ;)  It just depends a lot on
what infra tools you already have in place.  On bare metal it's probably
safe to say there is no "easy" way.  You need a lot of automation to make
it easy.

Bastien, IMO, #1 applies to batch jobs as well.

On Tue, Jan 15, 2019 at 6:27 AM bastien dine  wrote:

> Hello Jamie,
>
> Does #1 apply to batch jobs too ?
>
> Regards,
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> Le lun. 14 janv. 2019 à 20:39, Jamie Grier  a écrit :
>
>> There are a lot of different ways to deploy Flink.  It would be easier to
>> answer your question with a little more context about your use case but in
>> general I would advocate the following:
>>
>> 1) Don't run a "permanent" Flink cluster and then submit jobs to it.
>> Instead what you should do is run an "ephemeral" cluster per job if
>> possible.  This keeps jobs completely isolated from each other which helps
>> a lot with understanding performance, debugging, looking at logs, etc.
>> 2) Given that you can do #1 and you are running on bare metal (as opposed
>> to in containers) then run one TM per physical machine.
>>
>> There are many ways to accomplish the above depending on your deployment
>> infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give
>> detailed input but in general you'll have the best luck if you don't run
>> multiple jobs in the same TM/JVM.
>>
>> In terms of the TM memory usage you can set that up by configuring it in
>> the flink-conf.yaml file.  The config key you are looking for is
>> taskmanager.heap.size:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size
>>
>>
>> On Mon, Jan 14, 2019 at 8:05 AM Ethan Li 
>> wrote:
>>
>>> Hello,
>>>
>>> I am setting up a standalone flink cluster and I am wondering what’s the
>>> best way to distribute TaskManagers.  Do we usually launch one TaskManager
>>> (with many slots) per node or multiple TaskManagers per node (with smaller
>>> number of slots per tm) ?  Also with one TaskManager per node, I am seeing
>>> that TM launches with only 30GB JVM heap by default while the node has 180
>>> GB. Why is it not launching with more memory since there is a lot
>>> available?
>>>
>>> Thank you very much!
>>>
>>> - Ethan
>>
>>


Re: One TaskManager per node or multiple TaskManager per node

2019-01-14 Thread Jamie Grier
There are a lot of different ways to deploy Flink.  It would be easier to
answer your question with a little more context about your use case but in
general I would advocate the following:

1) Don't run a "permanent" Flink cluster and then submit jobs to it.
Instead what you should do is run an "ephemeral" cluster per job if
possible.  This keeps jobs completely isolated from each other which helps
a lot with understanding performance, debugging, looking at logs, etc.
2) Given that you can do #1 and you are running on bare metal (as opposed
to in containers) then run one TM per physical machine.

There are many ways to accomplish the above depending on your deployment
infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give
detailed input but in general you'll have the best luck if you don't run
multiple jobs in the same TM/JVM.

In terms of the TM memory usage you can set that up by configuring it in
the flink-conf.yaml file.  The config key you are looking for is
taskmanager.heap.size:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size


On Mon, Jan 14, 2019 at 8:05 AM Ethan Li  wrote:

> Hello,
>
> I am setting up a standalone flink cluster and I am wondering what’s the
> best way to distribute TaskManagers.  Do we usually launch one TaskManager
> (with many slots) per node or multiple TaskManagers per node (with smaller
> number of slots per tm) ?  Also with one TaskManager per node, I am seeing
> that TM launches with only 30GB JVM heap by default while the node has 180
> GB. Why is it not launching with more memory since there is a lot
> available?
>
> Thank you very much!
>
> - Ethan


Re: What happen to state in Flink Task Manager when crash?

2019-01-11 Thread Jamie Grier
Flink is designed such that local state is backed up to a highly available
system such as HDFS or S3.  When a TaskManager fails state is recovered
from there.

I suggest reading this:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html
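
As a rough sketch of the setup (the checkpoint path and interval are placeholders;
RocksDBStateBackend comes from the flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB keeps working state on the TaskManager's local disk and
        // snapshots it to a durable filesystem (HDFS/S3) on every checkpoint,
        // which is what a replacement TaskManager recovers from.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // ... build the rest of the job here, then:
        // env.execute("my-job");
    }
}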


On Fri, Jan 11, 2019 at 7:33 AM Siew Wai Yow  wrote:

> Hello,
>
> May i know what happen to state stored in Flink Task Manager when this
> Task manager crash. Say the state storage is rocksdb, would those data
> transfer to other running Task Manager so that complete state data is ready
> for data processing?
>
> Regards,
> Yow
>


Re: Metric on JobManager

2018-11-21 Thread Jamie Grier
What you're describing is not possible.  There is no runtime context or
metrics you can use at that point.

The best you can probably do (at least for start time) is just keep a flag
in your function and log a metric once and only once when it first starts
executing.
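
For example, something like this at the start of your pipeline (a sketch -- the
metric name is made up, and "start time" here means the moment each parallel
instance sees its first record):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class StartTimeMarker<T> extends RichMapFunction<T, T> {

    private volatile long firstRecordTime = -1L;

    @Override
    public void open(Configuration parameters) {
        // Registered once per parallel instance; stays -1 until the first record arrives.
        getRuntimeContext().getMetricGroup()
            .gauge("firstRecordTimestamp", (Gauge<Long>) () -> firstRecordTime);
    }

    @Override
    public T map(T value) {
        if (firstRecordTime < 0) {
            firstRecordTime = System.currentTimeMillis();
        }
        return value;
    }
}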

On Wed, Nov 21, 2018 at 5:18 AM bastien dine  wrote:

> Hello all,
>
> I am using metrics to count some stuff in my topology, this is pretty easy
> with the metric API in getRuntimeContext in a Rich function
> However I would like to use this metric API to log start date & end date
> of my processing, but in the source code executed on the job manager (i.e
> not in an operator) before & after the env.execute().
> How can I retrieve the runtime context, from the execution env maybe?
>
> Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: your advice please regarding state

2018-11-21 Thread Jamie Grier
Hi Avi,

The typical approach would be as you've described in #1.  #2 is not
necessary -- #1 is already doing basically exactly that.
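
A rough sketch of #1 (the Car type is a placeholder; the boolean output just
says whether the car is new, and you can route on it to the two Kafka topics
downstream):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Placeholder for your record type.
class Car {
    public String id;
}

public class NewCarDetector extends RichFlatMapFunction<Car, Tuple2<Car, Boolean>> {

    // One entry per car id; lives in the configured state backend (RocksDB) and
    // is sharded across the cluster automatically by key group.
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(Car car, Collector<Tuple2<Car, Boolean>> out) throws Exception {
        boolean isNew = seen.value() == null; // null means this key was never seen before
        if (isNew) {
            seen.update(true);
        }
        out.collect(Tuple2.of(car, isNew));
    }
}

You'd use it as stream.keyBy(car -> car.id).flatMap(new NewCarDetector()) and
send the true/false outputs to the two Kafka topics.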

-Jamie


On Wed, Nov 21, 2018 at 3:36 AM Avi Levi  wrote:

> Hi ,
> I am very new to flink so please be gentle :)
>
> *The challenge:*
> I have a road sensor that should scan billions of cars per day. For starters
> I want to recognise if each car that passes by is new or not. New cars
> (never been seen before by that sensor ) will be placed on a different
> topic on kafka than the other (total of two topics for new and old) .
>  under the assumption that the state will contain billions of unique car
> ids.
>
> *Suggested Solutions*
> My question is which approach is better.
> Both approaches using RocksDB
>
> 1. use the ValueState and to split the steam like
>   *val domainsSrc = env*
> *.addSource(consumer)*
> *.keyBy(car => car.id )*
> *.map(...)*
> and checking if the state value is null to recognise new cars. if new than
> I will update the state
> how will the persistent data be sharded among the nodes in the cluster
> (let's say that I have 10 nodes) ?
>
> 2. use MapState and to partition the stream to groups by some arbitrary
> factor e.g
> *val domainsSrc = env*
> *.addSource(consumer)*
> *.keyBy{ car =>*
> *val h = car.id.hashCode % partitionFactor*
> *math.abs(h)*
> *} .map(...)*
> and to check *mapState.keys.contains(car.id ) *if not -
> add it to the state
>
> which approach is better ?
>
> Thanks in advance
> Avi
>


Re: Assign IDs to Operators

2018-11-21 Thread Jamie Grier
Hi Chang,

The partitioning steps, like keyBy(), are not operators.  In general you can
let Flink's fluent-style API tell you the answer.  If you can call .uid()
in the API and it compiles then the thing just before that is an operator ;)
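
For example, in the windowed aggregation from your mail the whole
keyBy/window/aggregate chain becomes a single window operator, so the ID goes
right after aggregate() -- a rough sketch, where input, the key selector, and
AverageAggregate are the placeholders from the docs example:

DataStream<Double> averages = input
    .keyBy(record -> record.getKey())     // partitioning step, not an operator
    .timeWindow(Time.minutes(5))          // window assignment
    .aggregate(new AverageAggregate())    // keyBy/window/aggregate form one stateful window operator
    .uid("average-window");               // the ID attaches to that operator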

-Jamie


On Wed, Nov 21, 2018 at 5:59 AM Chang Liu  wrote:

> Dear All,
>
> As stated here (
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html),
> it is highly recommended to assign IDs to Operators, especially for the
> stateful ones.
>
> My question is: what is the granularity of a so-called Operator.
>
> To be more specific, in the following example, we have Operators like
> addSource and map. I am wondering, are shuffle and print also some kind of
> Operator?
>
> DataStream stream = env.
>   // Stateful source (e.g. Kafka) with ID
>   .addSource(new StatefulSource())
>   .uid("source-id") // ID for the source operator
>   .shuffle()
>   // Stateful mapper with ID
>   .map(new StatefulMapper())
>   .uid("mapper-id") // ID for the mapper
>   // Stateless printing sink
>   .print(); // Auto-generated ID
>
>
> Or, in the following example, how many Operators do we have (that we can
> assign IDs to)? 3? KeyBy, window and aggregate?
>
>
> input
> .keyBy()
> .window()
> .aggregate(new AverageAggregate)
>
>
> Then, how many Operators (and which are they) do we have in the following
> example?
>
> stream.join(otherStream)
> .where()
> .equalTo()
> .window()
> .apply()
>
>
> Many Thanks.
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
>


Re: Reset kafka offets to latest on restart

2018-11-21 Thread Jamie Grier
Hi Vishal,

No, there is no way to do this currently.


On Wed, Nov 21, 2018 at 10:22 AM Vishal Santoshi 
wrote:

> Any one ?
>
> On Tue, Nov 20, 2018 at 12:48 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Is it possible to have checkpointing but reset the kafka offsets to
>> latest on restart on failure ?
>>
>


Re: Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-25 Thread Jamie Grier
Update on this:

The issue was the command being used to start the jobmanager:
`jobmanager.sh start-foreground cluster`.  This was a command leftover in
our automation that used to be the correct way to start the JM -- however
now, in Flink 1.5.4, that second parameter, `cluster`, is being interpreted
as the hostname for the jobmanager to bind to.

The solution was just to remove `cluster` from that command.



On Tue, Sep 25, 2018 at 10:15 AM Jamie Grier  wrote:

> Anybody else seen this and know the solution?  We're dead in the water
> with Flink 1.5.4.
>
> On Sun, Sep 23, 2018 at 11:46 PM alex  wrote:
>
>> We started to see same errors after upgrading to flink 1.6.0 from 1.4.2.
>> We
>> have one JM and 5 TM on kubernetes. JM is running on HA mode. Taskmanagers
>> sometimes are losing connection to JM and having the following error like you
>> have.
>>
>> *2018-09-19 12:36:40,687 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
>> resolve ResourceManager address
>> akka.tcp://flink@flink-jobmanager:50002/user/resourcemanager, retrying in
>> 1 ms: Ask timed out on
>> [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:50002/),
>> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
>> of
>> type "akka.actor.Identify"..*
>>
>> When TM started to have "Could not resolve ResourceManager", it cannot
>> resolve itself until I restart the TM pod.
>>
>> *Here is the content of our flink-conf.yaml:*
>> blob.server.port: 6124
>> jobmanager.rpc.address: flink-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.heap.mb: 4096
>> jobmanager.web.history: 20
>> jobmanager.archive.fs.dir: s3://our_path
>> taskmanager.rpc.port: 6121
>> taskmanager.heap.mb: 16384
>> taskmanager.numberOfTaskSlots: 10
>> taskmanager.log.path: /opt/flink/log/output.log
>> web.log.path: /opt/flink/log/output.log
>> state.checkpoints.num-retained: 3
>> metrics.reporters: prom
>> metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>
>> high-availability: zookeeper
>> high-availability.jobmanager.port: 50002
>> high-availability.zookeeper.quorum: zookeeper_instance_list
>> high-availability.zookeeper.path.root: /flink
>> high-availability.cluster-id: profileservice
>> high-availability.storageDir: s3://our_path
>>
>> Any help will be greatly appreciated!
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-25 Thread Jamie Grier
Anybody else seen this and know the solution?  We're dead in the water with
Flink 1.5.4.

On Sun, Sep 23, 2018 at 11:46 PM alex  wrote:

> We started to see same errors after upgrading to flink 1.6.0 from 1.4.2. We
> have one JM and 5 TM on kubernetes. JM is running on HA mode. Taskmanagers
> sometimes are losing connection to JM and having the following error like you
> have.
>
> *2018-09-19 12:36:40,687 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
> resolve ResourceManager address
> akka.tcp://flink@flink-jobmanager:50002/user/resourcemanager, retrying in
> 1 ms: Ask timed out on
> [ActorSelection[Anchor(akka.tcp://flink@flink-jobmanager:50002/),
> Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
> of
> type "akka.actor.Identify"..*
>
> When TM started to have "Could not resolve ResourceManager", it cannot
> resolve itself until I restart the TM pod.
>
> *Here is the content of our flink-conf.yaml:*
> blob.server.port: 6124
> jobmanager.rpc.address: flink-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 4096
> jobmanager.web.history: 20
> jobmanager.archive.fs.dir: s3://our_path
> taskmanager.rpc.port: 6121
> taskmanager.heap.mb: 16384
> taskmanager.numberOfTaskSlots: 10
> taskmanager.log.path: /opt/flink/log/output.log
> web.log.path: /opt/flink/log/output.log
> state.checkpoints.num-retained: 3
> metrics.reporters: prom
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
>
> high-availability: zookeeper
> high-availability.jobmanager.port: 50002
> high-availability.zookeeper.quorum: zookeeper_instance_list
> high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: profileservice
> high-availability.storageDir: s3://our_path
>
> Any help will be greatly appreciated!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Flink 1.5.4 -- issues w/ TaskManager connecting to ResourceManager

2018-09-21 Thread Jamie Grier
Anybody else seen this?  I'm running both the JM and TM on the same host in
this setup.  This was working fine w/ Flink 1.5.3.

On the TaskManager:

00:31:30.268 INFO  o.a.f.r.t.TaskExecutor - Could not resolve
ResourceManager address akka.tcp://flink@localhost:6123/user/resourcemanager,
retrying in 1 ms: Ask timed out on
[ActorSelection[Anchor(akka.tcp://flink@localhost:6123/),
Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message
of type "akka.actor.Identify"..

On the JobManager:

00:32:00.339 ERROR a.r.EndpointWriter - dropping message [class
akka.actor.ActorSelectionMessage] for non-local recipient
[Actor[akka.tcp://flink@localhost:6123/]] arriving at
[akka.tcp://flink@localhost:6123] inbound addresses are
[akka.tcp://flink@cluster:6123]


Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Jamie Grier
Hey Cliff, can you provide the stack trace of the issue you were seeing?
We recently ran into a similar issue that we're still debugging.  Did it
look like this:

java.lang.IllegalStateException: Could not initialize operator state
> backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException: Could not find
> required Avro dependency.
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 common frames omitted
> 00:17:19.626 INFO o.a.f.r.e.ExecutionGraph - Job
> ClientEventToElasticsearchJob (5cec438674e9a111703c83897f7c8138) switched
> from state RUNNING to FAILING.
> java.lang.IllegalStateException: Could not initialize operator state
> backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException: Could not find
> required Avro dependency.
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> ... 6 common frames omitted


-Jamie


On Mon, Aug 20, 2018 at 10:42 AM, Cliff Resnick  wrote:

> Hi Vino,
>
> You were right in your assumption -- unshaded avro was being added to our
> application jar via third-party dependency. Excluding it in packaging fixed
> the issue. For the record, it looks flink-avro must be loaded from the lib
> or there will be errors in checkpoint restores.
>
> On Mon, Aug 20, 2018 at 8:43 AM Cliff Resnick  wrote:
>
>> Hi Vino,
>>
>> Thanks for the explanation, but the job only ever uses the Avro (1.8.2)
>> pulled in by flink-formats/avro, so it's not a class version conflict
>> there.
>>
>> I'm using default child-first loading. It might be a further transitive
>> dependency, though it's not clear by stack trace or stepping through the
>> process. When I get a chance I'll look further into it but in case anyone
>> is experiencing similar problems, what is clear is that classloader order
>> does matter with Avro.
>>
>> On Sun, Aug 19, 2018, 11:36 PM vino yang  wrote:
>>
>>> Hi Cliff,
>>>
>>> My personal guess is 

Running Flink in multiple AWS availability zones

2018-08-16 Thread Jamie Grier
Hi all,

I'm looking to learn if/how others are running Flink jobs in such a way
that they can survive failure of a single Amazon AWS availability zone.

If you're currently doing this I would love a reply detailing your setup.

Thanks!

-Jamie


Re: Getting key from keyed stream

2017-01-12 Thread Jamie Grier
A simpler and more efficient approach would be the following:

val stream = env.addSource(new FlinkKafkaConsumer(...))

stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))

env.execute()

In MyKeyedSerializationSchema just override the getTargetTopic() method.
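For example, a rough sketch of such a schema (the Message type and its fields
are made up for illustration; the import path is the one used by the 1.x Kafka
connectors):

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

case class Message(topic: String, body: String)   // hypothetical message type

class MyKeyedSerializationSchema extends KeyedSerializationSchema[Message] {
  override def serializeKey(element: Message): Array[Byte] = null   // no explicit Kafka key
  override def serializeValue(element: Message): Array[Byte] = element.body.getBytes("UTF-8")
  override def getTargetTopic(element: Message): String = element.topic   // routes the record to this topic
}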

That should do it :)

-Jamie

On Thu, Jan 12, 2017 at 12:53 PM, Paul Joireman <paul.joire...@physiq.com>
wrote:

Hi all,
>
>
> Is there a simple way to read the key from a KeyedStream.   Very simply
> I'm trying to read a message from Kafka, separate the incoming messages by
> a field in the message and write the original message back to Kafka using
> that field as a new topic.  I chose to partition the incoming stream by
> creating a KeyedStream and using the field from the message as the key.
>  The only thing left is to write the message to Kafka with a producer but i
> need to know the topic to write to and for that I need to be able to read
> the key.   Is there a way to do this?
>
>
> Is there a better way to do this, rather than using a KeyedStream.
>
>
> Paul
>
​
-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: keyBy called twice. Second time, INetAddress and Array[Byte] are empty

2017-01-10 Thread Jamie Grier
Hi Jonas,

The issue has to do with serializing/deserializing InetAddress.  If you
look at the InetAddress class the data members that hold the actual ip
address are transient fields and as such are not serialized/deserialized in
the way that you would expect.  This is what is causing the issue.

I suggest you simply do not use InetAddress in your Person data type but
rather a simple string or other properly serializable type for instance.
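For example (Person and its fields are made up here):

case class Person(name: String, ip: String)   // keep the address as a plain String

// ...and convert only where an actual InetAddress is needed:
// val addr = java.net.InetAddress.getByName(person.ip)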

-Jamie


On Mon, Jan 9, 2017 at 9:46 AM, Jonas <jo...@huntun.de> wrote:

> So I created a minimal working example where this behaviour can still be
> seen. It is 15 LOC and can be downloaded here:
> https://github.com/JonasGroeger/flink-inetaddress-zeroed
>
> To run it, use sbt:
>
> If you don't want to do the above fear not, here is the code:
>
> For some reason, java.net.InetAddress objects get zeroed. Why is that?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/keyBy-called-
> twice-Second-time-INetAddress-and-Array-Byte-are-empty-tp10907p10947.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Set Parallelism and keyBy

2017-01-02 Thread Jamie Grier
Dominik,

This should work just as you expect.  Maybe the output of the print is just
misleading you.  The print() operation will still have a parallelism of two
but the flatMap() will have a parallelism of 16 and all data elements with
the same key will get routed to the same host.

Any sequence of keyBy().flatMap() will always properly partition the data
across the instances of the flatMap() operator by key.

-Jamie


On Mon, Dec 26, 2016 at 10:52 AM, Dominik Bruhn <domi...@dbruhn.de> wrote:

> Hey,
> I have a flink job which has a default parallelism set to 2. I want to key
> the stream and then apply some flatMap on the keyed stream. The flatMap
> operation is quite costly, so I want to have a much higher parallelism here
> (lets say 16). Additionally, it is important that the flatMap operation is
> executed for the same key always in the same process or in the same task.
>
> I have the following code:
>
> 
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()
> 
>
> This works fine, and the "ExpensiveOperation" is executed always on the
> same tasks for the same keys.
>
> Now I tried two things:
>
> 1.
> 
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).setParallelism(16).flatMap(new
> ExpensiveOperation()).print()
> 
> This fails with an exception because I can't set the parallelism on the
> keyBy operator.
>
> 2.
> -
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParal
> lelism(16).print()
> -
> While this executes, it breaks the assignment of the keys to the tasks:
> The "ExpensiveOperation" is now not executed on the same nodes anymore all
> the time (visible by the prefixes in the print()).
>
> What am I doing wrong? Is the only chance to set the whole parallelism of
> the whole flink job to 16?
>
> Thanks, have nice holidays,
> Dominik
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Flink streaming questions

2017-01-02 Thread Jamie Grier
Hi Henri,

#1 - This is by design.  Event time advances with the slowest input
source.  If there are input sources that generate no data this is
indistinguishable from a slow source.  Kafka topics where some partitions
receive no data are a problem in this regard -- but there isn't a simple
solution.  If possible I would address it at the source.

#2 - If it's possible to run these init functions just once when you submit
the job you can run them in the constructor of your FoldFunction.  This
init will then happen exactly once (on the client) and the constructed
FoldFunction is then serialized and distributed around the cluster.  If
this doesn't work because you need something truly dynamic you could also
accomplish this with a simple local variable in your function.

class MyFoldFunction extends FoldFunction[O, T] {
  private var initialized = false

  def fold(accumulator: T, value: O): T = {
    if (!initialized) {
      doInitStuff()
      initialized = true
    }
    doNormalStuff()
  }
}


#3 - One way to do this is as you've said which is to attach the profile
information to the event, using a mapper, before it enters the window
operations.


On Mon, Jan 2, 2017 at 1:25 AM, Henri Heiskanen <henri.heiska...@gmail.com>
wrote:

> Hi,
>
> I have few questions related to Flink streaming. I am on 1.2-SNAPSHOT and
> what I would like to accomplish is to have a stream that reads data from
> multiple kafka topics, identifies user sessions, uses an external user
> profile to enrich the data, evaluates an script to produce session
> aggregates and then create updated profiles from session aggregates. I am
> working with high volume data and user sessions may be long, so using
> generic window apply might not work. Below is the simplification of the
> stream.
>
> stream = createKafkaStreams(...);
> env.setParallelism(4);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> stream
> .keyBy(2)
> .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
> .fold(new SessionData(), new SessionFold(), new
> ProfilerApply())
> .print();
>
> The questions:
>
> 1. Initially when I used event time windowing I could not get any of my
> windows to close. The reason seemed to be that I had 6 partitions in my
> test kafka setup and only 4 of them generated traffic. If I used
> parallelism above 4, then no windows were closed. Is this by design or a
> defect? We use flink-connector-kafka-0.10 because earlier versions did not
> commit the offsets correctly.
>
> 2. Rich fold functions are not supported. However I would like execute a
> piece of custom script in the fold function that requires initialisation
> part. I would have used the open and close lifecycle methods of rich
> functions but they are not available now in fold. What would be the
> preferred way to run some initialisation routines (and closing the
> gracefully) when using fold?
>
> 3. Kind of related to above. I would also like to fetch a user profile
> from external source in the beginning of the session. What would be a best
> practice for that kind of operation? If I would be using the generic window
> apply I could fetch in in the beginning of the apply method. I was thinking
> of introducing a mapper that fetches this profiler periodically and caches
> it to flink state. However, with this setup I would not be able to tie this
> to user sessions identified for windows.
>
> 4. I also may have an additional requirement of writing out each event
> enriched with current session and profile data. I basically could do this
> again with generic window function and write out each event with collector
> when iterating, but would there be a better pattern to use? Maybe sharing
> state with functions or something.
>
> Br,
> Henri H
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Programmatically get live values of accumulators

2017-01-02 Thread Jamie Grier
Hi Gwenhael,

I think what you actually want is to use the Apache Flink metrics
interface.  See the following:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/metrics.html

Sending metrics to StatsD is supported out-of-the-box.
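If it helps, registering such a metric from user code looks roughly like this
(the function and metric names are made up; whichever reporter is configured
in flink-conf.yaml, e.g. StatsD, will pick it up):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class CountingMapper[T] extends RichMapFunction[T, T] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext.getMetricGroup.counter("elementsSeen")
  }

  override def map(value: T): T = {
    counter.inc()   // reported live by the configured metrics reporter
    value
  }
}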

-Jamie


On Mon, Jan 2, 2017 at 1:34 AM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Hi, and best wishes for the year to come J
>
>
>
> I’d like to be able to programmatically get the (live) values of
> accumulators in order to send them using a statsd (or another) client in
> the JobManager of a yarn-deployed application. I say live because I’d like
> to use that in streaming (24/7) applications, and send live stats, I cannot
> way for the application to end.
>
>
>
> I’ve seen that there is a json API (I’d prefer not to have my app poll
> itself).
>
> I’ve seen some code on github (tests files) where it’s done using the
> underlying akka framework, I don’t mind doing it the same way and creating
> an actor to get notifications messages, but I don’t know the best way, and
> there probably is a better one.
>
>
>
> Thanks in advance,
>
>
>
> Gwenhaël PASQUIERS
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Hi, There is possibly an issue with EventTimeSessionWindows where a gap is specified for considering items in the same session. Here the logic is, if two adjacent items have a difference in event

2017-01-02 Thread Jamie Grier
If there is never a gap between elements larger than the session gap -- the
window never ending would be the correct behavior.  So, if this is the case
with some data stream I would not suggest to use session windows at all --
or I would use a smaller session gap.

Another alternative would be to use Session Windows along with a
user-defined trigger that fires periodically whether the session has ended
or not.  For example, this is useful when the session window logic works well
in the normal case but you sometimes want to force an evaluation because a
"natural" session is too long.
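For example, roughly (MyEvent is a hypothetical event type, and `keyed` is
assumed to be a KeyedStream[MyEvent, _] with event-time timestamps and
watermarks already assigned upstream):

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger

case class MyEvent(timestamp: Long, value: Double)   // hypothetical event type

keyed
  .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
  .trigger(ContinuousEventTimeTrigger.of(Time.minutes(5)))   // fire every 5 minutes of event time, without purging
  .reduce((a, b) => if (a.timestamp >= b.timestamp) a else b)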

-Jamie

On Mon, Jan 2, 2017 at 3:11 AM, Sujit Sakre <sujit.sa...@northgateps.com>
wrote:

> Hi,
>
> We are using Flink 1.1.4 version.
>
>
> There is possibly an issue with EventTimeSessionWindows where a gap is
> specified for considering items in the same session. Here the logic is, if
> two adjacent items have a difference in event timestamps of more than the
> gap then the items are considered to be in separate session. The issue is,
> what happens if the gap between streaming records is *never*  (or for a
> very long time) less than the session gap. This is likely to lead to a race
> condition.
>
> Is this a bug? How do we deal with this to process windows in finite time?
>
> Please could you suggest.
>
> Thanks.
>
>
> *Sujit Sakre*
>
> Senior Technical Architect
> Tel: +91 22 6660 6600
> Ext:
> 247
> Direct: 6740 5247
>
> Mobile: +91 98672 01204
>
> www.rave-tech.com
>
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Multiple consumers and custom triggers

2016-12-15 Thread Jamie Grier
To be more clear...

A single source in a Flink program is a logical concept.  Flink jobs are
run with some level of parallelism meaning that multiple copies of your
source (and all other) functions are run distributed across a cluster.  So
if you have a streaming program with two sources and you run with a
parallelism of 8 there are actually a total of 16 source functions
executing on the cluster -- 8 instances of each of the two source operators
you've defined in your Flink job.

For more info on this you may want to read through the following:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/concepts/concepts.html

On Thu, Dec 15, 2016 at 3:21 PM, Jamie Grier <ja...@data-artisans.com>
wrote:

> All streams can be parallelized in Flink even with only one source.  You
> can have multiple sinks as well.
>
> On Thu, Dec 15, 2016 at 7:56 AM, Meghashyam Sandeep V <
> vr1meghash...@gmail.com> wrote:
>
>> 1. If we have multiple sources, can the streams be parallelized ?
>> 2. Can we have multiple sinks as well?
>>
>> On Dec 14, 2016 10:46 PM, <dromitl...@gmail.com> wrote:
>>
>>> Got it. Thanks!
>>>
>>> On Dec 15, 2016, at 02:58, Jamie Grier <ja...@data-artisans.com> wrote:
>>>
>>> Ahh, sorry, for #2: A single Flink job can have as many sources as you
>>> like. They can be combined in multiple ways, via things like joins, or
>>> connect(), etc. They can also be completely independent — in other words
>>> the data flow graph can be completely disjoint. You never need to call
>>> execute() more than once. Just define your program, with as many sources as
>>> you want, and then call execute().
>>>
>>> val stream1 = env.addSource(...)
>>> val stream2 = env.addSource(...)
>>>
>>> stream1
>>>   .map(...)
>>>   .addSink(...)
>>>
>>> stream2
>>>   .map(...)
>>>   .addSink(...)
>>>
>>> env.execute() // this is all you need
>>>
>>> ​
>>>
>>> On Wed, Dec 14, 2016 at 4:02 PM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hey Jamie,
>>>>
>>>> Ok with #1. I guess #2 is just not possible.
>>>>
>>>> I got it about #3. I just checked the code for the tumbling window
>>>> assigner and I noticed it's just its default trigger that gets overwritten
>>>> when using a custom trigger, not the way it assigns windows, it makes sense
>>>> now.
>>>>
>>>> Regarding #4, after doing some more tests I think it's more complex
>>>> than I first thought. I'll probably create another thread explaining more
>>>> that specific question.
>>>>
>>>> Thanks,
>>>> Matt
>>>>
>>>> On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier <ja...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> For #1 there are a couple of ways to do this.  The easiest is probably
>>>>> stream1.connect(stream2).map(...) where the MapFunction maps the two
>>>>> input types to a common type that you can then process uniformly.
>>>>>
>>>>> For #3 There must always be a WindowAssigner specified.  There are
>>>>> some convenient ways to do this in the API such as timeWindow(), or
>>>>> window(TumblingProcessingTimeWindows.of(...)), etc, however you
>>>>> always must do this whether you provide your own trigger implementation
>>>>> or not.  The way to use window(...) with a custom trigger is just:
>>>>>  stream.keyBy(...).window(...).trigger(...).apply(...) or something
>>>>> similar.  Not sure if I answered your question though..
>>>>>
>>>>> For #4: If I understand you correctly that is exactly what
>>>>> CountWindow(10, 1) does already.  For example if your input data was a
>>>>> sequence of integers starting with 0 the output would be:
>>>>>
>>>>> (0)
>>>>> (0, 1)
>>>>> (0, 1, 2)
>>>>> (0, 1, 2, 3)
>>>>> (0, 1, 2, 3, 4)
>>>>> (0, 1, 2, 3, 4, 5)
>>>>> (0, 1, 2, 3, 4, 5, 6)
>>>>> (0, 1, 2, 3, 4, 5, 6, 7)
>>>>> (0, 1, 2, 3, 4, 5, 6, 7, 8)
>>>>> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
>>>>> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>>> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
>>>>> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
>>>>> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
>>>>> ...
>>>>> etc

Re: Multiple consumers and custom triggers

2016-12-15 Thread Jamie Grier
All streams can be parallelized in Flink even with only one source.  You
can have multiple sinks as well.

On Thu, Dec 15, 2016 at 7:56 AM, Meghashyam Sandeep V <
vr1meghash...@gmail.com> wrote:

> 1. If we have multiple sources, can the streams be parallelized ?
> 2. Can we have multiple sinks as well?
>
> On Dec 14, 2016 10:46 PM, <dromitl...@gmail.com> wrote:
>
>> Got it. Thanks!
>>
>> On Dec 15, 2016, at 02:58, Jamie Grier <ja...@data-artisans.com> wrote:
>>
>> Ahh, sorry, for #2: A single Flink job can have as many sources as you
>> like. They can be combined in multiple ways, via things like joins, or
>> connect(), etc. They can also be completely independent — in other words
>> the data flow graph can be completely disjoint. You never need to call
>> execute() more than once. Just define your program, with as many sources as
>> you want, and then call execute().
>>
>> val stream1 = env.addSource(...)
>> val stream2 = env.addSource(...)
>>
>> stream1
>>   .map(...)
>>   .addSink(...)
>>
>> stream2
>>   .map(...)
>>   .addSink(...)
>>
>> env.execute() // this is all you need
>>
>> ​
>>
>> On Wed, Dec 14, 2016 at 4:02 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hey Jamie,
>>>
>>> Ok with #1. I guess #2 is just not possible.
>>>
>>> I got it about #3. I just checked the code for the tumbling window
>>> assigner and I noticed it's just its default trigger that gets overwritten
>>> when using a custom trigger, not the way it assigns windows, it makes sense
>>> now.
>>>
>>> Regarding #4, after doing some more tests I think it's more complex than
>>> I first thought. I'll probably create another thread explaining more that
>>> specific question.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier <ja...@data-artisans.com>
>>> wrote:
>>>
>>>> For #1 there are a couple of ways to do this.  The easiest is probably
>>>> stream1.connect(stream2).map(...) where the MapFunction maps the two
>>>> input types to a common type that you can then process uniformly.
>>>>
>>>> For #3 There must always be a WindowAssigner specified.  There are some
>>>> convenient ways to do this in the API such as timeWindow(), or
>>>> window(TumblingProcessingTimeWindows.of(...)), etc, however you always
>>>> must do this whether you provide your own trigger implementation or not.
>>>> The way to use window(...) with a custom trigger is just:
>>>>  stream.keyBy(...).window(...).trigger(...).apply(...) or something
>>>> similar.  Not sure if I answered your question though..
>>>>
>>>> For #4: If I understand you correctly that is exactly what
>>>> CountWindow(10, 1) does already.  For example if your input data was a
>>>> sequence of integers starting with 0 the output would be:
>>>>
>>>> (0)
>>>> (0, 1)
>>>> (0, 1, 2)
>>>> (0, 1, 2, 3)
>>>> (0, 1, 2, 3, 4)
>>>> (0, 1, 2, 3, 4, 5)
>>>> (0, 1, 2, 3, 4, 5, 6)
>>>> (0, 1, 2, 3, 4, 5, 6, 7)
>>>> (0, 1, 2, 3, 4, 5, 6, 7, 8)
>>>> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
>>>> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
>>>> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
>>>> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
>>>> ...
>>>> etc
>>>>
>>>> -Jamie
>>>>
>>>>
>>>> On Wed, Dec 14, 2016 at 9:17 AM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Hello people,
>>>>>
>>>>> I've written down some quick questions for which I couldn't find much
>>>>> or anything in the documentation. I hope you can answer some of them!
>>>>>
>>>>> *# Multiple consumers*
>>>>>
>>>>> *1.* Is it possible to .union() streams of different classes? It is
>>>>> useful to create a consumer that counts elements on different topics for
>>>>> example, using a key such as the class name of the element, and a tumbling
>>>>> window of 5 mins let's say.
>>>>>
>>>>> *2.* In case #1 is not possible, I need to launch multiple consumers
>>>>> to achieve the same effect. However, I'm getting a "Factory already
>>>>> initialized" error if I run environment

Re: Multiple consumers and custom triggers

2016-12-14 Thread Jamie Grier
Ahh, sorry, for #2: A single Flink job can have as many sources as you
like. They can be combined in multiple ways, via things like joins, or
connect(), etc. They can also be completely independent — in other words
the data flow graph can be completely disjoint. You never need to call
execute() more than once. Just define your program, with as many sources as
you want, and then call execute().

val stream1 = env.addSource(...)
val stream2 = env.addSource(...)

stream1
  .map(...)
  .addSink(...)

stream2
  .map(...)
  .addSink(...)

env.execute() // this is all you need

​

On Wed, Dec 14, 2016 at 4:02 PM, Matt <dromitl...@gmail.com> wrote:

> Hey Jamie,
>
> Ok with #1. I guess #2 is just not possible.
>
> I got it about #3. I just checked the code for the tumbling window
> assigner and I noticed it's just its default trigger that gets overwritten
> when using a custom trigger, not the way it assigns windows, it makes sense
> now.
>
> Regarding #4, after doing some more tests I think it's more complex than I
> first thought. I'll probably create another thread explaining more that
> specific question.
>
> Thanks,
> Matt
>
> On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
>
>> For #1 there are a couple of ways to do this.  The easiest is probably
>> stream1.connect(stream2).map(...) where the MapFunction maps the two
>> input types to a common type that you can then process uniformly.
>>
>> For #3 There must always be a WindowAssigner specified.  There are some
>> convenient ways to do this in the API such as timeWindow(), or
>> window(TumblingProcessingTimeWindows.of(...)), etc, however you always
>> must do this whether you provide your own trigger implementation or not.
>> The way to use window(...) with a custom trigger is just:
>>  stream.keyBy(...).window(...).trigger(...).apply(...) or something
>> similar.  Not sure if I answered your question though..
>>
>> For #4: If I understand you correctly that is exactly what
>> CountWindow(10, 1) does already.  For example if your input data was a
>> sequence of integers starting with 0 the output would be:
>>
>> (0)
>> (0, 1)
>> (0, 1, 2)
>> (0, 1, 2, 3)
>> (0, 1, 2, 3, 4)
>> (0, 1, 2, 3, 4, 5)
>> (0, 1, 2, 3, 4, 5, 6)
>> (0, 1, 2, 3, 4, 5, 6, 7)
>> (0, 1, 2, 3, 4, 5, 6, 7, 8)
>> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
>> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
>> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
>> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
>> ...
>> etc
>>
>> -Jamie
>>
>>
>> On Wed, Dec 14, 2016 at 9:17 AM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hello people,
>>>
>>> I've written down some quick questions for which I couldn't find much or
>>> anything in the documentation. I hope you can answer some of them!
>>>
>>> *# Multiple consumers*
>>>
>>> *1.* Is it possible to .union() streams of different classes? It is
>>> useful to create a consumer that counts elements on different topics for
>>> example, using a key such as the class name of the element, and a tumbling
>>> window of 5 mins let's say.
>>>
>>> *2.* In case #1 is not possible, I need to launch multiple consumers to
>>> achieve the same effect. However, I'm getting a "Factory already
>>> initialized" error if I run environment.execute() for two consumers on
>>> different threads. How do you .execute() more than one consumer on the same
>>> application?
>>>
>>> *# Custom triggers*
>>>
>>> *3.* If a custom .trigger() overwrites the trigger of the
>>> WindowAssigner used previously, why do we have to specify a WindowAssigner
>>> (such as TumblingProcessingTimeWindows) in order to be able to specify a
>>> custom trigger? Shouldn't it be possible to send a trigger to .window()?
>>>
>>> *4.* I need a stream with a CountWindow (size 10, slide 1 let's say)
>>> that may take more than 10 hours fill for the first time, but in the
>>> meanwhile I want to process whatever elements already generated. I guess
>>> the way to do this is to create a custom trigger that fires on every new
>>> element, with up to 10 elements at a time. The result would be windows of
>>> sizes: 1 element, then 2, 3, ..., 9, 10, 10, 10,  Is there a way to
>>> achieve this with predefined triggers or a custom trigger is the only way
>>> to go here?
>>>
>>> Best regards,
>>> Matt
>>>
>>
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> ja...@data-artisans.com
>>
>>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Multiple consumers and custom triggers

2016-12-14 Thread Jamie Grier
For #1 there are a couple of ways to do this.  The easiest is probably
stream1.connect(stream2).map(...) where the MapFunction maps the two input
types to a common type that you can then process uniformly.

For #3 There must always be a WindowAssigner specified.  There are some
convenient ways to do this in the API such as timeWindow(), or
window(TumblingProcessingTimeWindows.of(...)), etc, however you always must
do this whether you provide your own trigger implementation or not.  The
way to use window(...) with a custom trigger is just:
 stream.keyBy(...).window(...).trigger(...).apply(...) or something
similar.  Not sure if I answered your question though..

For #4: If I understand you correctly that is exactly what CountWindow(10,
1) does already.  For example if your input data was a sequence of integers
starting with 0 the output would be:

(0)
(0, 1)
(0, 1, 2)
(0, 1, 2, 3)
(0, 1, 2, 3, 4)
(0, 1, 2, 3, 4, 5)
(0, 1, 2, 3, 4, 5, 6)
(0, 1, 2, 3, 4, 5, 6, 7)
(0, 1, 2, 3, 4, 5, 6, 7, 8)
(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
(4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
...
etc
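In code this is roughly (assuming `keyed` is a keyed stream of Ints; the sum
is only for illustration):

keyed
  .countWindow(10, 1)
  .reduce(_ + _)   // fires after every element, over at most the last 10 elements per key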

-Jamie


On Wed, Dec 14, 2016 at 9:17 AM, Matt <dromitl...@gmail.com> wrote:

> Hello people,
>
> I've written down some quick questions for which I couldn't find much or
> anything in the documentation. I hope you can answer some of them!
>
> *# Multiple consumers*
>
> *1.* Is it possible to .union() streams of different classes? It is
> useful to create a consumer that counts elements on different topics for
> example, using a key such as the class name of the element, and a tumbling
> window of 5 mins let's say.
>
> *2.* In case #1 is not possible, I need to launch multiple consumers to
> achieve the same effect. However, I'm getting a "Factory already
> initialized" error if I run environment.execute() for two consumers on
> different threads. How do you .execute() more than one consumer on the same
> application?
>
> *# Custom triggers*
>
> *3.* If a custom .trigger() overwrites the trigger of the WindowAssigner
> used previously, why do we have to specify a WindowAssigner (such as
> TumblingProcessingTimeWindows) in order to be able to specify a custom
> trigger? Shouldn't it be possible to send a trigger to .window()?
>
> *4.* I need a stream with a CountWindow (size 10, slide 1 let's say) that
> may take more than 10 hours fill for the first time, but in the meanwhile I
> want to process whatever elements already generated. I guess the way to do
> this is to create a custom trigger that fires on every new element, with up
> to 10 elements at a time. The result would be windows of sizes: 1 element,
> then 2, 3, ..., 9, 10, 10, 10,  Is there a way to achieve this with
> predefined triggers or a custom trigger is the only way to go here?
>
> Best regards,
> Matt
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-02 Thread Jamie Grier
Hi Anchit,

That last bit is very interesting - the fact that it works fine with
subtasks <= 30.  It could be that either Influx or Grafana are not able to
keep up with the data being produced.  I would guess that the culprit is
Grafana if looking at any particular subtask index works fine and only the
full aggregation shows issues.  I'm not familiar enough with Grafana to
know which parts of the queries are "pushed down" to the database and which
are done in Grafana.  This might also vary by backend database.

Anecdotally, I've also seen scenarios using Grafana and Influx together
where the system seems to get overwhelmed fairly easily..  I suspect the
Graphite/Grafana combo would work a lot better in production setups.

This might be relevant:

https://github.com/grafana/grafana/issues/2634

-Jamie



On Tue, Nov 1, 2016 at 5:48 PM, Anchit Jatana <development.anc...@gmail.com>
wrote:

> I've set the metric reporting frequency to InfluxDB as 10s. In the
> screenshot, I'm using Grafana query interval of 1s. I've tried 10s and more
> too, the graph shape changes a bit but the incorrect negative values are
> still plotted(makes no difference).
>
> Something to add: If the subtasks are less than equal to 30, the same query
> yields correct results. For subtask index > 30 (for my case being 50) it
> plots junk negative and positive values.
>
> Regards,
> Anchit
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Flink-Metrics-
> InfluxDB-Grafana-Help-with-query-influxDB-query-for-
> Grafana-to-plot-numRecordsIn-numRen-tp9775p9819.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
Hmm.  I can't recreate that behavior here.  I have seen some issues like
this if you're grouping by a time interval different from the metrics
reporting interval you're using, though.  How often are you reporting
metrics to Influx?  Are you using the same interval in your Grafana
queries?  I see in your queries you are using a time interval of 10
seconds.  Have you tried 1 second?  Do you see the same behavior?

-Jamie


On Tue, Nov 1, 2016 at 4:30 PM, Anchit Jatana <development.anc...@gmail.com>
wrote:

> Hi Jamie,
>
> Thank you so much for your response.
>
> The below query:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
> 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)
>
> behaves the same as with the use of the templating variable in the 'All'
> case i.e. shows a plots of junk 'negative values'
>
> It shows accurate results/plot when an additional where clause for
> "subtask_index" is applied to the query.
>
> But without the "subtask_index" where clause (which means for all the
> subtask_indexes) it shows some junk/incorrect values on the graph (both
> highly positive & highly negative values in orders of millions)
>
> Images:
>
> <http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/file/n9816/Incorrect_%28for_all_subtasks%29.png>
>
> <http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/file/n9816/Correct_for_specific_subtask.png>
>
> Regards,
> Anchit
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Flink-Metrics-
> InfluxDB-Grafana-Help-with-query-influxDB-query-for-
> Grafana-to-plot-numRecordsIn-numRen-tp9775p9816.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
Ahh.. I haven’t used templating all that much but this also works for your
subtask variable so that you don’t have to enumerate all the possible
values:

Template Variable Type: query

query: SHOW TAG VALUES FROM numRecordsIn WITH KEY = "subtask_index"
​

On Tue, Nov 1, 2016 at 2:51 PM, Jamie Grier <ja...@data-artisans.com> wrote:

> Another note.  In the example the template variable type is "custom" and
> the values have to be enumerated manually.  So in your case you would have
> to configure all the possible values of "subtask" to be 0-49.
>
> On Tue, Nov 1, 2016 at 2:43 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
>
>> This works well for me. This will aggregate the data across all sub-task
>> instances:
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)
>>
>> You can also plot each sub-task instance separately on the same graph by
>> doing:
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s), "subtask_index"
>>
>> Or select just a single subtask instance by using:
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND "subtask_index" = '7' AND $timeFilter GROUP BY
>> time(1s)
>>
>> I haven’t used the templating features much but this also seems to work
>> fine and allows you to select an individual subtask_index or ‘all’ and it
>> works as it should — summing across all subtasks when you select ‘all’.
>>
>> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
>> = 'Sink: Unnamed' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter GROUP
>> BY time(1s)
>> ​
>>
>> On Fri, Oct 28, 2016 at 2:53 PM, Anchit Jatana <
>> development.anc...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I'm trying to plot the flink application metrics using grafana backed by
>>> influxdb. I need to plot/monitor the 'numRecordsIn' & 'numRecordsOut' for
>>> each operator/operation. I'm finding it hard to generate the influxdb query
>>> in grafana which can help me make this plot.
>>>
>>> I am able to plot the 'numRecordsIn' & 'numRecordsOut' for each
>>> subtask(parallelism set to 50) of the operator but not the operator as a
>>> whole.
>>>
>>> If somebody has knowledge or has successfully implemented this kind of a
>>> plot on grafana backed by influxdb, please share with me the process/query
>>> to achieve the same.
>>>
>>> Below is the query which I have to monitor the 'numRecordsIn' &
>>> 'numRecordsOut' for each subtask
>>>
>>> SELECT derivative(sum("count"), 10s) FROM "numRecordsOut" WHERE
>>> "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~
>>> /^$subtask$/ AND $timeFilter GROUP BY time(10s), "task_name"
>>>
>>> PS: $subtask is the templating variable that I'm using in order to have
>>> multiple subtask values. I have tried the 'All' option for this templating
> variable - This gives me an incorrect plot showing me negative values while
>>> the individual selection of subtask values when selected from the
>>> templating variable drop down yields correct result.
>>>
>>> Thank you!
>>>
>>> Regards,
>>> Anchit
>>>
>>>
>>>
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> ja...@data-artisans.com
>>
>>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
Another note.  In the example the template variable type is "custom" and
the values have to be enumerated manually.  So in your case you would have
to configure all the possible values of "subtask" to be 0-49.

On Tue, Nov 1, 2016 at 2:43 PM, Jamie Grier <ja...@data-artisans.com> wrote:

> This works well for me. This will aggregate the data across all sub-task
> instances:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)
>
> You can also plot each sub-task instance separately on the same graph by
> doing:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s), "subtask_index"
>
> Or select just a single subtask instance by using:
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND "subtask_index" = '7' AND $timeFilter GROUP BY
> time(1s)
>
> I haven’t used the templating features much but this also seems to work
> fine and allows you to select an individual subtask_index or ‘all’ and it
> works as it should — summing across all subtasks when you select ‘all’.
>
> SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name"
> = 'Sink: Unnamed' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter GROUP
> BY time(1s)
> ​
>
> On Fri, Oct 28, 2016 at 2:53 PM, Anchit Jatana <
> development.anc...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm trying to plot the flink application metrics using grafana backed by
>> influxdb. I need to plot/monitor the 'numRecordsIn' & 'numRecordsOut' for
>> each operator/operation. I'm finding it hard to generate the influxdb query
>> in grafana which can help me make this plot.
>>
>> I am able to plot the 'numRecordsIn' & 'numRecordsOut' for each
>> subtask(parallelism set to 50) of the operator but not the operator as a
>> whole.
>>
>> If somebody has knowledge or has successfully implemented this kind of a
>> plot on grafana backed by influxdb, please share with me the process/query
>> to achieve the same.
>>
>> Below is the query which I have to monitor the 'numRecordsIn' &
>> 'numRecordsOut' for each subtask
>>
>> SELECT derivative(sum("count"), 10s) FROM "numRecordsOut" WHERE
>> "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~
>> /^$subtask$/ AND $timeFilter GROUP BY time(10s), "task_name"
>>
>> PS: $subtask is the templating variable that I'm using in order to have
>> multiple subtask values. I have tried the 'All' option for this templating
>> variable- This give me an incorrect plot showing me negative values while
>> the individual selection of subtask values when selected from the
>> templating variable drop down yields correct result.
>>
>> Thank you!
>>
>> Regards,
>> Anchit
>>
>>
>>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Flink Metrics - InfluxDB + Grafana | Help with query influxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Jamie Grier
This works well for me. This will aggregate the data across all sub-task
instances:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)

You can also plot each sub-task instance separately on the same graph by
doing:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND $timeFilter GROUP BY time(1s), "subtask_index"

Or select just a single subtask instance by using:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND "subtask_index" = '7' AND $timeFilter GROUP BY time(1s)

I haven’t used the templating features much but this also seems to work
fine and allows you to select an individual subtask_index or ‘all’ and it
works as it should — summing across all subtasks when you select ‘all’.

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn" WHERE "task_name" =
'Sink: Unnamed' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter GROUP
BY time(1s)
​

On Fri, Oct 28, 2016 at 2:53 PM, Anchit Jatana <development.anc...@gmail.com
> wrote:

> Hi All,
>
> I'm trying to plot the flink application metrics using grafana backed by
> influxdb. I need to plot/monitor the 'numRecordsIn' & 'numRecordsOut' for
> each operator/operation. I'm finding it hard to generate the influxdb query
> in grafana which can help me make this plot.
>
> I am able to plot the 'numRecordsIn' & 'numRecordsOut' for each
> subtask(parallelism set to 50) of the operator but not the operator as a
> whole.
>
> If somebody has knowledge or has successfully implemented this kind of a
> plot on grafana backed by influxdb, please share with me the process/query
> to achieve the same.
>
> Below is the query which I have to monitor the 'numRecordsIn' &
> 'numRecordsOut' for each subtask
>
> SELECT derivative(sum("count"), 10s) FROM "numRecordsOut" WHERE
> "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~
> /^$subtask$/ AND $timeFilter GROUP BY time(10s), "task_name"
>
> PS: $subtask is the templating variable that I'm using in order to have
> multiple subtask values. I have tried the 'All' option for this templating
> variable - This gives me an incorrect plot showing me negative values while
> the individual selection of subtask values when selected from the
> templating variable drop down yields correct result.
>
> Thank you!
>
> Regards,
> Anchit
>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Memory Management in Streaming?

2016-09-03 Thread Jamie Grier
Hi Shaosu,

Do you have an estimate on the total size of state you are keeping for the
windows?  How many messages/sec, how large a window, message size, etc
would be good details to include.

Also, which state backend are you using?  Have you considered using the
RocksDB state backend?  This backend will spill Flink state to disk if it's
larger than available RAM.  You'll also probably want to use "fully async"
mode for the RocksDBStateBackend.
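For example, roughly (the checkpoint URI is a placeholder, and
enableFullyAsyncSnapshots() is the method name as of the 1.1.x API, so
double-check it against your Flink version):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")   // placeholder URI
backend.enableFullyAsyncSnapshots()   // "fully async" snapshot mode
env.setStateBackend(backend)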

-Jamie


On Fri, Sep 2, 2016 at 1:45 PM, Shaosu Liu <s...@uber.com> wrote:

> Hi,
>
> I have had issues when I processed large amount of data (large windows
> where I could not do incremental updates), flink slowed down significantly.
> It did help when I increased the amount of memory and used off heap
> allocation. But it only delayed the onset of the problem without solving
> it.
>
> Could someone give me some hints on how Flink manages the window buffer and
> how streaming manages its memory. I see this page on batch API memory
> management and wonder what is the equivalent for streaming?
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
>
> --
> Cheers,
> Shaosu
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: set flink yarn jvm options

2016-08-04 Thread Jamie Grier
The YARN client should pass these JVM options along to each of the launched
containers.  Have you tried this?

On Thu, Aug 4, 2016 at 4:07 PM, Prabhu V <vpra...@gmail.com> wrote:

>
> This property I believe affects only the yarn client. I want to set jvm
> opts on application-manager and task-manager containers.
>
> Thanks,
> Prabhu
>
> On Thu, Aug 4, 2016 at 3:07 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
>
>> Use *env.java.opts*
>>
>> This will be respected by the YARN client.
>>
>>
>>
>> On Thu, Aug 4, 2016 at 11:10 AM, Prabhu V <vpra...@gmail.com> wrote:
>>
>>> The docs mention that
>>>
>>> env.java.opts.jobmanager
>>> env.java.opts.taskmanager
>>>
>>> parameters are available but are ignored by the yarn client, is there a
>>> way to set the jvm opts for yarn ?
>>>
>>> Thanks,
>>> Prabhu
>>>
>>> On Wed, Aug 3, 2016 at 7:03 PM, Prabhu V <vpra...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to set jvm options on the yarn application-manager and
>>>> task-manager with flink ?
>>>>
>>>> Thanks,
>>>> Prabhu
>>>>
>>>
>>>
>>
>>
>> --
>>
>> Jamie Grier
>> data Artisans, Director of Applications Engineering
>> @jamiegrier <https://twitter.com/jamiegrier>
>> ja...@data-artisans.com
>>
>>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: set flink yarn jvm options

2016-08-04 Thread Jamie Grier
Use *env.java.opts*

This will be respected by the YARN client.
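For example, in flink-conf.yaml (the JVM flags here are only placeholders):

env.java.opts: "-XX:+UseG1GC -Dmy.custom.property=value"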



On Thu, Aug 4, 2016 at 11:10 AM, Prabhu V <vpra...@gmail.com> wrote:

> The docs mention that
>
> env.java.opts.jobmanager
> env.java.opts.taskmanager
>
> parameters are available but are ignored by the yarn client, is there a
> way to set the jvm opts for yarn ?
>
> Thanks,
> Prabhu
>
> On Wed, Aug 3, 2016 at 7:03 PM, Prabhu V <vpra...@gmail.com> wrote:
>
>> Hi,
>>
>> Is there a way to set jvm options on the yarn application-manager and
>> task-manager with flink ?
>>
>> Thanks,
>> Prabhu
>>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Adding and removing operations after execute

2016-07-07 Thread Jamie Grier
Hi Adam,

Another way to do this, depending on your exact requirements, could be to
consume a second stream that essentially "configures" the operators that
make up the Flink job thus dynamically altering the behavior of the job at
runtime.  Whether or not this approach is feasible really depends on
exactly what you're trying to accomplish, though.  For some users this type
of approach works very well.
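A rough sketch of that pattern (Event, Rule, and their fields are made up, and
state/checkpointing concerns are ignored for brevity; only the
connect()/CoFlatMapFunction structure is the point):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String)    // hypothetical data record
case class Rule(keyPrefix: String) {              // hypothetical "configuration" message
  def matches(e: Event): Boolean = e.key.startsWith(keyPrefix)
}

class ConfigurableFilter extends CoFlatMapFunction[Event, Rule, Event] {
  private var rules: List[Rule] = Nil

  override def flatMap1(event: Event, out: Collector[Event]): Unit =
    if (rules.exists(_.matches(event))) out.collect(event)

  override def flatMap2(rule: Rule, out: Collector[Event]): Unit =
    rules = rule :: rules                         // the job's behavior changes at runtime
}

// usage: events.connect(configStream).flatMap(new ConfigurableFilter())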

However, if you really need to add new operators to the running job that's
currently not possible with Flink.  The best approach there is exactly as
Kostas said.

-Jamie


On Thu, Jul 7, 2016 at 5:24 AM, Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi,
>
> The best way to do so is to use a Flink feature called savepoints. You can
> find more here:
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>
> In a nutshell, savepoints just take a consistent snapshot of the state of
> your job at the time you
> take them, and you can resume execution from that point.
>
> Using this, you can write your initial job, whenever you want to add the
> new operator you take a save point,
> and after adding your new operator, you can start the execution of the new
> job from the point where the old job stopped.
> In addition, the old job can still keep running, in case you need it, so
> there will be no downtime for that.
>
> If this does not cover your use case, it would be helpful to share some
> more information about
> what exactly you want to do, so that we can figure out a solution that
> fits your needs.
>
> Kostas
>
> On Jul 7, 2016, at 1:25 PM, adamlehenbauer <adam.lehenba...@gmail.com>
> wrote:
>
> Hi, I'm exploring using Flink to replace an in-house micro-batch
> application.
> Many of the features and concepts are perfect for what I need, but the
> biggest gap is that there doesn't seem to be a way to add new operations at
> runtime after execute().
>
> What is the preferred approach for adding new operations, windows, etc to a
> running application? Should I start multiple execution contexts?
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Adding-and-removing-operations-after-execute-tp7863.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com <http://nabble.com>.
>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Timewindow watermarks specific to keys of stream

2016-07-05 Thread Jamie Grier
Hi Madhu,

This is not possible right now but are you sure this is necessary in your
application?  Would the timestamps for stock data really be radically
different for different symbols that occur close together in the input
stream?  The windows themselves are for each key but event time advances at
the operator level, not the key level.  For most applications this works
very well.  If you explain more about why this won't work for you maybe I
can help more.

-Jamie


On Tue, Jul 5, 2016 at 1:14 AM, madhu phatak <phatak@gmail.com> wrote:

> Hi,
> I am trying to analyse the stock market data. In this, for every symbol i
> want to find max of stock price in last 10 mins. I want to generate
> watermarks specific to key rather than across the stream. Is this possible
> in flink?
>
> --
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: disable console output

2016-07-05 Thread Jamie Grier
Hi Andres,

I believe what you're looking for is to disable the logging.  Have a look
at the log4j.properties file that ships in Flink's conf/ directory.
You can configure this to use a NullAppender (or whatever you like).  You
can also selectively just disable logging for particular parts of the class
hierarchy if you can identify the code responsible for the heavy logging.
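For example, a minimal log4j.properties along those lines (NullAppender is the
stock log4j 1.x appender; the package in the commented line is just an
example):

log4j.rootLogger=INFO, nullAppender
log4j.appender.nullAppender=org.apache.log4j.varia.NullAppender
# or, more selectively, silence just the chatty parts of the hierarchy:
# log4j.logger.org.apache.flink.runtime=OFF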

-Jamie


On Tue, Jul 5, 2016 at 5:13 AM, Andres R. Masegosa <and...@cs.aau.dk> wrote:

> Hi,
>
> I'm having problems when working with flink (local mode) and travis-ci.
> The console output gives raises to big logs files (>4MB).
>
> How can I disable from my Java code (through the Configuration object)
> the progress messages displayed in console?
>
> Thanks,
> Andres
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Failed job restart - flink on yarn

2016-07-05 Thread Jamie Grier
The Kafka client can be configured to commit offsets to Zookeeper
periodically even when those offsets are not used in the normal
fault-tolerance case.  Normally, the Kafka offsets are part of Flink's
normal state.  However, in the absence of this state the FlinkKafkaConsumer
will actually retrieve the last committed offsets so you may not need to do
anything special in your case unless I've misunderstood you.
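For example, roughly (broker/ZooKeeper addresses, topic, and group id are
placeholders; the auto-commit keys are the 0.8-style consumer settings used
when checkpointing is not driving offsets):

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val props = new Properties()
props.setProperty("bootstrap.servers", "broker:9092")
props.setProperty("zookeeper.connect", "zk:2181")
props.setProperty("group.id", "my-job")
props.setProperty("auto.commit.enable", "true")          // periodic commits to ZooKeeper
props.setProperty("auto.commit.interval.ms", "10000")

val consumer = new FlinkKafkaConsumer08[String]("my-topic", new SimpleStringSchema(), props)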


On Tue, Jul 5, 2016 at 4:18 PM, vpra...@gmail.com <vpra...@gmail.com> wrote:

> Thanks for the reply, It would be great to have the feature to restart a
> failed job from the last checkpoint.
>
>
> Is there a way to pass the initial set of partition-offsets to the
> kafka-client ? In that case I can maintain a list of last processed offsets
> from within my window operation (possibly store the offsets in some
> database) and use that to bootstrap the kafka client upon restart.
>
> I realize that I can probably reset the offsets for the consumer group
> from some external program to the last fully processed offsets and restart
> the job, just want to confirm if there is already a feature in the
> kafka-client.
>
> Thanks,
> Prabhu
>
> On Mon, Jul 4, 2016 at 2:17 AM, Ufuk Celebi [via Apache Flink User Mailing
> List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node=7832=0>> wrote:
>
>> If you just re-submit the job without a savepoint, the Kafka consumer
>> will by default start processing from the latest offset and the
>> operators will be in an empty state. It should be possible to add a
>> feature to Flink, which allows turning the latest checkpoint to a
>> savepoint, from which you then could resume the job after increasing
>> the container memory. But I'm afraid that this won't make it to the
>> next release though. I will open an issue for it though.
>>
>> A work around (more a hack) would be to run in HA mode
>> (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html)
>>
>> and just shut down the YARN containers without cancelling the job. The
>> latest checkpoint meta data should be stored in ZooKeeper and resumed
>> when you restart the cluster. It's really more a hack/abuse of HA
>> though.
>>
>> – Ufuk
>>
>>
>> On Sat, Jul 2, 2016 at 7:09 AM, [hidden email]
>> <http:///user/SendEmail.jtp?type=node=7784=0> <[hidden email]
>> <http:///user/SendEmail.jtp?type=node=7784=1>> wrote:
>>
>> > Hi Jamie,
>> >
>> > Thanks for the reply.
>> >
>> > Yeah i looked at save points, i want to start my job only from the last
>> > checkpoint, this means I have to keep track of when the checkpoint was
>> taken
>> > and the trigger a save point. I am not sure this is the way to go. My
>> state
>> > backend is HDFS and I can see that the checkpoint path has the data
>> that has
>> > been buffered in the window.
>> >
>> > I want to start the job in a way such that it will read the
>> checkpointed
>> > data before the failure and continue processing.
>> >
>> > I realise that the checkpoints are used whenever there is a container
>> > failure, and a new container is obtained. In my case the job failed
>> because
>> > a container failed for the maximum AllowedN umber of failures
>> >
>> > Thanks,
>> > Prabhu
>> >
>> > On Fri, Jul 1, 2016 at 3:54 PM, Jamie Grier [via Apache Flink User
>> Mailing
>> > List archive.] <[hidden email]> wrote:
>>
>> >>
>> >> Hi Prabhu,
>> >>
>> >> Have you taken a look at Flink's savepoints feature?  This allows you
>> to
>> >> make snapshots of your job's state on demand and then at any time
>> restart
>> >> your job from that point:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>> >>
>> >> Also know that you can use Flink disk-backed state backend as well if
>> >> you're job state is larger than fits in memory.  See
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state_backends.html#the-rocksdbstatebackend
>> >>
>> >>
>> >> -Jamie
>> >>
>> >>
>> >> On Fri, Jul 1, 2016 at 1:34 PM, [hidden email] <[hidden email]> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have a Flink streaming job that reads from Kafka and performs an
>> >>> aggregation
>>

Re: Late arriving events

2016-07-05 Thread Jamie Grier
I put a few comments in-line below...

On Tue, Jul 5, 2016 at 4:06 PM, Chen Qin <qinnc...@gmail.com> wrote:

> Hi there,
>
> I understand Flink currently doesn't support handling late arriving
> events. In reality, an exactly-once Flink job needs to deal with missing
> data or backfill from time to time without rewinding to a previous
> savepoint, which implies the restored job suffers a blackout while it
> tries to catch up.
>

Another way to do this is to kick off a parallel job to do the backfill
from the previous savepoint without stopping the current "realtime" job.
This way you would not have to have a "blackout".  This assumes your final
sink can tolerate having some parallel writes to it OR you have two
different sinks and throw a switch from one to another for downstream jobs,
etc.
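
For reference, the mechanics of that look roughly like the following with the
1.0.x CLI (the job ID, savepoint path, and jar name are placeholders):

# take a savepoint of the running "realtime" job
bin/flink savepoint <job-id>

# start a second, parallel backfill job from that savepoint, writing to a
# separate sink (or to one that tolerates parallel writes)
bin/flink run -s <savepoint-path> backfill-job.jar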


>
> In general, I don't know if there are good solutions for all of these
> scenarios. Some might keep messages in windows longer (messages not purged
> yet). Some might kick off another pipeline just dealing with the affected
> windows (messages already purged). What would be the suggested patterns?
>

Of course, ideally you would just keep data in windows longer such that you
don't purge window state until you're sure there is no more data coming.
The problem with this approach in the real world is that you may be wrong
with whatever time you choose ;)  I would suggest doing the best job
possible upfront by using an appropriate watermark strategy to deal with
most of the data.  Then process the truly late data with a separate path in
the application code.  This "separate" path may have to deal with merging
late data with the data that's already been written to the sink but this is
definitely possible depending on the sink.
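
As a rough sketch of the "best job possible upfront" part, a periodic
watermark assigner that tolerates a bounded amount of out-of-orderness could
look like this (MyEvent, its getEventTime() accessor, and the 10-minute bound
are assumptions to adapt; interface names per the 1.0/1.1 DataStream API):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    // how much out-of-orderness to tolerate before windows may fire
    private static final long MAX_OUT_OF_ORDERNESS_MS = 10 * 60 * 1000;

    private long maxSeenTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getEventTime();
        maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // hold the watermark back by the out-of-orderness bound
        return new Watermark(maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}

You would attach it with stream.assignTimestampsAndWatermarks(new
BoundedLatenessAssigner()); anything that still arrives after that bound is
what the separate late-data path would have to handle.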


> For the keep-messages-longer approach, we need to coordinate backfill
> messages and currently ongoing messages so that windows do not fire without
> all of these messages. One challenge of this approach would be determining
> when all backfill messages have been processed. Ideally there would be a
> customized barrier that travels through the entire topology and tells
> windows that backfills are done. This would work for both non-keyed and
> keyed streams. I don't think Flink supports this yet. Another way would be
> to use session window merging and extend the window purging time with some
> reasonable estimation. This approach is based on estimation and may add
> execution latency to those windows.
>
> Which would be suggested way in general?
>
> Thanks,
> Chen
>
>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Error submitting stand-alone Flink job to EMR YARN cluster

2016-07-03 Thread Jamie Grier
Hi Bruce,

I just spun up an EMR cluster and tried this out.  Hadoop 2.7.2 and Flink
1.0.3.  I ran the exact same command as you and everything works just fine.

Please verify one thing, though.  In your command you do not specify the
path to the Flink executable, which means it's just getting picked up from
your PATH.  Is it possible you're running a flink-1.0.3 example program but
using a different version of Flink?  Try the following:

./flink-1.0.3/bin/flink run -m yarn-cluster -yn 1 -ynm test1:WordCount
./flink-1.0.3/examples/streaming/WordCount.jar


-Jamie



On Thu, Jun 30, 2016 at 11:28 AM, Hanson, Bruce <bruce.han...@here.com>
wrote:

> I’m trying to submit a stand-alone Flink job to a YARN cluster running on
> EMR (Elastic MapReduce) nodes in AWS. When it tries to start a container
> for the Job Manager, it fails. The error message from the container is
> below. The command I’m using is:
>
> $ flink run -m yarn-cluster -yn 1 -ynm test1:WordCount
> ./flink-1.0.3/examples/streaming/WordCount.jar
>
> I have tried adding log4j and slf4j libraries to the classpath using -C
> and that doesn’t help.
>
> This does not happen on other YARN clusters I have that are not EMR nodes.
> And it doesn’t happen on my EMR cluster if I use "yarn-session.sh" to
> create a Flink cluster in the YARN cluster and then use “flink run …” to
> submit the job to the Flink cluster.
>
> Does anyone out there know how I could fix this?
>
> Thanks in advance for any help you can give.
>
> Error message in the jobmanager.err file:
>
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>
> SLF4J: Defaulting to no-operation (NOP) logger implementation
>
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/log4j/Level
>
> at org.apache.hadoop.mapred.JobConf.(JobConf.java:357)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:278)
>
> at
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>
> at
> org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:95)
>
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
>
> at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>
> at org.apache.hadoop.security.Groups.(Groups.java:79)
>
> at org.apache.hadoop.security.Groups.(Groups.java:74)
>
> at
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:303)
>
> at
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:283)
>
> at
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:260)
>
> at
> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:790)
>
> at
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760)
>
> at
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633)
>
> at
> org.apache.flink.yarn.ApplicationMasterBase.run(ApplicationMasterBase.scala:64)
>
> at
> org.apache.flink.yarn.ApplicationMaster$.main(ApplicationMaster.scala:36)
>
> at
> org.apache.flink.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>
> Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
> ... 18 more
>
>
> Bruce Hanson
>
> Software Engineer
>
> HERE Predictive Analytics
>
>
>
> HERE Seattle
>
> 701 Pike St, Suite 2000, Seattle, WA 98101
>
> 47° 36' 41" N. 122° 19' 57" W
>
>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Using standalone single node without HA in production, crazy?

2016-07-01 Thread Jamie Grier
I started to answer these questions and then realized I was making an
assumption about your environment.  Do you have a reliable persistent file
system such as HDFS or S3 at your disposal or do you truly mean to run on a
single node?

If you are truly thinking of running on a single node only, there's no way
to guarantee reliability.  You would be open to machine and disk failures,
etc.

I think the minimal reasonable production setup must use at least 3
physical nodes with the following services running:

1) HDFS or some other reliable filesystem (for persistent state storage)
2) Zookeeper for the Flink HA JobManager setup

The rest is configuration.
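
For reference, a minimal flink-conf.yaml sketch of that setup (key names as
of the 1.0.x HA docs, so double-check them for your version; all hosts and
paths are placeholders):

# ZooKeeper-based JobManager HA
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
recovery.zookeeper.storageDir: hdfs:///flink/recovery

# persistent storage for checkpointed state
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints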

With regard to scaling up after your initial deployment:  right now in the
latest Flink release (1.0.3) you cannot stop and restart a job with a
different parallelism without losing your computed state.  What this means
is that if you know you will likely scale up and you don't want to lose
that state you can provision many, many slots on the TaskManagers you do
run, essentially over-provisioning them, and run your job now with the max
parallelism you expect to need to scale to.  This will all be much simpler
to do in future Flink versions (though not in 1.1) but for now this would
be a decent approach.
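
Concretely, that over-provisioning approach is just configuration plus the
parallelism you submit with (the numbers here are placeholders to tune):

# flink-conf.yaml on each TaskManager: provision more slots than needed today
taskmanager.numberOfTaskSlots: 16

# submit the job already at the parallelism you expect to scale to
bin/flink run -p 32 my-streaming-job.jar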

In Flink versions after 1.1 Flink will be able to scale parallelism up and
down while preserving all of the previously computed state.

-Jamie


On Fri, Jul 1, 2016 at 6:41 AM, Ryan Crumley <crum...@gmail.com> wrote:

> Hi,
>
> I am evaluating Flink for use in a stateful streaming application. Some
> information about the intended use:
>
>  - Will run in a Mesos cluster and be deployed via Marathon in a Docker
> container
>  - Initial throughput ~ 100 messages per second (from Kafka)
>  - Will need to scale to 10x that soon after launch
>  - State will be much larger than memory available
>
> In order to quickly get this out the door I am considering postponing the
> YARN / HA setup of a cluster with the idea that the current application can
> easily fit within a single JVM and handle the throughput. Hopefully by the
> time I need more scale, Flink support for Mesos will be available and I can
> use that to distribute the job to the cluster with minimal code rewrite.
>
> Questions:
> 1. Is this a viable approach? Any pitfalls to be aware of?
>
> 2. What is the correct term for this deployment mode? Single node
> standalone? Local?
>
> 3. Will the RocksDB state backend work in a single JVM mode?
>
> 4. When the single JVM process becomes unhealthy and is restarted by
> Marathon, will Flink recover appropriately, or is failure recovery a
> function of HA?
>
> 5. How would I migrate the RocksDB state once I move to HA mode? Is there
> a straightforward path?
>
> Thanks for your time,
>
> Ryan
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Failed job restart - flink on yarn

2016-07-01 Thread Jamie Grier
Hi Prabhu,

Have you taken a look at Flink's savepoints feature?  This allows you to
make snapshots of your job's state on demand and then at any time restart
your job from that point:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html

Also know that you can use Flink's disk-backed state backend as well if
your job state is larger than what fits in memory.  See
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state_backends.html#the-rocksdbstatebackend
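
For reference, a minimal sketch of wiring that up in the job (the checkpoint
interval and the HDFS path are placeholders; RocksDBStateBackend comes from
the flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WindowedAggregationJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint every 60s so there is always a recent restore point
        env.enableCheckpointing(60000);

        // keep operator/window state in RocksDB on local disk,
        // with checkpoints written to HDFS
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // stand-in for the real Kafka -> window -> sink pipeline
        env.fromElements(1, 2, 3).print();

        env.execute("windowed-aggregation");
    }
}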


-Jamie


On Fri, Jul 1, 2016 at 1:34 PM, vpra...@gmail.com <vpra...@gmail.com> wrote:

> Hi,
>
> I have a Flink streaming job that reads from Kafka and performs an
> aggregation in a window. It ran fine for a while; however, when the number
> of events in a window crossed a certain limit, the YARN containers failed
> with Out Of Memory errors. The job was running with 10G containers.
>
> We have about 64G memory on the machine and now I want to restart the job
> with a 20G container (we ran some tests and 20G should be good enough to
> accommodate all the elements from the window).
>
> Is there a way to restart the job from the last checkpoint ?
>
> When I resubmit the job, it starts from the last committed offsets;
> however, the events that were held in the window at the time of
> checkpointing seem to get lost. Is there a way to recover the events that
> were buffered within the window and checkpointed before the failure?
>
> Thanks,
> Prabhu
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Failed-job-restart-flink-on-yarn-tp7764.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Error submitting stand-alone Flink job to EMR YARN cluster

2016-07-01 Thread Jamie Grier
I know this is really basic, but have you verified that your Flink lib
folder contains log4j-1.2.17.jar?  I imagine that's fine given the
yarn-session.sh approach is working fine.  What version of EMR are you
running?  What version of Flink?

-Jamie


On Thu, Jun 30, 2016 at 11:28 AM, Hanson, Bruce <bruce.han...@here.com>
wrote:

> I’m trying to submit a stand-alone Flink job to a YARN cluster running on
> EMR (Elastic MapReduce) nodes in AWS. When it tries to start a container
> for the Job Manager, it fails. The error message from the container is
> below. The command I’m using is:
>
> $ flink run -m yarn-cluster -yn 1 -ynm test1:WordCount
> ./flink-1.0.3/examples/streaming/WordCount.jar
>
> I have tried adding log4j and slf4j libraries to the classpath using -C
> and that doesn’t help.
>
> This does not happen on other YARN clusters I have that are not EMR nodes.
> And it doesn’t happen on my EMR cluster if I use "yarn-session.sh" to
> create a Flink cluster in the YARN cluster and then use “flink run …” to
> submit the job to the Flink cluster.
>
> Does anyone out there know how I could fix this?
>
> Thanks in advance for any help you can give.
>
> Error message in the jobmanager.err file:
>
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>
> SLF4J: Defaulting to no-operation (NOP) logger implementation
>
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/log4j/Level
>
> at org.apache.hadoop.mapred.JobConf.(JobConf.java:357)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:278)
>
> at
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
>
> at
> org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:95)
>
> at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
>
> at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>
> at org.apache.hadoop.security.Groups.(Groups.java:79)
>
> at org.apache.hadoop.security.Groups.(Groups.java:74)
>
> at
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:303)
>
> at
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:283)
>
> at
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:260)
>
> at
> org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:790)
>
> at
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760)
>
> at
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633)
>
> at
> org.apache.flink.yarn.ApplicationMasterBase.run(ApplicationMasterBase.scala:64)
>
> at
> org.apache.flink.yarn.ApplicationMaster$.main(ApplicationMaster.scala:36)
>
> at
> org.apache.flink.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>
> Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Level
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
> ... 18 more
>
>
> Bruce Hanson
>
> Software Engineer
>
> HERE Predictive Analytics
>
>
>
> HERE Seattle
>
> 701 Pike St, Suite 2000, Seattle, WA 98101
>
> 47° 36' 41" N. 122° 19' 57" W
>
>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Cassandra Connector in Scala

2016-06-20 Thread Jamie Grier
This looks like a simple type mismatch.  It's impossible to help with this
without seeing your code, though.  Can you post it here?  Thanks.
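
For what it's worth, the bound in that error ([IN,T <:
org.apache.flink.api.java.tuple.Tuple]) suggests the sink expects Flink's
Java Tuple types rather than Scala tuples. A minimal Java sketch of the
builder usage described in the linked docs (keyspace, table, and contact
point are placeholders):

import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class CassandraSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // a Java Tuple3 stream, matching the (String, String, Int) shape from the question
        DataStream<Tuple3<String, String, Integer>> results =
                env.fromElements(new Tuple3<>("key", "value", 1));

        CassandraSink.addSink(results)
                .setQuery("INSERT INTO demo.results (a, b, c) VALUES (?, ?, ?);")
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder.addContactPoint("127.0.0.1").build();
                    }
                })
                .build();

        env.execute("cassandra-sink-example");
    }
}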

-Jamie


On Sun, Jun 19, 2016 at 3:17 PM, Eamon Kavanagh <kavanagh.c.ea...@gmail.com>
wrote:

> Hey Mailing List,
>
> I'm trying to use the Cassandra connector that came out recently (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)
> in Scala but I'm having trouble with types when I use
> CassandraSink.addSink(in: DataStream).
>
> If I don't define the type it can't seem to properly infer it, and if I do
> define the type I still get an error saying there's a type mismatch.  The
> compile error is
>
> error: type arguments [(String, String, Int),Any] do not conform to
> method addSink's type parameter bounds [IN,T <:
> org.apache.flink.api.java.tuple.Tuple]
>
> Is this a Scala issue?  Should I switch over to Java?
>
>
> Thanks!
> Eamon
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Reading whole files (from S3)

2016-06-07 Thread Jamie Grier
Hi Andrea,

How large are these data files?  The implementation you've mentioned here
is only usable if they are very small.  If so, you're fine.  If not read
on...

Processing XML input files in parallel is tricky.  It's not a great format
for this type of processing as you've seen.  They are tricky to split and
more complex to iterate through than simpler formats. However, others have
implemented XMLInputFormat classes for Hadoop.  Have you looked at these?
Mahout has an XMLInputFormat implementation for example but I haven't used
it directly.

Anyway, you can reuse Hadoop InputFormat implementations in Flink
directly.  This is likely a good route.  See Flink's HadoopInputFormat
class.
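
For reference, a small sketch of that route in the batch API (paths are
placeholders, and TextInputFormat stands in for whichever XmlInputFormat
implementation you pick, e.g. Mahout's):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HadoopInputFormatExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // wrap a Hadoop mapreduce InputFormat so Flink can read through it directly
        Job job = Job.getInstance();
        HadoopInputFormat<LongWritable, Text> hadoopIF =
                new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
        TextInputFormat.addInputPath(job, new Path("s3://my-bucket/gpx/"));

        DataSet<Tuple2<LongWritable, Text>> records = env.createInput(hadoopIF);
        records.first(10).print();
    }
}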

-Jamie


On Tue, Jun 7, 2016 at 7:35 AM, Andrea Cisternino <a.cistern...@gmail.com>
wrote:

> Hi all,
>
> I am evaluating Apache Flink for processing large sets of Geospatial data.
> The use case I am working on will involve reading a certain number of GPX
> files stored on Amazon S3.
>
> GPX files are actually XML files and therefore cannot be read on a line by
> line basis.
> One GPX file will produce one or more Java objects that will contain the
> geospatial data we need to process (mostly a list of geographical points).
>
> To cover this use case I tried to extend the FileInputFormat class:
>
> public class WholeFileInputFormat extends FileInputFormat<String>
> {
>   private boolean hasReachedEnd = false;
>
>   public WholeFileInputFormat() {
> unsplittable = true;
>   }
>
>   @Override
>   public void open(FileInputSplit fileSplit) throws IOException {
> super.open(fileSplit);
> hasReachedEnd = false;
>   }
>
>   @Override
>   public String nextRecord(String reuse) throws IOException {
> // uses apache.commons.io.IOUtils
> String fileContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
> hasReachedEnd = true;
> return fileContent;
>   }
>
>   @Override
>   public boolean reachedEnd() throws IOException {
> return hasReachedEnd;
>   }
> }
>
> This class returns the content of the whole file as a string.
>
> Is this the right approach?
> It seems to work when run locally with local files but I wonder if it would
> run into problems when tested in a cluster.
>
> Thanks in advance.
>   Andrea.
>
> --
> Andrea Cisternino, Erlangen, Germany
> GitHub: http://github.com/acisternino
> GitLab: https://gitlab.com/u/acisternino
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Multi-field "sum" function just like "keyBy"

2016-06-07 Thread Jamie Grier
I'm assuming what you're trying to do is essentially sum over two different
fields of your data.  I would do this with my own ReduceFunction.


stream
  .keyBy("someKey")
  .reduce(new CustomReduceFunction()) // sum whatever fields you want and return the result
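
A minimal sketch of such a CustomReduceFunction, assuming the records are
Tuple3<String, Long, Double> values keyed by their first field (e.g. with
keyBy(0)):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;

public class CustomReduceFunction implements ReduceFunction<Tuple3<String, Long, Double>> {

    @Override
    public Tuple3<String, Long, Double> reduce(Tuple3<String, Long, Double> a,
                                               Tuple3<String, Long, Double> b) {
        // keep the key, sum the two numeric fields
        return new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
    }
}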

I think it does make sense that Flink could provide a generic sum function
that could sum over multiple fields, though.

-Jamie


On Tue, Jun 7, 2016 at 5:41 AM, Al-Isawi Rami <rami.al-is...@comptel.com>
wrote:

> Hi,
>
> Is there any reason why "keyBy" accepts multiple fields while, for
> example, "sum" does not?
>
> -Rami
> Disclaimer: This message and any attachments thereto are intended solely
> for the addressed recipient(s) and may contain confidential information. If
> you are not the intended recipient, please notify the sender by reply
> e-mail and delete the e-mail (including any attachments thereto) without
> producing, distributing or retaining any copies thereof. Any review,
> dissemination or other use of, or taking of any action in reliance upon,
> this information by persons or entities other than the intended
> recipient(s) is prohibited. Thank you.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com


Re: Large Numbers of Dynamically Created Jobs

2016-03-22 Thread Jamie Grier
ess powerful than we would be
>> > > using in production) things fall apart once I get to around 75 jobs.
>> > > Can Flink handle a situation like this where we are looking at
>> > > thousands of jobs?
>> > >  2. Is this approach even the right way to go? Is there a different
>> > > approach that would make more sense? Everything will be listening to
>> > > the same Kafka topic, so the other thought we had was to have 1 job
>> > > that processed everything and was configured by a separate control
>> > > Kafka topic. The concern we had there was we would almost completely
>> > > lose insight into what was going on if there was a slowdown.
>> > >  3. The current approach we are using for creating dynamic jobs is
>> > > building a common jar and then starting it with the configuration
>> > > data for the individual job. Does this sound reasonable?
>> > >
>> > >
>> > > If any of these questions are answered elsewhere, I apologize. I
>> > > couldn't find any of this being discussed elsewhere.
>> > >
>> > > Thanks for your help.
>> > >
>> > > David
>> >
>> > --
>> > Konstantin Knauf * konstantin.kn...@tngtech.com
>> > <mailto:konstantin.kn...@tngtech.com> * +49-174-3413182
>> > <tel:%2B49-174-3413182>
>> > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> > Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> >
>> >
>>
>> --
>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com