Re: separation of JVMs for different applications

2016-12-10 Thread Manu Zhang
Created https://issues.apache.org/jira/browse/FLINK-5312.

Thanks,
Manu

On Fri, Dec 9, 2016 at 7:17 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Manu,
>
> AFAIK there is no JIRA for standalone v2.0 yet, so feel free to open a
> JIRA for it.
>
> Just a small correction: FLIP-6 is not almost finished yet. But we're
> working on it and are happy about every helping hand :-)
>
> Cheers,
> Till

Re: separation of JVMs for different applications

2016-12-08 Thread Manu Zhang
If there isn't an existing JIRA for standalone v2.0, may I open a new
one?

Thanks,
Manu


Re: separation of JVMs for different applications

2016-12-06 Thread Manu Zhang
Good to know.

Is it the "standalone setup v2.0" section? The wiki page has no
Google-Docs-like change history.
Are there any JIRAs opened for that? I'm not sure it will be noticed, given
that FLIP-6 is almost finished.

Thanks,
Manu

On Tue, Dec 6, 2016 at 11:55 PM Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> We are currently changing the resource and process model quite a bit:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> As part of that, I think it makes sense to introduce something like that.
>
> What you can do today is configure TaskManagers to use only one slot, and
> then start multiple TaskManagers per machine. That makes sure that JVMs are
> never shared across jobs.
> If you use the "start-cluster.sh" script from Flink, you can enter the
> same hostname multiple times in the workers file, and it will start
> multiple TaskManagers on a machine.
>
> Best,
> Stephan
>
>
>
> On Tue, Dec 6, 2016 at 3:51 AM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> Thanks Stephan,
>
> They don't use YARN now, but I think they will consider it. Do you think
> it would be beneficial to provide an option such as "separate-jvm" in
> standalone mode for stream processors and long-running services? Or do
> you think it would introduce too much complexity?
>
> Manu
>
> On Tue, Dec 6, 2016 at 1:04 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> Are your customers using YARN? In that case, the default configuration
> will start a new YARN application per Flink job, so no JVMs are shared
> between jobs. By default, each slot even has its own JVM.
>
> Greetings,
> Stephan
>
> PS: I think "spawning new JVMs" is what Till referred to when saying
> "spinning up a new cluster". Keep in mind that Flink is also a batch
> processor: it handles sequences of short batch jobs (as issued, for
> example, by interactive shells), and it pre-allocates and manages a lot of
> memory for batch jobs.
>
>
>
> On Mon, Dec 5, 2016 at 3:48 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job.
>
>
> I don't think we have to spin up a new cluster for each job if every job
> gets its own JVMs. For example, Storm will launch a new worker (JVM) for a
> new job when free slots are available. How can we share data between jobs,
> and why would we?
>
>
>
> On Mon, Dec 5, 2016 at 6:27 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job. This
> might be helpful for scenarios where you want to run many short-lived and
> light-weight jobs.
>
> But the important part is that you don't have to use this method. You can
> also start a new Flink cluster per job, which will then execute the job
> in isolation from any other jobs (given that you don't submit other jobs to
> this cluster).
>
> Cheers,
> Till
>
> On Sat, Dec 3, 2016 at 2:50 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> Thanks Fabian and Till.
>
> We have customers who are interested in using Flink but are very concerned
> that "multiple jobs share the same set of TMs". I've only recently joined the
> community, so I'm not sure whether there has been a discussion about
> the "multi-tenant cluster mode" before.
>
> The con is that one job's or user's failure may crash another, which is
> unacceptable in a multi-tenant scenario.
> What are the pros? Do the pros outweigh the cons?
>
> Manu
>
> On Fri, Dec 2, 2016 at 7:06 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Manu,
>
> with FLIP-6 we will be able to support stricter application isolation by
> starting a dedicated JobManager for each job, which will execute its tasks
> on TMs reserved solely for this job. But at the same time we will continue
> supporting the multi-tenant cluster mode, where tasks belonging to multiple
> jobs share the same set of TMs and, thus, might share information between
> them.
>
> Cheers,
> Till
>
> On Fri, Dec 2, 2016 at 11:19 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Manu,
>
> As far as I know, there are no plans to change the standalone deployment.
> FLIP-6 focuses on deployments via resource providers (YARN, Mesos,
> etc.), which allow starting Flink processes per job.
>
> Till (in CC) is more familiar with the FLIP-6 effort and might be able to
> add more detail.
>
> Best,
> Fabian
>
> 2016-12-01 4:16 GMT+01:00 Manu Zhang <owenzhang1...@gmail.com>:
>
> Hi all,
>
> It seems tasks of different Flink applications can end up in the same JVM
> (TaskManager) in standalone mode. Isn't this fragile, since errors in one
> application could crash another? I checked FLIP-6
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077>
> but didn't find any mention of changing this in the future.
>
> Any thoughts, or have I missed anything?
>
> Thanks,
> Manu Zhang
>
>
>
>
>
>
>
>
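Stephan's workaround in this thread (one slot per TaskManager, several TaskManagers per machine) can be illustrated with a small toy model. This is a sketch under stated assumptions, not Flink's scheduler: each TaskManager is treated as one JVM, and tasks are assumed to be placed in submission order into the first TaskManager that still has a free slot. The class and method names (`SlotIsolationDemo`, `place`, `anyJvmShared`) are hypothetical. In a real standalone setup the per-TaskManager slot count is set with `taskmanager.numberOfTaskSlots` in `flink-conf.yaml`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Toy placement model (not Flink's scheduler). Each TaskManager is one JVM with a
 * fixed number of slots; tasks are placed in submission order into the first
 * TaskManager that still has a free slot.
 */
class SlotIsolationDemo {

    /** Returns, per TaskManager index, the set of job IDs whose tasks run in that JVM. */
    static List<Set<String>> place(Map<String, Integer> tasksPerJob, int taskManagers, int slotsPerTm) {
        List<Set<String>> jobsPerTm = new ArrayList<>();
        for (int i = 0; i < taskManagers; i++) {
            jobsPerTm.add(new HashSet<>());
        }
        int[] usedSlots = new int[taskManagers];
        for (Map.Entry<String, Integer> job : tasksPerJob.entrySet()) {
            for (int t = 0; t < job.getValue(); t++) {
                int tm = firstFreeTaskManager(usedSlots, slotsPerTm);
                if (tm < 0) {
                    throw new IllegalStateException("not enough slots for " + job.getKey());
                }
                usedSlots[tm]++;
                jobsPerTm.get(tm).add(job.getKey());
            }
        }
        return jobsPerTm;
    }

    /** Index of the first TaskManager with a free slot, or -1 if all are full. */
    private static int firstFreeTaskManager(int[] usedSlots, int slotsPerTm) {
        for (int i = 0; i < usedSlots.length; i++) {
            if (usedSlots[i] < slotsPerTm) {
                return i;
            }
        }
        return -1;
    }

    /** True if at least one JVM hosts tasks of more than one job. */
    static boolean anyJvmShared(List<Set<String>> jobsPerTm) {
        for (Set<String> jobs : jobsPerTm) {
            if (jobs.size() > 1) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Integer> jobs = new LinkedHashMap<>(); // insertion order = submission order
        jobs.put("jobA", 3);
        jobs.put("jobB", 3);
        // 2 TaskManagers x 4 slots: jobB's first task lands next to jobA's tasks.
        System.out.println(anyJvmShared(place(jobs, 2, 4))); // true
        // 8 TaskManagers x 1 slot, e.g. two hosts each listed four times in the workers file.
        System.out.println(anyJvmShared(place(jobs, 8, 1))); // false
    }
}
```

With four slots per TaskManager, the second job's first task is packed into a JVM that already runs the first job; with one slot per TaskManager every JVM hosts a single task, so no JVM is shared between jobs, at the cost of running more JVMs per machine.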


Re: separation of JVMs for different applications

2016-12-05 Thread Manu Zhang
Thanks, Stephan,

They don't use YARN now, but I think they will consider it. Do you think it
would be beneficial to provide an option such as "separate-jvm" in
standalone mode for stream processors and long-running services? Or do
you think it would introduce too much complexity?

Manu



Re: separation of JVMs for different applications

2016-12-05 Thread Manu Zhang
>
> The pro for the multi-tenant cluster mode is that you can share data
> between jobs and you don't have to spin up a new cluster for each job.


I don't think we have to spin up a new cluster for each job if every job
gets its own JVMs. For example, Storm will launch a new worker (JVM) for a
new job when free slots are available. How can we share data between jobs,
and why would we?





Re: separation of JVMs for different applications

2016-12-03 Thread Manu Zhang
Thanks, Fabian and Till.

We have customers who are interested in using Flink but are very concerned
that "multiple jobs share the same set of TMs". I've only recently joined the
community, so I'm not sure whether there has been a discussion about
the "multi-tenant cluster mode" before.

The con is that one job's or user's failure may crash another, which is
unacceptable in a multi-tenant scenario.
What are the pros? Do the pros outweigh the cons?

Manu



separation of JVMs for different applications

2016-11-30 Thread Manu Zhang
Hi all,

It seems tasks of different Flink applications can end up in the same JVM
(TaskManager) in standalone mode. Isn't this fragile, since errors in one
application could crash another? I checked FLIP-6
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077>
but didn't find any mention of changing this in the future.

Any thoughts, or have I missed anything?

Thanks,
Manu Zhang


Re: emit partial state in window (streaming)

2016-11-03 Thread Manu Zhang
Hi Luis,

You may try ContinuousEventTimeTrigger, which continuously fires at a given
time interval, instead of writing your own.
Note that we recently fixed a bug in this trigger, so I think only the
trunk version works correctly.

Cheers,
Manu
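The periodic firing described above can be sketched without Flink. This is a simplified model, not Flink's `ContinuousEventTimeTrigger` class: it assumes fire times aligned to multiples of the interval, starting at the boundary after the first element, and it ignores window ends, merging windows and trigger state. The name `ContinuousFiringSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, Flink-free model of a continuous event-time trigger: after the first
 * element, it fires every `interval` units of event time, whenever the watermark
 * passes the next scheduled fire time. Assumes non-negative timestamps
 * (Java's % operator returns negative results for negative inputs).
 */
class ContinuousFiringSketch {
    private final long interval;
    private Long nextFire; // null until the first element is seen

    ContinuousFiringSketch(long interval) {
        this.interval = interval;
    }

    /** Schedules the first firing at the interval boundary after the first element. */
    void onElement(long timestamp) {
        if (nextFire == null) {
            long start = timestamp - (timestamp % interval);
            nextFire = start + interval;
        }
    }

    /** Advances the watermark and returns every fire time it has passed, in order. */
    List<Long> onWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (nextFire != null && nextFire <= watermark) {
            fired.add(nextFire);
            nextFire += interval;
        }
        return fired;
    }

    public static void main(String[] args) {
        ContinuousFiringSketch trigger = new ContinuousFiringSketch(10);
        trigger.onElement(3);                        // first fire scheduled at 10
        System.out.println(trigger.onWatermark(5));  // []
        System.out.println(trigger.onWatermark(25)); // [10, 20]
        System.out.println(trigger.onWatermark(30)); // [30]
    }
}
```

In this model a single large watermark jump produces several firings at once, which is one reason to test the behaviour against your own watermark pattern.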

On Thu, Nov 3, 2016 at 9:07 PM Kostas Kloudas wrote:

> Hi Luis,
>
> Can you try commenting out the whole final windowing and see if this works?
> This includes the following lines:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   .trigger(new PartialWindowTrigger<>(partialWindowTime, timeUnit,
>       windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
>
> An additional note: I would register an event-time timer and fire in
> onEventTime() instead of checking the timestamp in onElement(). With your
> implementation, in order to fire a computation, you always have to wait for
> an element outside the partial window interval to arrive.
>
> Cheers,
> Kostas
>
> On Nov 3, 2016, at 11:31 AM, Luis Mariano Guerra wrote:
>
>   .windowAll(TumblingEventTimeWindows.of(Time.of(windowTime, timeUnit)))
>   //.trigger(new PartialWindowTrigger<>(partialWindowTime,
>   //    timeUnit, windowTime, timeUnit))
>   .apply(creator.create(), windowAllFold, windowAllMerge);
>
>
>
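Kostas's point about where the firing decision lives can be made concrete with a toy model that does not use Flink's Trigger API. The PartialWindowTrigger code is not shown in the thread, so this sketch assumes a simplified version of it: a partial result is due at every multiple of `partialInterval`. The "element-driven" variant checks this only when an element arrives (like a check in `onElement()`), while the "timer-driven" variant reacts to the watermark (like an event-time timer handled in `onEventTime()`). All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy comparison of element-driven vs. timer-driven partial firing (not Flink's API). */
class PartialFiringSketch {

    /** A stream event: either an element carrying a timestamp, or a watermark. */
    static final class Event {
        final boolean isWatermark;
        final long time;

        private Event(boolean isWatermark, long time) {
            this.isWatermark = isWatermark;
            this.time = time;
        }

        static Event element(long timestamp) { return new Event(false, timestamp); }

        static Event watermark(long watermark) { return new Event(true, watermark); }
    }

    /**
     * Returns the positions in {@code events} at which a partial result is emitted.
     * useTimers == false: only an element past the next boundary triggers a firing.
     * useTimers == true: a watermark reaching the next boundary triggers it
     * (several overdue boundaries collapse into one firing in this model).
     */
    static List<Integer> firings(List<Event> events, long partialInterval, boolean useTimers) {
        List<Integer> fired = new ArrayList<>();
        long nextBoundary = partialInterval;
        for (int i = 0; i < events.size(); i++) {
            Event e = events.get(i);
            boolean decides = useTimers ? e.isWatermark : !e.isWatermark;
            if (decides && e.time >= nextBoundary) {
                fired.add(i);
                // Move to the first boundary strictly after the time that caused the firing.
                nextBoundary = (e.time / partialInterval + 1) * partialInterval;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                Event.element(1), Event.element(2),
                Event.watermark(12), Event.watermark(25),
                Event.element(26));
        System.out.println(firings(events, 10, false)); // [4]: waits for element 26
        System.out.println(firings(events, 10, true));  // [2, 3]: fires on the watermarks
    }
}
```

The element-driven variant stays silent while only watermarks arrive and fires late, on the next element; the timer-driven variant emits as soon as event time has progressed far enough.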


Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-02 Thread Manu Zhang
Thanks, that will be great. I'd like to test against my particular use
cases once your PR is available.

Manu

On Wed, Nov 2, 2016 at 11:09 PM Ventura Del Monte <venturadelmo...@gmail.com>
wrote:

> Hello,
>
> I have just opened the JIRA issue
> <https://issues.apache.org/jira/browse/FLINK-4997> and I have almost
> completed the implementation of this feature. I will keep you posted :)
>
> Cheers,
> Ventura
>
> On Wed, Nov 2, 2016 at 2:18 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> a contributor (Bonaventure Del Monte) has started working on this. He
> should open a Jira this week.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 23:57 aj heller <drfl...@gmail.com> wrote:
>
> Hi Manu, Aljoscha,
>
> I had been interested in implementing FLIP-2, but I haven't been able to
> make time for it. There is no implementation yet that I'm aware of, and
> I'll gladly step aside (or help out however I can) if you or anyone else is
> interested in taking charge of it.
>
> That said, I'm also not sure if discussions are ongoing. I had hoped to
> prototype the proposal as is, to have something more concrete to discuss.
>
> Cheers,
> aj

Re: watermark trigger doesn't check whether element's timestamp is passed

2016-11-01 Thread Manu Zhang
Thanks. The ideal case is to fire after the watermark passes each element in
the window, but that requires a custom trigger and FLIP-2 as well. The
enhanced window evictor will help to avoid the last firing.

Are the discussions on FLIP-2 still going on?
Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
would be sufficient for my case.)
Is there a workaround for my case now?

Thanks again for following up on this.
Manu
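Aljoscha's suggestion quoted below (emit only the elements the watermark has already passed, and optionally evict what was emitted) can be modelled without Flink, using the PageView example from this thread. This is a hypothetical sketch, not a Flink Trigger or Evictor: `WatermarkGatedBuffer` and `emitUpTo` are made-up names, there is one buffer per user, and session-window merging is ignored.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical per-user buffer (not a Flink Trigger or Evictor): on each watermark,
 * emit only the page views whose timestamp the watermark has already passed.
 */
class WatermarkGatedBuffer {

    /** Mirrors the PageView(userId, url, eventTimestamp) records from the thread. */
    static final class PageView {
        final String userId;
        final String url;
        final long timestamp;

        PageView(String userId, String url, long timestamp) {
            this.userId = userId;
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    private final List<PageView> buffer = new ArrayList<>();

    void add(PageView pageView) {
        buffer.add(pageView);
    }

    /**
     * Returns the URLs (in event-time order) of all buffered views with timestamp <= watermark.
     * If evictEmitted is true, those views are removed so a later call will not emit them again.
     */
    List<String> emitUpTo(long watermark, boolean evictEmitted) {
        buffer.sort(Comparator.comparingLong((PageView pv) -> pv.timestamp));
        List<String> urls = new ArrayList<>();
        Iterator<PageView> it = buffer.iterator();
        while (it.hasNext()) {
            PageView pv = it.next();
            if (pv.timestamp <= watermark) {
                urls.add(pv.url);
                if (evictEmitted) {
                    it.remove();
                }
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        WatermarkGatedBuffer session = new WatermarkGatedBuffer();
        session.add(new PageView("user1", "http://foo", 1));
        session.add(new PageView("user1", "http://foo/bar", 2));
        System.out.println(session.emitUpTo(1, false)); // [http://foo]
        session.add(new PageView("user1", "http://foo/bar/foobar", 5));
        System.out.println(session.emitUpTo(4, false)); // [http://foo, http://foo/bar]
    }
}
```

At Watermark(1) this emits only http://foo, not the http://foo/bar view at timestamp 2 that shows up in the problematic output. Passing `evictEmitted = true` gives the "don't re-emit" behaviour Aljoscha asked about: the second call would then return only [http://foo/bar].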

On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Ah, I finally understand it. You would like a way to query the current
> watermark in the window function, to only emit those elements whose
> timestamp is lower than the watermark.
>
> When the window fires again, do you want to emit elements that you emitted
> during the last firing again? If not, I think you also need to use an
> evictor to evict the elements from the window where the timestamp is lower
> than the watermark. With this FLIP
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> we should be able to extend the WindowFunction Context to also provide the
> current watermark. With this recent PR
> https://github.com/apache/flink/pull/2736 you would be able to evict
> elements from the window state after the window function was called.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 02:27 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Yes, here's the example
> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>
> If you print and compare the timestamp of the timer with that of each
> "PageView" in the output, you can see what I mean.
>
> I think the recently introduced TimelyFlatMapFunction is close to what I
> want to achieve. It would be great if we could query time information in the
> window function, so I filed
> https://issues.apache.org/jira/browse/FLINK-4953
>
> Thanks for your time.
>
> Manu
>
> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of the window, a
> state (in the window) whose timestamp is *after* the timer will also be
> emitted. That's a problem for an event-time trigger.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Thanks for your response. My use case is to track user trajectories based
> on page view events when users visit a website. The input would be a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with an event-time
> trigger. Note that we can't wait for the end of the session window due to
> latency. Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time has been passed by the watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
> element's timestamp. With both triggers I've witnessed a problem like the
> following (e.g. with a session gap of 5):
>
> PageView("user1", "http://foo", 1)
> PageView("user1", "http://foo/bar", 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1,6])
> PageView("user1", "http://foo/bar/foobar", 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> *http://foo/bar/foobar*", [1, 10])
>
> The URLs in bold should not be included, since there could be events
> before them that have not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your ex

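Aljoscha's suggestion in the thread above (give the window function the current watermark, emit only the elements the watermark has passed, then evict them after the function runs) can be modeled without Flink in a few lines of plain Java. This is a minimal sketch, assuming one window whose contents fit in a list; `EmitThenEvictWindow`, `add`, and `fire` are hypothetical names and do not correspond to Flink's `WindowFunction` or `Evictor` interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one window's contents; EmitThenEvictWindow is a hypothetical
// class for illustration, not Flink's WindowFunction or Evictor API.
final class EmitThenEvictWindow {
    static final class Element {
        final String value;
        final long timestamp;

        Element(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<Element> contents = new ArrayList<>();

    void add(String value, long timestamp) {
        contents.add(new Element(value, timestamp));
    }

    // One firing. The "window function" is given the current watermark (the
    // FLIP-2 idea) and emits only elements with timestamp <= watermark, since a
    // watermark W means no more elements with timestamp <= W are expected.
    // The "evictor" then removes exactly those elements after the function
    // ran (the evict-after idea), so later firings do not emit them again.
    List<String> fire(long currentWatermark) {
        List<String> emitted = new ArrayList<>();
        for (Element e : contents) {
            if (e.timestamp <= currentWatermark) {
                emitted.add(e.value);
            }
        }
        contents.removeIf(e -> e.timestamp <= currentWatermark);
        return emitted;
    }

    int size() {
        return contents.size();
    }
}
```

With elements a@1, b@2, c@5, firing at watermarks 1, 4 and 7 emits [a], then [b], then [c], and the window is empty afterwards. If cumulative output is wanted instead (as in the trajectory use case), the eviction step is simply dropped.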
Re: watermark trigger doesn't check whether element's timestamp is passed

2016-10-31 Thread Manu Zhang
Yes, here's the example
https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala

If you print and compare the timestamp of timer with that of "PageView" in
the outputs, you could see what I mean.

I think the recently introduced TimelyFlatMapFunction is close to what I
want to achieve. It would be great if we could query time information in the
window function, so I filed https://issues.apache.org/jira/browse/FLINK-4953

Thanks for your time.

Manu

On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi,
>
> It's what I'm seeing. If timers are not fired at the end of the window, a
> state (in the window) whose timestamp is *after* the timer will also be
> emitted. That's a problem for an event-time trigger.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Thanks for your response. My use case is to track user trajectories based
> on page view events when users visit a website. The input would be a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with an event-time
> trigger. Note that we can't wait for the end of the session window due to
> latency. Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time has been passed by the watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
> element's timestamp. With both triggers I've witnessed a problem like the
> following (e.g. with a session gap of 5):
>
> PageView("user1", "http://foo", 1)
> PageView("user1", "http://foo/bar", 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1,6])
> PageView("user1", "http://foo/bar/foobar", 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> *http://foo/bar/foobar*", [1, 10])
>
> The URLs in bold should not be included, since there could be events
> before them that have not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when the watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*,
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect, since there could be elements with
> timestamps between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because the watermark trigger doesn't check whether the
> element's timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>

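The TimelyFlatMapFunction idea mentioned in this message (register an event-time timer per element and emit when that timer fires) can be sketched as a plain-Java model of a timer queue. It assumes a single key, an in-memory buffer, and timers that fire in ascending timestamp order once the watermark reaches them; `PerElementTimerModel`, `processElement`, and `advanceWatermark` are hypothetical names, not Flink's API. The property it illustrates is that each firing only looks at elements up to its own timer timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of a keyed operator with event-time timers, in the spirit of
// TimelyFlatMapFunction; all names here are hypothetical, not Flink's API.
final class PerElementTimerModel {
    static final class Element {
        final String value;
        final long timestamp;

        Element(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<Element> buffer = new ArrayList<>();
    // Pending timers, sorted and de-duplicated like an event-time timer queue.
    private final TreeSet<Long> timers = new TreeSet<>();

    // On each element: buffer it and register a timer at its own timestamp.
    void processElement(String value, long timestamp) {
        buffer.add(new Element(value, timestamp));
        timers.add(timestamp);
    }

    // Advancing the watermark fires every timer with timestamp <= watermark,
    // in ascending order; each firing emits only the buffered elements whose
    // timestamp is <= that timer's timestamp, never later ones.
    List<List<String>> advanceWatermark(long watermark) {
        List<List<String>> firings = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timerTs = timers.pollFirst();
            List<String> out = new ArrayList<>();
            for (Element e : buffer) {
                if (e.timestamp <= timerTs) {
                    out.add(e.value);
                }
            }
            firings.add(out);
        }
        return firings;
    }
}
```

Encoding 1:00, 1:03 and 1:06 as minutes (60, 63, 66) and letting ("c", 1:06) arrive before watermark(1:04), advancing the watermark to 64 yields [a] and then [a, b]; c is only emitted once the watermark reaches 66.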

Re: watermark trigger doesn't check whether element's timestamp is passed

2016-10-27 Thread Manu Zhang
Hi,

It's what I'm seeing. If timers are not fired at the end of the window, a state
(in the window) whose timestamp is *after* the timer will also be emitted.
That's a problem for an event-time trigger.

Thanks,
Manu


On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Thanks for your response. My use case is to track user trajectories based
> on page view events when users visit a website. The input would be a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with an event-time
> trigger. Note that we can't wait for the end of the session window due to
> latency. Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time has been passed by the watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets a timer on each
> element's timestamp. With both triggers I've witnessed a problem like the
> following (e.g. with a session gap of 5):
>
> PageView("user1", "http://foo", 1)
> PageView("user1", "http://foo/bar", 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1,6])
> PageView("user1", "http://foo/bar/foobar", 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> *http://foo/bar/foobar*", [1, 10])
>
> The URLs in bold should not be included, since there could be events
> before them that have not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when the watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*,
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect, since there could be elements with
> timestamps between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because the watermark trigger doesn't check whether the
> element's timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>

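To make the expected output of the PageView example concrete, here is a plain-Java sketch of the trajectory a firing should emit: only the views whose event time the watermark has passed, ordered by event time. `PageView` mirrors the `PageView(userId, url, eventTimestamp)` shape described in the thread but is a hypothetical class here, and `TrajectoryBuilder.trajectory` is likewise illustrative; session-window merging and window bounds are deliberately left out.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of PageView(userId, url, eventTimestamp) from the
// thread; plain Java, not a Flink type.
final class PageView {
    final String userId;
    final String url;
    final long eventTimestamp;

    PageView(String userId, String url, long eventTimestamp) {
        this.userId = userId;
        this.url = url;
        this.eventTimestamp = eventTimestamp;
    }
}

final class TrajectoryBuilder {
    // Builds "url1 -> url2 -> ..." from the buffered views the watermark has
    // passed (eventTimestamp <= watermark), ordered by event time. Views with
    // a later event time are left out because earlier views may still arrive.
    static String trajectory(List<PageView> buffered, long watermark) {
        return buffered.stream()
                .filter(v -> v.eventTimestamp <= watermark)
                .sorted(Comparator.comparingLong(v -> v.eventTimestamp))
                .map(v -> v.url)
                .collect(Collectors.joining(" -> "));
    }
}
```

With the thread's input, Watermark(1) yields `http://foo` and Watermark(4) yields `http://foo -> http://foo/bar`, so the URLs marked in bold are held back until the watermark passes their timestamps.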

watermark trigger doesn't check whether element's timestamp is passed

2016-10-23 Thread Manu Zhang
Hi,

Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
which is triggered to emit when the watermark passes the timestamp of an
element. For example,

on watermark(1:01), List(("a", 1:00)) is emitted
on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted

It seems that if *("c", 1:06) is processed before watermark(1:04)*,
List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
watermark(1:04). This is incorrect, since there could be elements with
timestamps between 1:04 and 1:06 that have not arrived yet.

I guess this is because the watermark trigger doesn't check whether the
element's timestamp has been passed.

Please correct me if any of the above is not right.

Thanks,
Manu Zhang
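The behaviour described in this original report can be reproduced with a small plain-Java model that contrasts emitting the whole window buffer with emitting only what the watermark has passed. Timestamps are encoded as minutes since midnight (1:00 == 60); `WindowBufferModel` and its methods are hypothetical and stand in for a window's state, not for Flink's `Trigger` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Toy model of one window's buffered state; NOT Flink's API.
// Timestamps are minutes since midnight, so 1:00 == 60.
final class WindowBufferModel {
    static final class Element {
        final String key;
        final long ts;

        Element(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    private final List<Element> buffer = new ArrayList<>();

    void add(String key, long ts) {
        buffer.add(new Element(key, ts));
    }

    // What the report observed: a firing emits everything buffered so far,
    // including elements whose timestamp the watermark has not yet passed.
    List<String> fireNaive() {
        return buffer.stream().map(e -> e.key).collect(Collectors.toList());
    }

    // What was expected: emit only elements with ts <= watermark, because a
    // watermark W means no more elements with timestamp <= W should arrive.
    List<String> fireWatermarkAware(long watermark) {
        return buffer.stream()
                .filter(e -> e.ts <= watermark)
                .map(e -> e.key)
                .collect(Collectors.toList());
    }
}
```

If ("c", 1:06) is buffered before watermark(1:04), `fireNaive()` returns [a, b, c], reproducing the problem, while `fireWatermarkAware(64)` returns [a, b] and `fireWatermarkAware(67)` returns all three.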