Re: [External] checkpoint metadata not in configured directory state.checkpoints.dir

2019-06-19 Thread Congxian Qiu
Hi, Vishal
If you want to restart from the last completed external checkpoint of the
previously stopped job, you need to track the checkpoint path and restart from
it [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
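
For what it's worth, a minimal sketch (the class name is made up) of locating
that path with Flink's FileSystem API, assuming the usual retained-checkpoint
layout of one chk-<id> directory per checkpoint under the job's checkpoint
directory, and assuming the s3a filesystem is configured as it is for the job
itself. The returned path can then be passed to bin/flink run -s <path>, as in
the doc linked above:

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class LatestCheckpointFinder {

    /** Returns the chk-<id> directory with the highest id that contains a _metadata file, or null. */
    public static String findLatest(String jobCheckpointDir) throws Exception {
        Path root = new Path(jobCheckpointDir);
        FileSystem fs = root.getFileSystem();
        String latest = null;
        long latestId = -1L;
        for (FileStatus status : fs.listStatus(root)) {
            String name = status.getPath().getName();
            if (!name.startsWith("chk-")) {
                continue;
            }
            // only consider checkpoints whose _metadata file was fully written
            if (!fs.exists(new Path(status.getPath(), "_metadata"))) {
                continue;
            }
            long id = Long.parseLong(name.substring("chk-".length()));
            if (id > latestId) {
                latestId = id;
                latest = status.getPath().toString();
            }
        }
        return latest;
    }
}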

Best,
Congxian


Vishal Sharma wrote on Wed, Jun 19, 2019 at 11:38 PM:

> Hi Chesnay,
>
> Can you suggest how I should go about automating job restarts from the last
> completed externalised checkpoint in case of failure? I am not sure about
> the path of the latest completed checkpoint.
>
> Thanks,
> Vishal Sharma
>
> On Wed, Jun 19, 2019 at 11:11 PM Chesnay Schepler 
> wrote:
>
>> The _metadata is always stored in the same directory as the checkpoint
>> data.
>>
>> As outlined here
>> 
>> "state.checkpoints.dir" serves as a cluster-wide configuration that _can_
>> be overwritten with a job-specific setting when creating the state-backend.
>>
>> If you want the state-backend to use the configured directory you must
>> configure the state-backend in the configuration as well, as outlined
>> here
>> 
>> .
>>
>> On 19/06/2019 16:26, Vishal Sharma wrote:
>>
>> Hi Folks,
>>
>> I am using flink 1.8 with externalised checkpointing enabled and saving
>> the checkpoints to aws S3.
>>
>> My configuration is as follows :
>>
>> flink-conf.yaml :
>> state.checkpoints.dir: s3a://test-bucket/checkpoint-metadata
>>
>> In application code :
>> env.setStateBackend(new
>> RocksDBStateBackend("s3a://test-bucket/checkpoints", true))
>>
>> As per my understanding, the externalized checkpoint’s meta data is
>> determined from the configuration key "state.checkpoints.dir" and
>> checkpoint data is stored in state backend path.
>>
>> However, In my case, I don't see anything in the metadata directory. The
>> _metadata file is present inside each of the checkpoint directory (chk-6043
>> ...).
>>
>> Is this the expected behavior ? If yes, what is the use of
>> "state.checkpoints.dir" configuration ?
>>
>> My goal is to establish a process to automatically restart the job from
>> last completed externalised checkpoint in case of failure. For this
>> to happen, I need to able to figure out path for the metadata of latest
>> checkpoint.
>>
>> Thanks,
>> Vishal Sharma
>>


Re: can we do Flink CEP on event stream or batch or both?

2019-06-19 Thread kant kodali
Hi Fabian,

Actually, now that I have gone through my use case, I can say that the
equality matches are more like expressions.

For example: *sum(col1, col2) of datasetA = col3 of datasetB.*

These expressions can include sum, if/else, trim, substring,
absolute_value, etc., and they are submitted by the user in an ad-hoc fashion.
My job is to apply these expressions to two different streams, identify
the breaks, and report them.

Any suggestions would be appreciated.

Thanks!
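
Fabian's suggestion below (a stateful function rather than CEP) maps onto this
fairly directly. A minimal sketch, with made-up record shapes (Tuple3 of id,
col1, col2 on one side, Tuple2 of id, col3 on the other) and a hard-coded
sum(col1, col2) = col3 check standing in for the user-submitted expression:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BreakDetector
        extends CoProcessFunction<Tuple3<String, Double, Double>, Tuple2<String, Double>, String> {

    private transient ValueState<Tuple3<String, Double, Double>> pendingA;
    private transient ValueState<Tuple2<String, Double>> pendingB;

    @Override
    public void open(Configuration parameters) {
        pendingA = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "pending-a", TypeInformation.of(new TypeHint<Tuple3<String, Double, Double>>() {})));
        pendingB = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "pending-b", TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {})));
    }

    @Override
    public void processElement1(Tuple3<String, Double, Double> a, Context ctx,
                                Collector<String> out) throws Exception {
        Tuple2<String, Double> b = pendingB.value();
        if (b == null) {
            pendingA.update(a);   // keep only the still-unmatched record
            return;
        }
        emitIfBreak(a, b, out);
        pendingB.clear();
    }

    @Override
    public void processElement2(Tuple2<String, Double> b, Context ctx,
                                Collector<String> out) throws Exception {
        Tuple3<String, Double, Double> a = pendingA.value();
        if (a == null) {
            pendingB.update(b);
            return;
        }
        emitIfBreak(a, b, out);
        pendingA.clear();
    }

    private void emitIfBreak(Tuple3<String, Double, Double> a, Tuple2<String, Double> b,
                             Collector<String> out) {
        // stand-in for the user expression: sum(col1, col2) of datasetA = col3 of datasetB
        if (Math.abs(a.f1 + a.f2 - b.f1) > 1e-9) {
            out.collect("break for key " + a.f0 + ": " + a + " vs " + b);
        }
    }
}

The two streams would be connected as keyed streams, e.g.
streamA.connect(streamB).keyBy(0, 0).process(new BreakDetector()); unmatched
records stay in state until the other side arrives (or until a timer clears them).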



On Tue, Apr 30, 2019 at 2:20 AM Fabian Hueske  wrote:

> Hi,
>
> Stateful streaming applications are typically designed to run continuously
> (i.e., until forever or until they are not needed anymore or replaced).
> Many jobs run for weeks or months.
>
> IMO, using CEP for "simple" equality matches would add too much complexity
> for a use case that can be easily solved with a stateful function.
> If your task is to ensure that two streams have the same events, I'd
> recommend to implement a custom DataStream application with a stateful
> ProcessFunction.
> Holding state for two years is certainly possible if you know exactly
> which events to keep, i.e., you do not store the full stream but only those
> few events that have not had a match yet.
>
> If you need to run the same logic also on batch data, you might want to
> check if you can use SQL or the Table API which are designed to work on
> static and streaming data with the same processing semantics.
>
> Best,
> Fabian
>
>
> On Tue, Apr 30, 2019 at 06:49, kant kodali wrote:
>
>> Hi All,
>>
>> I have the following questions.
>>
>> 1) can we do Flink CEP on event stream or batch?
>> 2) If we can do streaming I wonder how long can we keep the stream
>> stateful? I also wonder if anyone successfully had done any stateful
>> streaming for days or months(with or without CEP)? or is stateful streaming
>> is mainly to keep state only for a few hours?
>>
>> I have a use case where events are ingested from multiple sources and in
>> theory, the sources are supposed to have the same events however in
>> practice the sources will not have the same events so when the events are
>> ingested from multiple sources the goal is to detect where the "breaks"
>> are(meaning the missing events like exists in one source but not in other)?
>> so I realize this is the typical case for CEP.
>>
>> Also, in this particular use case events that supposed to come 2 years
>> ago can come today and if so, need to update those events also in real time
>> or near real time. Sure there wouldn't be a lot of events that were missed
>> 2 years ago but there will be a few. What would be the best approach?
>>
>> One solution I can think of is to do Stateful CEP with a window of one
>> day or whatever short time period where most events will occur and collect
>> the events that fall beyond that time period(The late ones) into some Kafka
>> topic and have a separate stream analyze the time period of the late ones,
>> construct the corresponding NFA and run through it again.  Please let me
>> know how this sounds or if there is a better way to do it.
>>
>> Thanks!
>>
>>
>>
>>


Updated Flink Documentation and tutorials

2019-06-19 Thread Pankaj Chand
Hello,

Please let me know how to get the updated documentation and tutorials for
Apache Flink.
The stable v1.8 and v1.9-snapshot releases of the documentation seem to be
outdated.

Thanks!

Pankaj


Re: [EXTERNAL] Re: How to restart/recover on reboot?

2019-06-19 Thread John Smith
Ok, I tried it and it works! I can set up my cluster with Terraform and enable
systemd services! I think I got confused when I looked and it was doing
leader election, because all services came up quickly!
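
For reference, a sketch of the kind of jobmanager unit this boils down to
(paths, user, and the hostname handling are assumptions from this thread, not
something Flink ships):

[Unit]
Description=Apache Flink JobManager
After=network.target

[Service]
Type=simple
User=flink
# jobmanager.sh accepts the bind hostname as an optional positional argument
# (see Nick's reply below); %H expands to this machine's hostname.
# start-foreground keeps the process attached so systemd can supervise it.
ExecStart=/opt/flink/bin/jobmanager.sh start-foreground %H
Restart=on-failure

[Install]
WantedBy=multi-user.target

A taskmanager unit looks the same with taskmanager.sh start-foreground and no
hostname argument.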



On Tue, 18 Jun 2019 at 22:35, John Smith  wrote:

> Ah ok, we need to pass --host. The command line help says jobmanager.sh
> ?!?!, if I recall. I have to go check tomorrow...
>
> On Tue., Jun. 18, 2019, 10:05 p.m. PoolakkalMukkath, Shakir, <
> shakir_poolakkalmukk...@comcast.com> wrote:
>
>> Hi Nick,
>>
>>
>>
>> It works that way by explicitly setting the –host. I got misled by the
>> *“only”* word in the doc and did not try. Thanks for the help
>>
>>
>>
>> Thanks,
>>
>> Shakir
>>
>> *From: *"Martin, Nick" 
>> *Date: *Tuesday, June 18, 2019 at 6:31 PM
>> *To: *"PoolakkalMukkath, Shakir" ,
>> Till Rohrmann , John Smith 
>> *Cc: *user 
>> *Subject: *RE: [EXTERNAL] Re: How to restart/recover on reboot?
>>
>>
>>
>> Jobmanager.sh takes an optional argument for the hostname to bind to, and
>> start-cluster uses it. If you leave it blank it, the script will use
>> whatever is in flink-conf.yaml (localhost is the default value that ships
>> with flink).
>>
>>
>>
>> The dockerized version of flink runs pretty much the way you’re trying to
>> operate (i.e. each node starts itself), so the entrypoint script out of
>> that is probably a good source of information about how to set it up.
>>
>>
>>
>> *From:* PoolakkalMukkath, Shakir [mailto:
>> shakir_poolakkalmukk...@comcast.com]
>> *Sent:* Tuesday, June 18, 2019 2:15 PM
>> *To:* Till Rohrmann ; John Smith <
>> java.dev@gmail.com>
>> *Cc:* user 
>> *Subject:* EXT :Re: [EXTERNAL] Re: How to restart/recover on reboot?
>>
>>
>>
>> Hi Tim,John,
>>
>>
>>
>> I do agree with the issue John mentioned and have the same problem.
>>
>>
>>
>> We can only *start* a standalone HA cluster with the ./start-cluster.sh
>> script. And then, when there are failures, we can *restart* those
>> components individually by calling jobmanager.sh / taskmanager.sh. This
>> works great.
>>
>> But, like John mentioned, if we want to start the cluster initially
>> by running jobmanager.sh on each JobManager node, it is not
>> working. It binds to local and does not form the HA cluster.
>>
>>
>>
>> Thanks,
>>
>> Shakir
>>
>>
>>
>> *From: *Till Rohrmann 
>> *Date: *Tuesday, June 18, 2019 at 4:23 PM
>> *To: *John Smith 
>> *Cc: *user 
>> *Subject: *[EXTERNAL] Re: How to restart/recover on reboot?
>>
>>
>>
>> I guess it should work if you installed a systemd service which simply
>> calls `jobmanager.sh start` or `taskmanager.sh start`.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Tue, Jun 18, 2019 at 4:29 PM John Smith 
>> wrote:
>>
>> Yes, that is understood. But I don't see why we cannot call jobmanager.sh
>> and taskmanager.sh to build the cluster and have them run as systemd units.
>>
>> I looked at start-cluster.sh and all it does is SSH and call
>> jobmanager.sh, which then cascades to taskmanager.sh. I just have to
>> pinpoint what's missing to have the systemd service working. In fact, calling
>> jobmanager.sh as a systemd service actually sees the shared masters, slaves
>> and flink-conf.yaml. But it binds to localhost.
>>
>>
>>
>> Maybe one way to do it would be to bootstrap the cluster with
>> ./start-cluster.sh and then install systemd services for jobmanager.sh and
>> taskmanager.sh
>>
>>
>>
>> Like I said I don't want to have some process in place to remind admins
>> they need to manually start a node every time they patch or a host goes
>> down for what ever reason.
>>
>>
>>
>> On Tue, 18 Jun 2019 at 04:31, Till Rohrmann  wrote:
>>
>> When a single machine fails you should rather call `taskmanager.sh
>> start`/`jobmanager.sh start` to start a single process. `start-cluster.sh`
>> will start multiple processes on different machines.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Mon, Jun 17, 2019 at 4:30 PM John Smith 
>> wrote:
>>
>> Well some reasons, machine reboots/maintenance etc... Host/VM crashes and
>> restarts. And same goes for the job manager. I don't want/need to have to
>> document/remember some start process for sys admins/devops.
>>
>> So far I have looked at ./start-cluster.sh and all it seems to do is SSH
>> into all the specified nodes and starts the processes using the jobmanager
>> and taskmanager scripts. I don't see anything special in any of the sh
>> scripts.
>> I configured passwordless ssh through terraform and all that works great
>> only when trying to do the manual start through systemd. I may have
>> something missing...
>>
>>
>>
>> On Mon, 17 Jun 2019 at 09:41, Till Rohrmann  wrote:
>>
>> Hi John,
>>
>>
>>
>> I have not much experience wrt setting Flink up via systemd services. Why
>> do you want to do it like that?
>>
>>
>>
>> 1. In standalone mode, Flink won't automatically restart TaskManagers.
>> This only works on Yarn and Mesos atm.
>>
>> 2. In case of a lost TaskManager, you should run `taskmanager.sh start`.
>> This script simply starts a new TaskManager 

Re: [External] checkpoint metadata not in configured directory state.checkpoints.dir

2019-06-19 Thread Vishal Sharma
Hi Chesnay,

Can you suggest how I should go about automating job restarts from the last
completed externalised checkpoint in case of failure? I am not sure about
the path of the latest completed checkpoint.

Thanks,
Vishal Sharma

On Wed, Jun 19, 2019 at 11:11 PM Chesnay Schepler 
wrote:

> The _metadata is always stored in the same directory as the checkpoint
> data.
>
> As outlined here
> 
> "state.checkpoints.dir" serves as a cluster-wide configuration that _can_
> be overwritten with a job-specific setting when creating the state-backend.
>
> If you want the state-backend to use the configured directory you must
> configure the state-backend in the configuration as well, as outlined here
> 
> .
>
> On 19/06/2019 16:26, Vishal Sharma wrote:
>
> Hi Folks,
>
> I am using flink 1.8 with externalised checkpointing enabled and saving
> the checkpoints to aws S3.
>
> My configuration is as follows :
>
> flink-conf.yaml :
> state.checkpoints.dir: s3a://test-bucket/checkpoint-metadata
>
> In application code :
> env.setStateBackend(new
> RocksDBStateBackend("s3a://test-bucket/checkpoints", true))
>
> As per my understanding, the externalized checkpoint’s meta data is
> determined from the configuration key "state.checkpoints.dir" and
> checkpoint data is stored in state backend path.
>
> However, In my case, I don't see anything in the metadata directory. The
> _metadata file is present inside each of the checkpoint directory (chk-6043
> ...).
>
> Is this the expected behavior ? If yes, what is the use of
> "state.checkpoints.dir" configuration ?
>
> My goal is to establish a process to automatically restart the job from
> last completed externalised checkpoint in case of failure. For this
> to happen, I need to able to figure out path for the metadata of latest
> checkpoint.
>
> Thanks,
> Vishal Sharma
>



Re: [External] checkpoint metadata not in configured directory state.checkpoints.dir

2019-06-19 Thread Chesnay Schepler

The _metadata is always stored in the same directory as the checkpoint data.

As outlined here 
 
"state.checkpoints.dir" serves as a cluster-wide configuration that 
_can_ be overwritten with a job-specific setting when creating the 
state-backend.


If you want the state-backend to use the configured directory you must 
configure the state-backend in the configuration as well, as outlined 
here 
.
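
For reference, a minimal flink-conf.yaml sketch of that second option (keys as
in the Flink 1.8 configuration; state.backend.incremental is the config-file
counterpart of the "true" flag passed to the RocksDBStateBackend constructor),
so that no state backend needs to be constructed in the application code:

state.backend: rocksdb
state.checkpoints.dir: s3a://test-bucket/checkpoints
state.backend.incremental: true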


On 19/06/2019 16:26, Vishal Sharma wrote:

Hi Folks,

I am using flink 1.8 with externalised checkpointing enabled and 
saving the checkpoints to aws S3.


My configuration is as follows :

flink-conf.yaml :
state.checkpoints.dir: s3a://test-bucket/checkpoint-metadata

In application code :
env.setStateBackend(new 
RocksDBStateBackend("s3a://test-bucket/checkpoints", true))


As per my understanding, the externalized checkpoint's metadata location is
determined by the configuration key "state.checkpoints.dir", and the
checkpoint data is stored in the state backend path.

However, in my case I don't see anything in the metadata directory.
The _metadata file is present inside each of the checkpoint directories
(chk-6043 ...).

Is this the expected behavior? If yes, what is the use of the
"state.checkpoints.dir" configuration?

My goal is to establish a process to automatically restart the job
from the last completed externalised checkpoint in case of failure. For
this to happen, I need to be able to figure out the path for the metadata
of the latest checkpoint.


Thanks,
Vishal Sharma






[External] checkpoint metadata not in configured directory state.checkpoints.dir

2019-06-19 Thread Vishal Sharma
Hi Folks,

I am using Flink 1.8 with externalised checkpointing enabled and saving the
checkpoints to AWS S3.

My configuration is as follows :

flink-conf.yaml :
state.checkpoints.dir: s3a://test-bucket/checkpoint-metadata

In application code :
env.setStateBackend(new
RocksDBStateBackend("s3a://test-bucket/checkpoints", true))

As per my understanding, the externalized checkpoint’s metadata location is
determined by the configuration key "state.checkpoints.dir", and the
checkpoint data is stored in the state backend path.

However, in my case I don't see anything in the metadata directory. The
_metadata file is present inside each of the checkpoint directories (chk-6043
...).

Is this the expected behavior? If yes, what is the use of the
"state.checkpoints.dir" configuration?

My goal is to establish a process to automatically restart the job from the
last completed externalised checkpoint in case of failure. For this
to happen, I need to be able to figure out the path for the metadata of the
latest checkpoint.

Thanks,
Vishal Sharma




Re: Simple stateful polling source

2019-06-19 Thread Flavio Pompermaier
Ok great! Thanks everybody for the support

On Wed, Jun 19, 2019 at 3:05 PM Chesnay Schepler  wrote:

> A (Rich)SourceFunction that does not implement RichParallelSourceFunction
> is always run with a parallelism of 1.
>
> On 19/06/2019 14:36, Flavio Pompermaier wrote:
>
> My sourcefunction is intrinsically single-thread. Is there a way to force
> this aspect?
> I can't find a real difference between a RichParallelSourceFunction and
> a RichSourceFunction.
> Is this last (RichSourceFunction) implicitly using parallelism = 1?
>
> On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler 
> wrote:
>
>> It returns a list of states so that state can be re-distributed if the
>> parallelism changes.
>>
>> If you hard-code the interface to return a single value then you're
>> implicitly locking the parallelism.
>> When you reduce the parallelism you'd no longer be able to restore all
>> state, since you have less instances than stored state.
>>
>> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>>
>> It's not clear to me why the source checkpoint returns a list of
>> object...when it could be useful to use a list instead of a single value?
>> The documentation says The returned list should contain one entry for
>> redistributable unit of state" but this is not very clear to me..
>>
>> Best,
>> Flavio
>>
>> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler 
>> wrote:
>>
>>> This looks fine to me.
>>>
>>> What exactly were you worried about?
>>>
>>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>>> > Hi to all,
>>> > in my use case I have to ingest data from a rest service, where I
>>> > periodically poll the data (of course a queue would be a better choice
>>> > but this doesn't depend on me).
>>> >
>>> > So I wrote a RichSourceFunction that starts a thread that poll for new
>>> > data.
>>> > However, I'd like to restart from the last "from" value (in the case
>>> > the job is stopped).
>>> >
>>> > My initial thought was to write somewhere the last used date and, on
>>> > job restart, read that date (from a file for example). However, Flink
>>> > stateful source should be a better choice here...am I wrong? So I
>>> > made  my source function implementing ListCheckpointed:
>>> >
>>> > @Override
>>> > public List snapshotState(long checkpointId, long timestamp)
>>> > throws Exception {
>>> >return
>>> Collections.singletonList(pollingThread.getDateFromAsString());
>>> > }
>>> > @Override
>>> > public void restoreState(List state) throws Exception {
>>> > for (String dateFrom : state) {
>>> >  startDateStr = dateFrom;
>>> >  }
>>> > }
>>> >
>>> > @Override
>>> > public void run(SourceContext ctx) throws Exception {
>>> >final Object lock = ctx.getCheckpointLock();
>>> >Client httpClient = getHttpClient();
>>> >try {
>>> >   pollingThread = new MyPollingThread.Builder(baseUrl,
>>> > httpClient)//
>>> >   .setStartDate(startDateStr, datePatternStr)//
>>> >   .build();
>>> >   // start the polling thread
>>> >   new Thread(pr).start();
>>> >  (etc)
>>> > }
>>> >
>>> > Is this the correct approach or did I misunderstood how stateful
>>> > source functions work?
>>> >
>>> > Best,
>>> > Flavio
>>>
>>>
>>>
>>
>>
>
>


Re: Simple stateful polling source

2019-06-19 Thread Chesnay Schepler
A (Rich)SourceFunction that does not implement 
RichParallelSourceFunction is always run with a parallelism of 1.
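
A tiny sketch of what that means in practice (MyPollingSource stands in for
the RichSourceFunction from this thread, so it is an assumption; imports from
org.apache.flink.streaming.api.* are omitted):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> src = env.addSource(new MyPollingSource());
src.setParallelism(1);    // allowed, and redundant: non-parallel sources are pinned to 1
// src.setParallelism(4); // would fail, because the source is not a parallel source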


On 19/06/2019 14:36, Flavio Pompermaier wrote:
My sourcefunction is intrinsically single-thread. Is there a way to 
force this aspect?
I can't find a real difference between a RichParallelSourceFunction 
and a RichSourceFunction.

Is this last (RichSourceFunction) implicitly using parallelism = 1?

On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler > wrote:


It returns a list of states so that state can be re-distributed if
the parallelism changes.

If you hard-code the interface to return a single value then
you're implicitly locking the parallelism.
When you reduce the parallelism you'd no longer be able to restore
all state, since you have less instances than stored state.

On 19/06/2019 14:19, Flavio Pompermaier wrote:

It's not clear to me why the source checkpoint returns a list of
object...when it could be useful to use a list instead of a
single value?
The documentation says The returned list should contain one entry
for redistributable unit of state" but this is not very clear to me..

Best,
Flavio

On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

This looks fine to me.

What exactly were you worried about?

On 19/06/2019 12:33, Flavio Pompermaier wrote:
> Hi to all,
> in my use case I have to ingest data from a rest service,
where I
> periodically poll the data (of course a queue would be a
better choice
> but this doesn't depend on me).
>
> So I wrote a RichSourceFunction that starts a thread that
poll for new
> data.
> However, I'd like to restart from the last "from" value (in
the case
> the job is stopped).
>
> My initial thought was to write somewhere the last used
date and, on
> job restart, read that date (from a file for example).
However, Flink
> stateful source should be a better choice here...am I
wrong? So I
> made  my source function implementing ListCheckpointed:
>
> @Override
> public List snapshotState(long checkpointId, long
timestamp)
> throws Exception {
>return
Collections.singletonList(pollingThread.getDateFromAsString());
> }
> @Override
> public void restoreState(List state) throws Exception {
> for (String dateFrom : state) {
>  startDateStr = dateFrom;
>  }
> }
>
> @Override
> public void run(SourceContext ctx) throws Exception {
>final Object lock = ctx.getCheckpointLock();
>Client httpClient = getHttpClient();
>try {
>   pollingThread = new
MyPollingThread.Builder(baseUrl,
> httpClient)//
>   .setStartDate(startDateStr, datePatternStr)//
>   .build();
>   // start the polling thread
>   new Thread(pr).start();
>  (etc)
> }
>
> Is this the correct approach or did I misunderstood how
stateful
> source functions work?
>
> Best,
> Flavio










Re: Simple stateful polling source

2019-06-19 Thread Flavio Pompermaier
My source function is intrinsically single-threaded. Is there a way to force
this aspect?
I can't find a real difference between a RichParallelSourceFunction and
a RichSourceFunction.
Is the latter (RichSourceFunction) implicitly using parallelism = 1?

On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler  wrote:

> It returns a list of states so that state can be re-distributed if the
> parallelism changes.
>
> If you hard-code the interface to return a single value then you're
> implicitly locking the parallelism.
> When you reduce the parallelism you'd no longer be able to restore all
> state, since you have less instances than stored state.
>
> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>
> It's not clear to me why the source checkpoint returns a list of
> object...when it could be useful to use a list instead of a single value?
> The documentation says The returned list should contain one entry for
> redistributable unit of state" but this is not very clear to me..
>
> Best,
> Flavio
>
> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler 
> wrote:
>
>> This looks fine to me.
>>
>> What exactly were you worried about?
>>
>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>> > Hi to all,
>> > in my use case I have to ingest data from a rest service, where I
>> > periodically poll the data (of course a queue would be a better choice
>> > but this doesn't depend on me).
>> >
>> > So I wrote a RichSourceFunction that starts a thread that poll for new
>> > data.
>> > However, I'd like to restart from the last "from" value (in the case
>> > the job is stopped).
>> >
>> > My initial thought was to write somewhere the last used date and, on
>> > job restart, read that date (from a file for example). However, Flink
>> > stateful source should be a better choice here...am I wrong? So I
>> > made  my source function implementing ListCheckpointed:
>> >
>> > @Override
>> > public List snapshotState(long checkpointId, long timestamp)
>> > throws Exception {
>> >return
>> Collections.singletonList(pollingThread.getDateFromAsString());
>> > }
>> > @Override
>> > public void restoreState(List state) throws Exception {
>> > for (String dateFrom : state) {
>> >  startDateStr = dateFrom;
>> >  }
>> > }
>> >
>> > @Override
>> > public void run(SourceContext ctx) throws Exception {
>> >final Object lock = ctx.getCheckpointLock();
>> >Client httpClient = getHttpClient();
>> >try {
>> >   pollingThread = new MyPollingThread.Builder(baseUrl,
>> > httpClient)//
>> >   .setStartDate(startDateStr, datePatternStr)//
>> >   .build();
>> >   // start the polling thread
>> >   new Thread(pr).start();
>> >  (etc)
>> > }
>> >
>> > Is this the correct approach or did I misunderstood how stateful
>> > source functions work?
>> >
>> > Best,
>> > Flavio
>>
>>
>>
>
>


partition columns with StreamingFileSink

2019-06-19 Thread Yitzchak Lieberman
Hi.

I'm using the StreamingFileSink for writing partitioned data to s3.
The code is below:

StreamingFileSink sink =
StreamingFileSink.forBulkFormat(new Path("s3a://test-bucket/test"),
ParquetAvroFactory.getParquetWriter(schema, "GZIP"))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();

How can I remove the partition columns from the data (or avoid populating
them in the GenericRecord)?
My problem is with AWS Glue crawler which creates duplicate columns in the
table.

Thanks,
Yitzchak.
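
For context on the sink wiring: PartitionBucketAssigner above is the poster's
own class, so the following is only a plausible sketch of such an assigner
(field handling assumed), not its actual code. Note that a bucket assigner only
reads the partition columns to build the bucket path; dropping those columns
from the written records would have to happen in the Parquet writer factory /
output schema instead.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class PartitionBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private final String[] partitionColumns;

    public PartitionBucketAssigner(String... partitionColumns) {
        this.partitionColumns = partitionColumns;
    }

    @Override
    public String getBucketId(GenericRecord record, Context context) {
        // builds e.g. "dt=2019-06-19/country=SG/" from the partition columns
        StringBuilder bucket = new StringBuilder();
        for (String column : partitionColumns) {
            bucket.append(column).append('=').append(record.get(column)).append('/');
        }
        return bucket.toString();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}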


Re: Simple stateful polling source

2019-06-19 Thread Chesnay Schepler
It returns a list of states so that state can be re-distributed if the 
parallelism changes.


If you hard-code the interface to return a single value then you're 
implicitly locking the parallelism.
When you reduce the parallelism you'd no longer be able to restore all
state, since you have fewer instances than stored state entries.
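
To make the redistribution point concrete, a sketch of a parallel source
(names are made up) where each subtask snapshots one entry per partition it
owns, so Flink can hand those entries to a different number of subtasks after
a rescale:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public abstract class PartitionedPollingSource extends RichParallelSourceFunction<String>
        implements ListCheckpointed<Tuple2<Integer, Long>> {

    // partition id -> last polled offset, for the partitions owned by this subtask
    private final Map<Integer, Long> ownedOffsets = new HashMap<>();

    @Override
    public List<Tuple2<Integer, Long>> snapshotState(long checkpointId, long timestamp) {
        // one list entry per redistributable unit (here: per partition)
        List<Tuple2<Integer, Long>> snapshot = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : ownedOffsets.entrySet()) {
            snapshot.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
        return snapshot;
    }

    @Override
    public void restoreState(List<Tuple2<Integer, Long>> state) {
        // after a parallelism change this subtask may receive a different subset of entries
        for (Tuple2<Integer, Long> entry : state) {
            ownedOffsets.put(entry.f0, entry.f1);
        }
    }
}

A single-value source like the one in this thread simply returns a singleton
list, which is why it only makes sense at parallelism 1.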


On 19/06/2019 14:19, Flavio Pompermaier wrote:
It's not clear to me why the source checkpoint returns a list of 
object...when it could be useful to use a list instead of a single value?
The documentation says The returned list should contain one entry for 
redistributable unit of state" but this is not very clear to me..


Best,
Flavio

On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler > wrote:


This looks fine to me.

What exactly were you worried about?

On 19/06/2019 12:33, Flavio Pompermaier wrote:
> Hi to all,
> in my use case I have to ingest data from a rest service, where I
> periodically poll the data (of course a queue would be a better
choice
> but this doesn't depend on me).
>
> So I wrote a RichSourceFunction that starts a thread that poll
for new
> data.
> However, I'd like to restart from the last "from" value (in the
case
> the job is stopped).
>
> My initial thought was to write somewhere the last used date
and, on
> job restart, read that date (from a file for example). However,
Flink
> stateful source should be a better choice here...am I wrong? So I
> made  my source function implementing ListCheckpointed:
>
> @Override
> public List snapshotState(long checkpointId, long
timestamp)
> throws Exception {
>return
Collections.singletonList(pollingThread.getDateFromAsString());
> }
> @Override
> public void restoreState(List state) throws Exception {
> for (String dateFrom : state) {
>  startDateStr = dateFrom;
>  }
> }
>
> @Override
> public void run(SourceContext ctx) throws Exception {
>final Object lock = ctx.getCheckpointLock();
>Client httpClient = getHttpClient();
>try {
>   pollingThread = new MyPollingThread.Builder(baseUrl,
> httpClient)//
>   .setStartDate(startDateStr, datePatternStr)//
>   .build();
>   // start the polling thread
>   new Thread(pr).start();
>  (etc)
> }
>
> Is this the correct approach or did I misunderstood how stateful
> source functions work?
>
> Best,
> Flavio







Re: Simple stateful polling source

2019-06-19 Thread Flavio Pompermaier
It's not clear to me why the source checkpoint returns a list of
objects... when could it be useful to use a list instead of a single value?
The documentation says "The returned list should contain one entry for
redistributable unit of state", but this is not very clear to me.

Best,
Flavio

On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler 
wrote:

> This looks fine to me.
>
> What exactly were you worried about?
>
> On 19/06/2019 12:33, Flavio Pompermaier wrote:
> > Hi to all,
> > in my use case I have to ingest data from a rest service, where I
> > periodically poll the data (of course a queue would be a better choice
> > but this doesn't depend on me).
> >
> > So I wrote a RichSourceFunction that starts a thread that poll for new
> > data.
> > However, I'd like to restart from the last "from" value (in the case
> > the job is stopped).
> >
> > My initial thought was to write somewhere the last used date and, on
> > job restart, read that date (from a file for example). However, Flink
> > stateful source should be a better choice here...am I wrong? So I
> > made  my source function implementing ListCheckpointed:
> >
> > @Override
> > public List snapshotState(long checkpointId, long timestamp)
> > throws Exception {
> >return Collections.singletonList(pollingThread.getDateFromAsString());
> > }
> > @Override
> > public void restoreState(List state) throws Exception {
> > for (String dateFrom : state) {
> >  startDateStr = dateFrom;
> >  }
> > }
> >
> > @Override
> > public void run(SourceContext ctx) throws Exception {
> >final Object lock = ctx.getCheckpointLock();
> >Client httpClient = getHttpClient();
> >try {
> >   pollingThread = new MyPollingThread.Builder(baseUrl,
> > httpClient)//
> >   .setStartDate(startDateStr, datePatternStr)//
> >   .build();
> >   // start the polling thread
> >   new Thread(pr).start();
> >  (etc)
> > }
> >
> > Is this the correct approach or did I misunderstood how stateful
> > source functions work?
> >
> > Best,
> > Flavio
>
>
>


Re: Simple stateful polling source

2019-06-19 Thread Chesnay Schepler

This looks fine to me.

What exactly were you worried about?

On 19/06/2019 12:33, Flavio Pompermaier wrote:

Hi to all,
in my use case I have to ingest data from a rest service, where I 
periodically poll the data (of course a queue would be a better choice 
but this doesn't depend on me).


So I wrote a RichSourceFunction that starts a thread that poll for new 
data.
However, I'd like to restart from the last "from" value (in the case 
the job is stopped).


My initial thought was to write somewhere the last used date and, on 
job restart, read that date (from a file for example). However, Flink 
stateful source should be a better choice here...am I wrong? So I 
made  my source function implementing ListCheckpointed:


@Override
public List snapshotState(long checkpointId, long timestamp) 
throws Exception {

   return Collections.singletonList(pollingThread.getDateFromAsString());
}
@Override
public void restoreState(List state) throws Exception {
for (String dateFrom : state) {
 startDateStr = dateFrom;
 }
}

@Override
public void run(SourceContext ctx) throws Exception {
   final Object lock = ctx.getCheckpointLock();
   Client httpClient = getHttpClient();
   try {
  pollingThread = new MyPollingThread.Builder(baseUrl, 
httpClient)//

  .setStartDate(startDateStr, datePatternStr)//
  .build();
  // start the polling thread
  new Thread(pr).start();
 (etc)
}

Is this the correct approach or did I misunderstood how stateful 
source functions work?


Best,
Flavio





Simple stateful polling source

2019-06-19 Thread Flavio Pompermaier
Hi to all,
in my use case I have to ingest data from a rest service, where I
periodically poll the data (of course a queue would be a better choice but
this doesn't depend on me).

So I wrote a RichSourceFunction that starts a thread that polls for new data.
However, I'd like to restart from the last "from" value (in the case the
job is stopped).

My initial thought was to write the last used date somewhere and, on job
restart, read that date back (from a file, for example). However, a stateful
Flink source should be a better choice here... am I wrong? So I made my source
function implement ListCheckpointed<String>:

@Override
public List<String> snapshotState(long checkpointId, long timestamp) throws
Exception {
   return Collections.singletonList(pollingThread.getDateFromAsString());
}

@Override
public void restoreState(List<String> state) throws Exception {
    for (String dateFrom : state) {
        startDateStr = dateFrom;
    }
}

@Override
public void run(SourceContext ctx) throws Exception {
   final Object lock = ctx.getCheckpointLock();
   Client httpClient = getHttpClient();
   try {
  pollingThread = new MyPollingThread.Builder(baseUrl,
httpClient)//
  .setStartDate(startDateStr, datePatternStr)//
  .build();
  // start the polling thread
  new Thread(pr).start();
 (etc)
}

Is this the correct approach or did I misunderstood how stateful source
functions work?

Best,
Flavio


Re: Side output in ProcessFunction.onTimer

2019-06-19 Thread Chesnay Schepler
ProcessFunction#onTimer provides an OnTimerContext parameter which 
allows you to use side-outputs.
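
A minimal sketch (type and tag names are made up; the function has to run on a
keyed stream for timers to be available):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SessionFunction extends ProcessFunction<Long, Long> {

    // anonymous subclass so the element type survives erasure
    public static final OutputTag<Long> TOO_SHORT = new OutputTag<Long>("too-short-sessions") {};

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) {
        // register a timer for the (hypothetical) minimum session duration
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 30_000L);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
        // OnTimerContext extends Context, so the same output(...) call works here
        ctx.output(TOO_SHORT, timestamp);
    }
}

The side output is then read from the result of process(...) via
getSideOutput(SessionFunction.TOO_SHORT).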


On 18/06/2019 17:41, Frank Wilson wrote:

Hi,

Is there a way to make side outputs in an onTimer callback in 
ProcessFunction?


I want to side output events that belong to a session that was below 
the minimum duration threshold. Currently these events are just 
discarded but I’d like more traceability.


Thanks,

Frank





Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-19 Thread zhijiang
As long as one task is still in the canceling state, the job status might also
stay in a canceling state.

@Joshua Can you confirm that all of the tasks in the topology were already in a
terminal state, such as FAILED or CANCELED?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time: Wednesday, June 19, 2019, 16:32
To:Joshua Fan ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

@Till have you seen something like this before? Despite all source tasks 
reaching a terminal state on a TM (FAILED) it does not send updates to 
the JM for all of them, but only a single one.

On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operator, such as, source, parser, and 
> persist. Occasionally, 5 subtasks of the source encounters exception 
> and turns to failed, at the same time, one subtask of the parser runs 
> into exception and turns to failed too. The jobmaster gets a message 
> of the parser's failed. The jobmaster then try to cancel all the 
> subtask, most of the subtasks of the three operator turns to canceled 
> except the 5 subtasks of the source, because the state of the 5 ones 
> is already FAILED before jobmaster try to cancel it. Then the 
> jobmaster can not reach a final state but keeps in  Failing state 
> meanwhile the subtask of the source kees in canceling state.
>
> The job run on a flink 1.7 cluster on yarn, and there is only one tm 
> with 10 slots.
>
> The attached files contains a jm log , tm log and the ui picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua



Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-19 Thread Chesnay Schepler
@Till have you seen something like this before? Despite all source tasks
reaching a terminal state (FAILED) on a TM, it does not send updates to
the JM for all of them, but only for a single one.


On 18/06/2019 12:14, Joshua Fan wrote:

Hi All,
There is a topology of 3 operators: source, parser, and
persist. Occasionally, 5 subtasks of the source hit an exception
and turn to FAILED; at the same time, one subtask of the parser runs
into an exception and turns to FAILED too. The jobmaster gets a message
about the parser's failure. The jobmaster then tries to cancel all the
subtasks; most of the subtasks of the three operators turn to CANCELED,
except the 5 subtasks of the source, because their state is already FAILED
before the jobmaster tries to cancel them. Then the
jobmaster cannot reach a final state but stays in FAILING state,
while those subtasks of the source stay in CANCELING state.


The job runs on a Flink 1.7 cluster on YARN, and there is only one TM
with 10 slots.


The attached files contains a jm log , tm log and the ui picture.

The exception timestamp is about 2019-06-16 13:42:28.

Yours
Joshua





Re: Role of Job Manager

2019-06-19 Thread Pankaj Chand
Hi Biao,

Thank you for your reply!

Please let me know the url of the updated Flink documentation.

The url of the outdated document is:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html


Another page which (tacitly) supports the outdated concept is:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html


The website that hosts these pages is also the first result that comes up
when you Google Search for "Flink documentation", and it claims it is a
stable version. The url is:
https://ci.apache.org/projects/flink/flink-docs-stable/

Again, please let me know the url of the updated Flink documentation.

Thank you Biao and Eduardo!

Pankaj

On Tue, Jun 18, 2019 at 11:49 PM Biao Liu  wrote:

> Hi Pankaj,
>
> That's really a good question. There was a refactor of architecture
> before[1]. So there might be some descriptions used the outdated concept.
>
> Before refactoring, Job Manager is a centralized role. It controls whole
> cluster and all jobs which is described in your interpretation 1.
>
> After refactoring, the old Job Manager is separated into several roles,
> Resource Manager, Dispatcher, new Job Manager, etc. The new Job Manager is
> responsible for only one job, which is described in your interpretation 2.
>
> So the document you refer to is outdated. Would you mind telling us the
> URL of this document? I think we should update it to avoid misleading more
> people.
>
> 1.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> Eduardo Winpenny Tejedor wrote on Wed, Jun 19, 2019 at 1:12 AM:
>
>> Hi Pankaj,
>>
>> I have no experience with Hadoop but from the book I gathered there's one
>> Job Manager per application i.e. per jar (as in the example in the first
>> chapter). This is not to say there's one Job Manager per job. Actually I
>> don't think the word Job is defined in the book, I've seen Task defined,
>> and those do have Task Managers
>>
>> Hope this is along the right lines
>>
>> Regards,
>> Eduardo
>>
>> On Tue, 18 Jun 2019, 08:42 Pankaj Chand, 
>> wrote:
>>
>>> I am trying to understand the role of Job Manager in Flink, and have
>>> come across two possibly distinct interpretations.
>>>
>>> 1. The online documentation v1.8 signifies that there is at least one
>>> Job Manager in a cluster, and it is closely tied to the cluster of
>>> machines, by managing all jobs in that cluster of machines.
>>>
>>> This signifies that Flink's Job Manager is much like Hadoop's
>>> Application Manager.
>>>
>>> 2. The book, "Stream Processing with Apache Flink", writes that, "The
>>> Job Manager is the master process that controls the execution of a single
>>> application—each application is controlled by a different Job Manager."
>>>
>>> This signifies that Flink defaults to one Job Manager per job, and the
>>> Job Manager is closely tied to that single job, much like Hadoop's
>>> Application Master for each job.
>>>
>>> Please let me know which one is correct.
>>>
>>> Pankaj
>>>
>>