Re: Use different S3 access key for different S3 bucket

2024-01-18 Thread Josh Mahonin via user
Oops, my syntax was a bit off there. As shown in the Hadoop docs, it looks
like:
fs.s3a.bucket.<bucket-name>.<property>
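
For example, with a bucket named "other-bucket" (the keys are placeholders), the
corresponding flink-conf.yaml entries would look roughly like:

fs.s3a.bucket.other-bucket.access.key: OTHER_ACCESS_KEY
fs.s3a.bucket.other-bucket.secret.key: OTHER_SECRET_KEY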

Josh


Re: Use different S3 access key for different S3 bucket

2024-01-18 Thread Josh Mahonin via user
Hi Qing,

You may have some luck with using per-bucket S3 configuration. Assuming
you're using the flink-s3-fs-hadoop plugin, you should be able to apply
different access keys to different buckets, eg:
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Configuring_different_S3_buckets_with_Per-Bucket_Configuration

I'm not certain if the 's3.<bucket-name>.<key>' syntax will work; you may
need to set the 'fs.s3a.<bucket-name>.<key>' values directly.

Josh

On Thu, Jan 18, 2024 at 6:02 AM Qing Lim  wrote:

> Hi Jun
>
>
>
> I am indeed talking about processing two different tables, but I don’t see
> any option that allows configuring credentials at the Flink table level. Do
> you know where that is documented?
>
>
>
> Today we are setting the credentials via Flink conf yaml, which is
> documented here:
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
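
(For reference, that page amounts to setting the global keys in flink-conf.yaml,
roughly:

s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY

The per-bucket discussion above is about overriding these for individual buckets.)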
>
>
>
> Thanks
>
>
>
> *From:* Jun Qin 
> *Sent:* 18 January 2024 10:51
> *To:* User ; Qing Lim 
> *Subject:* Re: Use different S3 access key for different S3 bucket
>
>
>
> Hi Qing
>
> The S3 credentials are associated with Flink SQL tables.
>
> I assume you are talking about processing/joining from two different
> tables, backed up by two different S3 buckets. If so, you can provide
> different credentials for different tables, then use the two tables in your
> pipeline.
>
>
>
> Jun
>
> On 18. Jan 2024 at 11:32 +0100, Qing Lim , wrote:
>
> Hi, I am using Flink SQL to create table backed by S3 buckets.
>
>
>
> We are not using AWS S3, so we have to use access key and secret for Auth.
>
>
>
> My pipeline depends on 2 different buckets, each requires different
> credentials, can flink support this?
>
>
>
>
>
>
>
> *Qing Lim* | Marshall Wace LLP, George House, 131 Sloane Street, London,
> SW1X 9AT | E-mail: q@mwam.com | Tel: +44 207 925 4865
>


Re: Filter push-down not working for a custom BatchTableSource

2019-05-03 Thread Josh Bradt
Hi Fabian,

Thanks for taking a look. I've filed this ticket:
https://issues.apache.org/jira/browse/FLINK-12399

Thanks,

Josh

On Fri, May 3, 2019 at 3:41 AM Fabian Hueske  wrote:

> Hi Josh,
>
> The code looks good to me.
> This seems to be a bug then.
> It's strange that it works for ORC.
>
> Would you mind opening a Jira ticket and maybe a simple reproducable code
> example?
>
> Thank you,
> Fabian
>
> Am Do., 2. Mai 2019 um 18:23 Uhr schrieb Josh Bradt <
> josh.br...@klaviyo.com>:
>
>> Hi Fabian,
>>
>> Thanks for your reply. My custom table source does not implement
>> ProjectableTableSource. I believe that isFilterPushedDown is implemented
>> correctly since it's nearly identical to what's written in the
>> OrcTableSource. I pasted a slightly simplified version of the
>> implementation below. If you wouldn't mind reading over it, is there
>> anything obviously wrong?
>>
>> public final class CustomerTableSource<T> implements BatchTableSource<T>,
>>         FilterableTableSource<T> {
>>
>>     // Iterator that gets data from a REST API as POJO instances
>>     private final AppResourceIterator<T> resourceIterator;
>>     private final String tableName;
>>     private final Class<T> modelClass;
>>     private final AppRequestFilter[] filters;
>>
>>     public CustomerTableSource(
>>             AppResourceIterator<T> resourceIterator,
>>             String tableName,
>>             Class<T> modelClass) {
>>
>>         this(resourceIterator, tableName, modelClass, null);
>>     }
>>
>>     protected CustomerTableSource(
>>             AppResourceIterator<T> resourceIterator,
>>             String tableName,
>>             Class<T> modelClass,
>>             AppRequestFilter[] filters) {
>>
>>         this.resourceIterator = resourceIterator;
>>         this.tableName = tableName;
>>         this.modelClass = modelClass;
>>         this.filters = filters;
>>     }
>>
>>     @Override
>>     public TableSource<T> applyPredicate(List<Expression> predicates) {
>>         List<Expression> acceptedPredicates = new ArrayList<>();
>>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>>
>>         for (final Expression predicate : predicates) {
>>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>>                 acceptedFilters.add(filter);
>>                 acceptedPredicates.add(predicate);
>>             });
>>         }
>>
>>         predicates.removeAll(acceptedPredicates);
>>
>>         return new CustomerTableSource<>(
>>                 resourceIterator.withFilters(acceptedFilters),
>>                 tableName,
>>                 modelClass,
>>                 acceptedFilters.toArray(new AppRequestFilter[0])
>>         );
>>     }
>>
>>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>>         // Code for translating an Expression into an AppRequestFilter
>>         // Returns Optional.empty() for predicates we don't want to / can't apply
>>     }
>>
>>     @Override
>>     public boolean isFilterPushedDown() {
>>         return filters != null;
>>     }
>>
>>     @Override
>>     public DataSet<T> getDataSet(ExecutionEnvironment execEnv) {
>>         return execEnv.fromCollection(resourceIterator, modelClass);
>>     }
>>
>>     @Override
>>     public TypeInformation<T> getReturnType() {
>>         return TypeInformation.of(modelClass);
>>     }
>>
>>     @Override
>>     public TableSchema getTableSchema() {
>>         return TableSchema.fromTypeInfo(getReturnType());
>>     }
>> }
>>
>>
>> Thanks,
>>
>> Josh
>>
>> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske  wrote:
>>
>>> Hi Josh,
>>>
>>> Does your TableSource also implement ProjectableTableSource?
>>> If yes, you need to make sure that the filter information is also
>>> forwarded if ProjectableTableSource.projectFields() is called after
>>> FilterableTableSource.applyPredicate().
>>> Also make sure to correctly implement
>>> FilterableTableSource.isFilterPushedDown().
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> Am Di., 30. Apr. 2019 um 22:29 Uhr schrieb Josh Bradt <
>>> josh.br...@klaviyo.com>:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>>> that retrieves data from a REST A

Re: Filter push-down not working for a custom BatchTableSource

2019-05-02 Thread Josh Bradt
Hi Fabian,

Thanks for your reply. My custom table source does not implement
ProjectableTableSource. I believe that isFilterPushedDown is implemented
correctly since it's nearly identical to what's written in the
OrcTableSource. I pasted a slightly simplified version of the
implementation below. If you wouldn't mind reading over it, is there
anything obviously wrong?

public final class CustomerTableSource<T> implements BatchTableSource<T>,
        FilterableTableSource<T> {

    // Iterator that gets data from a REST API as POJO instances
    private final AppResourceIterator<T> resourceIterator;
    private final String tableName;
    private final Class<T> modelClass;
    private final AppRequestFilter[] filters;

    public CustomerTableSource(
            AppResourceIterator<T> resourceIterator,
            String tableName,
            Class<T> modelClass) {

        this(resourceIterator, tableName, modelClass, null);
    }

    protected CustomerTableSource(
            AppResourceIterator<T> resourceIterator,
            String tableName,
            Class<T> modelClass,
            AppRequestFilter[] filters) {

        this.resourceIterator = resourceIterator;
        this.tableName = tableName;
        this.modelClass = modelClass;
        this.filters = filters;
    }

    @Override
    public TableSource<T> applyPredicate(List<Expression> predicates) {
        List<Expression> acceptedPredicates = new ArrayList<>();
        List<AppRequestFilter> acceptedFilters = new ArrayList<>();

        for (final Expression predicate : predicates) {
            buildFilterForPredicate(predicate).ifPresent(filter -> {
                acceptedFilters.add(filter);
                acceptedPredicates.add(predicate);
            });
        }

        predicates.removeAll(acceptedPredicates);

        return new CustomerTableSource<>(
                resourceIterator.withFilters(acceptedFilters),
                tableName,
                modelClass,
                acceptedFilters.toArray(new AppRequestFilter[0])
        );
    }

    public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
        // Code for translating an Expression into an AppRequestFilter
        // Returns Optional.empty() for predicates we don't want to / can't apply
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public DataSet<T> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.fromCollection(resourceIterator, modelClass);
    }

    @Override
    public TypeInformation<T> getReturnType() {
        return TypeInformation.of(modelClass);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }
}


Thanks,

Josh

On Thu, May 2, 2019 at 3:42 AM Fabian Hueske  wrote:

> Hi Josh,
>
> Does your TableSource also implement ProjectableTableSource?
> If yes, you need to make sure that the filter information is also
> forwarded if ProjectableTableSource.projectFields() is called after
> FilterableTableSource.applyPredicate().
> Also make sure to correctly implement
> FilterableTableSource.isFilterPushedDown().
>
> Hope this helps,
> Fabian
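
For sources that do implement both interfaces, a minimal sketch of what forwarding
the filter information could look like, written as an extra method on the
CustomerTableSource above (the withProjection helper on the iterator is made up
for illustration):

    // Inside CustomerTableSource<T>, which would then also declare
    // "implements ProjectableTableSource<T>":
    @Override
    public TableSource<T> projectFields(int[] fields) {
        // Return a copy that keeps the filters already accepted in applyPredicate(),
        // so isFilterPushedDown() still reports true on the projected copy.
        return new CustomerTableSource<>(
                resourceIterator.withProjection(fields), // hypothetical helper
                tableName,
                modelClass,
                filters
        );
    }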
>
> Am Di., 30. Apr. 2019 um 22:29 Uhr schrieb Josh Bradt <
> josh.br...@klaviyo.com>:
>
>> Hi all,
>>
>> I'm trying to implement filter push-down on a custom BatchTableSource
>> that retrieves data from a REST API and returns it as POJO instances. I've
>> implemented FilterableTableSource as described in the docs, returning a new
>> instance of my table source containing the predicates that I've removed
>> from the list of predicates passed into applyPredicate. However, when
>> getDataSet is eventually called, it's called on the instance of the table
>> source that was originally registered with the table environment, which
>> does not have any filters in it. I've stepped through the code in a
>> debugger, and applyPredicates is definitely being called, and it's
>> definitely returning new instances of my table source, but they don't seem
>> to be being used.
>>
>> I also played with the OrcTableSource, which is the only example of a
>> push-down filter implementation I could find, and it doesn't behave this
>> way. When I set a breakpoint in getDataSet in that case, it's being called
>> on one of the new instances of the table source that contains the accepted
>> filters.
>>
>> Are there any other requirements for implementing push-down filters that
>> aren't listed in the docs? Or does anyone have any tips for this?
>>
>> Thanks,
>>
>> Josh
>>
>> --
>> Josh Bradt
>> Software Engineer
>> 225 Franklin St, Boston, MA 02110
>> klaviyo.com <https://www.klaviyo.com>
>>
>

-- 
Josh Bradt
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com>


Filter push-down not working for a custom BatchTableSource

2019-04-30 Thread Josh Bradt
Hi all,

I'm trying to implement filter push-down on a custom BatchTableSource that
retrieves data from a REST API and returns it as POJO instances. I've
implemented FilterableTableSource as described in the docs, returning a new
instance of my table source containing the predicates that I've removed
from the list of predicates passed into applyPredicate. However, when
getDataSet is eventually called, it's called on the instance of the table
source that was originally registered with the table environment, which
does not have any filters in it. I've stepped through the code in a
debugger, and applyPredicates is definitely being called, and it's
definitely returning new instances of my table source, but they don't seem
to be being used.

I also played with the OrcTableSource, which is the only example of a
push-down filter implementation I could find, and it doesn't behave this
way. When I set a breakpoint in getDataSet in that case, it's being called
on one of the new instances of the table source that contains the accepted
filters.

Are there any other requirements for implementing push-down filters that
aren't listed in the docs? Or does anyone have any tips for this?

Thanks,

Josh

-- 
Josh Bradt
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com>


Get file name when reading from files? Or assign timestamps from file creation time?

2018-04-06 Thread Josh Lemer
Hey there, is it possible to somehow read the filename of elements that are
read from `env.readFile`? In our case, the date of creation is encoded in
the file name.

Otherwise, maybe it is possible to assign timestamps somehow by the file's
creation time directly?

Thanks!
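
One possible direction, sketched under the assumption that a custom input format is
acceptable (the class name and separator below are made up, and the exact readRecord
signature is worth double-checking against the Flink version in use): subclass
TextInputFormat, remember the split's path in open(), and attach it to every record
in readRecord.

import java.io.IOException;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

// Sketch: prefix every line with the path of the file it came from, so the
// filename (and the creation date encoded in it) can be parsed downstream.
public class FileNameTextInputFormat extends TextInputFormat {

    private String currentPath;

    public FileNameTextInputFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        // Remember which file this split belongs to.
        currentPath = split.getPath().toString();
    }

    @Override
    public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
        String line = super.readRecord(reusable, bytes, offset, numBytes);
        // Attach the path with a separator that a downstream map can split on.
        return line == null ? null : currentPath + "|" + line;
    }
}

Passing an instance of this format to env.readFile should then make the filename
available on each element, and a timestamp assigner can parse the date back out of it.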


Bucketing Sink does not complete files, when source is from a collection

2018-04-04 Thread Josh Lemer
Hello, I was wondering if I could get some pointers on what I'm doing wrong
here. I posted this on Stack Overflow, but I thought I'd also ask here.

I'm trying to generate some test data using a collection and write that
data to S3. Flink doesn't seem to do any checkpointing at all when I do
this, but it does checkpoint when the source comes from S3.

For example, this DOES checkpoint and leaves output files in a completed
state:

```scala
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env
    .readFile(
      inputFormat = new TextInputFormat(new Path(path)),
      filePath = path,
      watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
      interval = 5000L
    )
}

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```

Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state
even after the job has finished:


```scala
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```
Is this a bug in flink or something I'm doing wrong? Thank you!


Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-05 Thread Josh
I've reset the state and the job appears to be running smoothly again now.
My guess is that this problem was somehow related to my state becoming too
large (it got to around 20GB before the problem began). I would still like
to get to the bottom of what caused this as resetting the job's state is
not a long term solution, so if anyone has any ideas I can restore the
large state and investigate further.

Btw sorry for going a bit off topic on this thread!

On Fri, Nov 4, 2016 at 11:19 AM, Josh <jof...@gmail.com> wrote:

> Hi Scott & Stephan,
>
> The problem has happened a couple more times since yesterday, it's very
> strange as my job was running fine for over a week before this started
> happening. I find that if I restart the job (and restore from the last
> checkpoint) it runs fine for a while (couple of hours) before breaking
> again.
>
> @Scott thanks, I'll try testing with the upgraded versions, though since
> my job was running fine for over a week it feels like there might be
> something else going on here.
>
> @Stephan I see, my sink is a Kafka topic. I only have two nodes in my
> Kafka cluster and CPU and memory usage seems normal on both nodes. I can't
> see anything bad in the task manager logs relating to the Kafka producer
> either. I do have a fairly large state (20GB) but I'm using the latest
> RocksDB state backend (with asynchronous checkpointing). I'm not sure what
> else I can do to investigate this, but let me know if you have any more
> ideas!
>
> Thanks,
> Josh
>
>
> On Thu, Nov 3, 2016 at 6:27 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Is it possible that you have stalls in your topology?
>>
>> Reasons could be:
>>
>>   - The data sink blocks or becomes slow for some periods (where are you
>> sending the data to?)
>>
>>   - If you are using large state and a state backend that only supports
>> synchronous checkpointing, there may be a delay introduced by the checkpoint
>>
>>
>> On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <kidder.sc...@gmail.com>
>> wrote:
>>
>>> Hi Steffan & Josh,
>>>
>>> For what it's worth, I've been using the Kinesis connector with very
>>> good results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis
>>> connector KCL and AWS SDK dependencies to the following versions:
>>>
>>> aws.sdk.version: 1.11.34
>>> aws.kinesis-kcl.version: 1.7.0
>>>
>>> My customizations are visible in this commit on my fork:
>>> https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039
>>> cb4d37518859e159b32
>>>
>>> It might be worth testing with newer AWS SDK & KCL libraries to see if
>>> the problem persists.
>>>
>>> Best,
>>>
>>> --Scott Kidder
>>>
>>>
>>> On Thu, Nov 3, 2016 at 7:08 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Gordon,
>>>>
>>>> Thanks for the fast reply!
>>>> You're right about the expired iterator exception occurring just before
>>>> each spike. I can't see any signs of long GC on the task managers... CPU
>>>> has been <15% the whole time when the spikes were taking place and I can't
>>>> see anything unusual in the task manager logs.
>>>>
>>>> But actually I just noticed that the Flink UI showed no successful
>>>> checkpoints during the time of the problem even though my checkpoint
>>>> interval is 15 minutes. So I guess this is probably some kind of Flink
>>>> problem rather than a problem with the Kinesis consumer. Unfortunately I
>>>> can't find anything useful in the logs so not sure what happened!
>>>>
>>>> Josh
>>>>
>>>>
>>>>
>>>> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <
>>>> tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>> That warning message was added as part of FLINK-4514. It pops out
>>>>> whenever a shard iterator was used after 5 minutes it was returned from
>>>>> Kinesis.
>>>>> The only time spent between after a shard iterator was returned and
>>>>> before it was used to fetch the next batch of records, is on deserializing
>>>>> and emitting of the records of the last fetched batch.
>>>>> So unless processing of the last fetched batch took over 5 minutes,
>>>>> this normally shouldn’t happen.
>>>>>
>>>>> Have you noticed any sign of long, constant full GC for your Flink
>

Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

2016-11-04 Thread Josh
Thanks, I didn't know about the -z flag!

I haven't been able to get it to work though (using yarn-cluster, with a
zookeeper root configured to /flink in my flink-conf.yaml)

I can see my job directory in ZK under
/flink/application_1477475694024_0015 and I've tried a few ways to restore
the job:

./bin/flink run -m yarn-cluster -yz /application_1477475694024_0015 
./bin/flink run -m yarn-cluster -yz application_1477475694024_0015 
./bin/flink run -m yarn-cluster -yz /flink/application_1477475694024_0015/

./bin/flink run -m yarn-cluster -yz /flink/application_1477475694024_0015


The job starts from scratch each time, without restored state.

Am I doing something wrong? I've also tried with -z instead of -yz but I'm
using yarn-cluster to run a single job, so I think it should be -yz.



On Fri, Nov 4, 2016 at 2:33 PM, Ufuk Celebi <u...@apache.org> wrote:

> If the configured ZooKeeper paths are still the same, the job should
> be recovered automatically. On each submission a unique ZK namespace
> is used based on the app ID.
>
> So you have in ZK:
> /flink/app_id/...
>
> You would have to set that manually to resume an old application. You
> can do this via -z flag
> (https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/setup/cli.html).
>
> Does this work?
>
> On Fri, Nov 4, 2016 at 3:28 PM, Josh <jof...@gmail.com> wrote:
> > Hi Ufuk,
> >
> > I see, but in my case the failure caused the YARN application to move into a
> > finished/failed state - so the application itself is no longer running. How
> > can I restart the application (or start a new YARN application) and ensure
> > that it uses the checkpoint pointer stored in Zookeeper?
> >
> > Thanks,
> > Josh
> >
> > On Fri, Nov 4, 2016 at 1:52 PM, Ufuk Celebi <u...@apache.org> wrote:
> >>
> >> No you don't need to manually trigger a savepoint. With HA checkpoints
> >> are persisted externally and store a pointer in ZooKeeper to recover
> >> them after a JobManager failure.
> >>
> >> On Fri, Nov 4, 2016 at 2:27 PM, Josh <jof...@gmail.com> wrote:
> >> > I have a follow up question to this - if I'm running a job in
> >> > 'yarn-cluster'
> >> > mode with HA and then at some point the YARN application fails due to
> >> > some
> >> > hardware failure (i.e. the YARN application moves to
> "FINISHED"/"FAILED"
> >> > state), how can I restore the job from the most recent checkpoint?
> >> >
> >> > I can use `flink run -m yarn-cluster -s s3://my-savepoints/id .`
> to
> >> > restore from a savepoint, but what if I haven't manually taken a
> >> > savepoint
> >> > recently?
> >> >
> >> > Thanks,
> >> > Josh
> >> >
> >> > On Fri, Nov 4, 2016 at 10:06 AM, Maximilian Michels <m...@apache.org>
> >> > wrote:
> >> >>
> >> >> Hi Anchit,
> >> >>
> >> >> The documentation mentions that you need Zookeeper in addition to
> >> >> setting the application attempts. Zookeeper is needed to retrieve the
> >> >> current leader for the client and to filter out old leaders in case
> >> >> multiple exist (old processes could even stay alive in Yarn).
> Moreover,
> >> >> it
> >> >> is needed to persist the state of the application.
> >> >>
> >> >>
> >> >> -Max
> >> >>
> >> >>
> >> >> On Thu, Nov 3, 2016 at 7:43 PM, Anchit Jatana
> >> >> <development.anc...@gmail.com> wrote:
> >> >> > Hi Maximilian,
> >> >> >
> >> >> > Thanks for you response. Since, I'm running the application on YARN
> >> >> > cluster
> >> >> > using 'yarn-cluster' mode i.e. using 'flink run -m yarn-cluster ..'
> >> >> > command.
> >> >> > Is there anything more that I need to configure apart from setting
> up
> >> >> > 'yarn.application-attempts: 10' property inside
> conf/flink-conf.yaml.
> >> >> >
> >> >> > Just wished to confirm if there is anything more that I need to
> >> >> > configure to
> >> >> > set up HA on 'yarn-cluster' mode.
> >> >> >
> >> >> > Thank you
> >> >> >
> >> >> > Regards,
> >> >> > Anchit
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > View this message in context:
> >> >> >
> >> >> > http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Flink-Application-on-YARN-failed-on-losing-Job-Manager-No-
> recovery-Need-help-debug-the-cause-from-los-tp9839p9887.html
> >> >> > Sent from the Apache Flink User Mailing List archive. mailing list
> >> >> > archive at Nabble.com.
> >> >
> >> >
> >
> >
>


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

2016-11-04 Thread Josh
Hi Ufuk,

I see, but in my case the failure caused the YARN application to move into a
finished/failed state - so the application itself is no longer running. How
can I restart the application (or start a new YARN application) and ensure
that it uses the checkpoint pointer stored in Zookeeper?

Thanks,
Josh

On Fri, Nov 4, 2016 at 1:52 PM, Ufuk Celebi <u...@apache.org> wrote:

> No you don't need to manually trigger a savepoint. With HA checkpoints
> are persisted externally and store a pointer in ZooKeeper to recover
> them after a JobManager failure.
>
> On Fri, Nov 4, 2016 at 2:27 PM, Josh <jof...@gmail.com> wrote:
> > I have a follow up question to this - if I'm running a job in
> 'yarn-cluster'
> > mode with HA and then at some point the YARN application fails due to
> some
> > hardware failure (i.e. the YARN application moves to "FINISHED"/"FAILED"
> > state), how can I restore the job from the most recent checkpoint?
> >
> > I can use `flink run -m yarn-cluster -s s3://my-savepoints/id .` to
> > restore from a savepoint, but what if I haven't manually taken a
> savepoint
> > recently?
> >
> > Thanks,
> > Josh
> >
> > On Fri, Nov 4, 2016 at 10:06 AM, Maximilian Michels <m...@apache.org>
> wrote:
> >>
> >> Hi Anchit,
> >>
> >> The documentation mentions that you need Zookeeper in addition to
> >> setting the application attempts. Zookeeper is needed to retrieve the
> >> current leader for the client and to filter out old leaders in case
> >> multiple exist (old processes could even stay alive in Yarn). Moreover,
> it
> >> is needed to persist the state of the application.
> >>
> >>
> >> -Max
> >>
> >>
> >> On Thu, Nov 3, 2016 at 7:43 PM, Anchit Jatana
> >> <development.anc...@gmail.com> wrote:
> >> > Hi Maximilian,
> >> >
> >> > Thanks for your response. Since I'm running the application on YARN
> >> > cluster
> >> > using 'yarn-cluster' mode i.e. using 'flink run -m yarn-cluster ..'
> >> > command.
> >> > Is there anything more that I need to configure apart from setting up
> >> > 'yarn.application-attempts: 10' property inside conf/flink-conf.yaml.
> >> >
> >> > Just wished to confirm if there is anything more that I need to
> >> > configure to
> >> > set up HA on 'yarn-cluster' mode.
> >> >
> >> > Thank you
> >> >
> >> > Regards,
> >> > Anchit
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> > http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Flink-Application-on-YARN-failed-on-losing-Job-Manager-No-
> recovery-Need-help-debug-the-cause-from-los-tp9839p9887.html
> >> > Sent from the Apache Flink User Mailing List archive. mailing list
> >> > archive at Nabble.com.
> >
> >
>


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

2016-11-04 Thread Josh
I have a follow up question to this - if I'm running a job in
'yarn-cluster' mode with HA and then at some point the YARN application
fails due to some hardware failure (i.e. the YARN application moves to
"FINISHED"/"FAILED" state), how can I restore the job from the most recent
checkpoint?

I can use `flink run -m yarn-cluster -s s3://my-savepoints/id .` to
restore from a savepoint, but what if I haven't manually taken a savepoint
recently?

Thanks,
Josh

On Fri, Nov 4, 2016 at 10:06 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Anchit,
>
> The documentation mentions that you need Zookeeper in addition to
> setting the application attempts. Zookeeper is needed to retrieve the
> current leader for the client and to filter out old leaders in case
> multiple exist (old processes could even stay alive in Yarn). Moreover, it
> is needed to persist the state of the application.
>
>
> -Max
>
>
> On Thu, Nov 3, 2016 at 7:43 PM, Anchit Jatana
> <development.anc...@gmail.com> wrote:
> > Hi Maximilian,
> >
> > Thanks for your response. Since I'm running the application on YARN
> cluster
> > using 'yarn-cluster' mode i.e. using 'flink run -m yarn-cluster ..'
> command.
> > Is there anything more that I need to configure apart from setting up
> > 'yarn.application-attempts: 10' property inside conf/flink-conf.yaml.
> >
> > Just wished to confirm if there is anything more that I need to
> configure to
> > set up HA on 'yarn-cluster' mode.
> >
> > Thank you
> >
> > Regards,
> > Anchit
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Flink-
> Application-on-YARN-failed-on-losing-Job-Manager-No-
> recovery-Need-help-debug-the-cause-from-los-tp9839p9887.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>


Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-04 Thread Josh
Hi Scott & Stephan,

The problem has happened a couple more times since yesterday, it's very
strange as my job was running fine for over a week before this started
happening. I find that if I restart the job (and restore from the last
checkpoint) it runs fine for a while (couple of hours) before breaking
again.

@Scott thanks, I'll try testing with the upgraded versions, though since my
job was running fine for over a week it feels like there might be something
else going on here.

@Stephan I see, my sink is a Kafka topic. I only have two nodes in my Kafka
cluster and CPU and memory usage seems normal on both nodes. I can't see
anything bad in the task manager logs relating to the Kafka producer
either. I do have a fairly large state (20GB) but I'm using the latest
RocksDB state backend (with asynchronous checkpointing). I'm not sure what
else I can do to investigate this, but let me know if you have any more
ideas!

Thanks,
Josh


On Thu, Nov 3, 2016 at 6:27 PM, Stephan Ewen <se...@apache.org> wrote:

> Is it possible that you have stalls in your topology?
>
> Reasons could be:
>
>   - The data sink blocks or becomes slow for some periods (where are you
> sending the data to?)
>
>   - If you are using large state and a state backend that only supports
> synchronous checkpointing, there may be a delay introduced by the checkpoint
>
>
> On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <kidder.sc...@gmail.com>
> wrote:
>
>> Hi Steffan & Josh,
>>
>> For what it's worth, I've been using the Kinesis connector with very good
>> results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL
>> and AWS SDK dependencies to the following versions:
>>
>> aws.sdk.version: 1.11.34
>> aws.kinesis-kcl.version: 1.7.0
>>
>> My customizations are visible in this commit on my fork:
>> https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039
>> cb4d37518859e159b32
>>
>> It might be worth testing with newer AWS SDK & KCL libraries to see if
>> the problem persists.
>>
>> Best,
>>
>> --Scott Kidder
>>
>>
>> On Thu, Nov 3, 2016 at 7:08 AM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi Gordon,
>>>
>>> Thanks for the fast reply!
>>> You're right about the expired iterator exception occurring just before
>>> each spike. I can't see any signs of long GC on the task managers... CPU
>>> has been <15% the whole time when the spikes were taking place and I can't
>>> see anything unusual in the task manager logs.
>>>
>>> But actually I just noticed that the Flink UI showed no successful
>>> checkpoints during the time of the problem even though my checkpoint
>>> interval is 15 minutes. So I guess this is probably some kind of Flink
>>> problem rather than a problem with the Kinesis consumer. Unfortunately I
>>> can't find anything useful in the logs so not sure what happened!
>>>
>>> Josh
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> That warning message was added as part of FLINK-4514. It pops out
>>>> whenever a shard iterator was used after 5 minutes it was returned from
>>>> Kinesis.
>>>> The only time spent between after a shard iterator was returned and
>>>> before it was used to fetch the next batch of records, is on deserializing
>>>> and emitting of the records of the last fetched batch.
>>>> So unless processing of the last fetched batch took over 5 minutes,
>>>> this normally shouldn’t happen.
>>>>
>>>> Have you noticed any sign of long, constant full GC for your Flink task
>>>> managers? From your description and check in code, the only possible guess
>>>> I can come up with now is that
>>>> the source tasks completely ceased running for a period of time,
>>>> and when it came back, the shard iterator was unexpectedly found to be
>>>> expired. According to the graph you attached,
>>>> when the iterator was refreshed and tasks successfully fetched a few
>>>> more batches, the source tasks again halted, and so on.
>>>> So you should see that same warning message right before every small
>>>> peak within the graph.
>>>>
>>>> Best Regards,
>>>> Gordon
>>>>
>>>>
>>>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>>>
>>>> Hey Gordon,
>>>>
>>>> I'

Re: Using Flink with Accumulo

2016-11-03 Thread Josh Elser

Hi Oliver,

Cool stuff. I wish I knew more about Flink to make some better 
suggestions. Some points inline, and sorry in advance if I suggest 
something outright wrong. Hopefully someone from the Flink side can help 
give context where necessary :)


Oliver Swoboda wrote:

Hello,

I'm using Flink with Accumulo and wanted to read data from the database
by using the createHadoopInput function. Therefore I configure an
AccumuloInputFormat. The source code you can find here:
https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java


I'm using a 5 Node Cluster (1 Master, 4 Worker).
Accumulo is installed with Ambari and has 1 Master Server on the Master
Node and 4 Tablet Servers (one on each Worker).
Flink is installed standalone with the Jobmanager on the Master Node and
4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
so there are 32 in total.

First problem I have:
If I start several Flink jobs, the client count for Zookeeper in the
Accumulo Overview is constantly increasing. I assume that the scanner
being used isn't correctly closed. The client count only decreases to
normal values when I restart Flink.


Hrm, this does seem rather bad. Eventually, you'll saturate the 
connections to ZK and ZK itself will start limiting new connections (per 
the maxClientCnxns property).


This sounds somewhat familiar to 
https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a 
proper "close()" method on the Instance interface is a known deficiency. 
I'm not sure how Flink execution happens, so I am kind of just guessing.


You might be able to try to use the CleanUp[1] utility to close out the 
thread pools/connections when your Flink "task" is done.



Second problem I have:
I want to compare aggregations on time series data with Accumulo (with
Iterators) and with Flink. Unfortunately, the results vary inexplicably
when I'm using Flink. I wanted to compare the results for a full table
scan (called baseline in the code), but sometimes it takes 17-18 minutes
and sometimes it's between 30 and 60 minutes. In the longer case I can
see in the Accumulo Overview that after some time only one worker is
left with running scans and only a few entries/s are scanned (4
million at the beginning when all workers are running, down to 200k when
only one worker is left). Because there are 2.5 billion records to scan and
almost 500 million left, it takes really long.
This problem doesn't occur with Accumulo using Iterators and a batch
scanner on the master node: each scan has almost identical durations, and
the graphs in the Accumulo Overview for entries/s, MB/s scanned, and seeks
are the same for each scan.


It sounds like maybe your partitioning was sub-optimal and caused one 
task to get a majority of the data? Having the autoAdjustRanges=true (as 
you do by default) should help get many batches of work based on the 
tablet boundaries in Accumulo. I'm not sure how Flink actually executes 
them though.



Yours faithfully,
Oliver Swoboda



[1] 
https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36




Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Josh
Hi Gordon,

Thanks for the fast reply!
You're right about the expired iterator exception occurring just before
each spike. I can't see any signs of long GC on the task managers... CPU
has been <15% the whole time when the spikes were taking place and I can't
see anything unusual in the task manager logs.

But actually I just noticed that the Flink UI showed no successful
checkpoints during the time of the problem even though my checkpoint
interval is 15 minutes. So I guess this is probably some kind of Flink
problem rather than a problem with the Kinesis consumer. Unfortunately I
can't find anything useful in the logs so not sure what happened!

Josh



On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Josh,
>
> That warning message was added as part of FLINK-4514. It pops out whenever
> a shard iterator was used after 5 minutes it was returned from Kinesis.
> The only time spent between after a shard iterator was returned and before
> it was used to fetch the next batch of records, is on deserializing and
> emitting of the records of the last fetched batch.
> So unless processing of the last fetched batch took over 5 minutes, this
> normally shouldn’t happen.
>
> Have you noticed any sign of long, constant full GC for your Flink task
> managers? From your description and check in code, the only possible guess
> I can come up with now is that
> the source tasks completely ceased running for a period of time, and
> when it came back, the shard iterator was unexpectedly found to be expired.
> According to the graph you attached,
> when the iterator was refreshed and tasks successfully fetched a few more
> batches, the source tasks again halted, and so on.
> So you should see that same warning message right before every small peak
> within the graph.
>
> Best Regards,
> Gordon
>
>
> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>
> Hey Gordon,
>
> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
> with no problems, but yesterday the Kinesis consumer started behaving
> strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
> however the Flink Kinesis consumer started to stop consuming for periods of
> time (see the spikes in graph attached which shows data consumed by the
> Flink Kinesis consumer)
>
> Looking in the task manager logs, there are no exceptions however there is
> this log message which I believe is related to the problem:
>
> 2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.
> connectors.kinesis.internals.ShardConsumer  - Encountered an unexpected
> expired iterator AAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/
> tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/
> EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//
> Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for
> shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
> shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
> 85070511730234615865841151857942042863},SequenceNumberRange:
> {StartingSequenceNumber: 495665429169236488921642479266
> 79091159472198219567464450,}}'}; refreshing the iterator ...
>
> Having restarted the job from my last savepoint, it's consuming the stream
> fine again with no problems.
>
> Do you have any idea what might be causing this, or anything I should do
> to investigate further?
>
> Cheers,
>
> Josh
>
> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Steffen,
>>
>> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
>> the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for
>> noticing this!).
>> The Flink community is going to release 1.1.3 asap, which will include
>> the fix.
>> If you don’t want to wait for the release and want to try the fix now,
>> you can also build on the current “release-1.1” branch, which already has
>> FLINK-4514 merged.
>> Sorry for the inconvenience. Let me know if you bump into any other
>> problems afterwards.
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
>> stef...@hausmann-family.de) wrote:
>>
>> Hi there,
>>
>> I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
>> from a Kinesis stream. However, after a while (the exact duration varies
>> and is in the order of minutes) the Kinesis source doesn't emit any
>> further events and hence Flink doesn't produce any further output.
>> Eventually, an ExpiredIteratorException occurs in one of the task,
>> causing the entire job to fail:
>>
>

Re: ExpiredIteratorException when reading from a Kinesis stream

2016-11-03 Thread Josh
Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with
no problems, but yesterday the Kinesis consumer started behaving
strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec,
however the Flink Kinesis consumer started to stop consuming for periods of
time (see the spikes in graph attached which shows data consumed by the
Flink Kinesis consumer)

Looking in the task manager logs, there are no exceptions however there is
this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN
 org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer  -
Encountered an unexpected expired iterator
AAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
for shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
85070511730234615865841151857942042863},SequenceNumberRange:
{StartingSequenceNumber:
49566542916923648892164247926679091159472198219567464450,}}'}; refreshing
the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream
fine again with no problems.

Do you have any idea what might be causing this, or anything I should do to
investigate further?

Cheers,

Josh

On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Steffen,
>
> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included in
> the release (I’ll update the resolve version in JIRA to 1.1.3, thanks for
> noticing this!).
> The Flink community is going to release 1.1.3 asap, which will include the
> fix.
> If you don’t want to wait for the release and want to try the fix now, you
> can also build on the current “release-1.1” branch, which already has
> FLINK-4514 merged.
> Sorry for the inconvenience. Let me know if you bump into any other
> problems afterwards.
>
> Best Regards,
> Gordon
>
>
> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
> stef...@hausmann-family.de) wrote:
>
> Hi there,
>
> I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
> from a Kinesis stream. However, after a while (the exact duration varies
> and is in the order of minutes) the Kinesis source doesn't emit any
> further events and hence Flink doesn't produce any further output.
> Eventually, an ExpiredIteratorException occurs in one of the task,
> causing the entire job to fail:
>
> > com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016
> while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the
> future than the tolerated delay of 30 milliseconds. (Service:
> AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException;
> Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)
>
> This seems to be related to FLINK-4514, which is marked as resolved for
> Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
> running isn't suspended but hangs just a few minutes after the job has
> been started.
>
> I've attached a log file showing the described behavior.
>
> Any idea what may be wrong?
>
> Thanks,
> Steffen
>
>


Re: Checkpointing large RocksDB state to S3 - tips?

2016-10-25 Thread Josh
Hi Aljoscha,

Thanks for the reply!

I found that my stateful operator (with parallelism 10) wasn't equally
split between the task managers on the two nodes (it was split 9/1) - so I
tweaked the task manager / slot configuration until Flink allocated them
equally with 5 instances of the operator on each node. (Just wondering if
there's a better way to get Flink to allocate this specific operator
equally between nodes, regardless of the number of slots available on
each?) Having split the stateful operator equally between 2 nodes, I am
actually able to checkpoint 18.5GB of state in ~4 minutes, which indicates
an overall throughput of ~77MB/sec (38.5MB/sec per node).

I did what you said and tried uploading a large file from one of those VMs
to S3 using the AWS command line tool. It uploaded at a speed of ~76MB/sec.
Which is nearly double 38MB/sec but at least it's not orders of magnitude
out. Does that sound ok? - I guess there's more that goes on when Flink
takes a checkpoint than just uploading anyway... I've upgraded my cluster
to Flink 1.2-SNAPSHOT yesterday so yeah should be using the fully async
mode.

I'll have a proper look in the logs if I see it crash again, and for now
will just add more nodes whenever we need to speed up the checkpointing.

Thanks,
Josh




On Tue, Oct 25, 2016 at 3:12 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Josh,
> Checkpoints that take longer than the checkpoint interval should not be an
> issue (if you use an up-to-date version of Flink). The checkpoint
> coordinator will not issue another checkpoint while another one is still
> ongoing. Is there maybe some additional data for the crashes? A log perhaps?
>
> Regarding upload speed, yes, each instance of an operator is responsible
> for uploading its state so if state is equally distributed between
> operators on TaskManagers that would mean that each TaskManager would
> upload roughly the same amount of state. It might be interesting to see
> what the raw upload speed is when you have those to VMs upload to S3, if it
> is a lot larger than the speed you're seeing something would be wrong and
> we should investigate. One last thing: are you using the "fully async" mode
> of RocksDB? I think I remember that you do, just checking.
>
> If it is indeed a problem of upload speed to S3 per machine then yes,
> using more instances should speed up checkpointing.
>
> About incremental checkpoints: they're not going to make it into 1.2 with
> the current planning but after that, I don't know yet.
>
> Cheers,
> Aljoscha
>
>
> On Mon, 24 Oct 2016 at 19:06 Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
> checkpointing a fairly large RocksDB state to S3.
>
> I've found that when the state size hits 10GB, the checkpoint takes around
> 6 minutes, according to the Flink dashboard. Originally my checkpoint
> interval was 5 minutes for the job, but I've found that the YARN container
> crashes (I guess because the checkpoint time is greater than the checkpoint
> interval), so have now decreased the checkpoint frequency to every 10
> minutes.
>
> I was just wondering if anyone has any tips about how to reduce the
> checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
> uploading at ~30MB/sec. I believe the m3.xlarge instances should have
> around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
> Since there are 2 instances, I'm not sure if that means each instance is
> uploading at 15MB/sec - do the state uploads get shared equally among the
> instances, assuming the state is split equally between the task managers?
>
> If the state upload is split between the instances, perhaps the only way
> to speed up the checkpoints is to add more instances and task managers, and
> split the state equally among the task managers?
>
> Also just wondering - is there any chance the incremental checkpoints work
> will be complete any time soon?
>
> Thanks,
> Josh
>
>


Checkpointing large RocksDB state to S3 - tips?

2016-10-24 Thread Josh
Hi all,

I'm running Flink on EMR/YARN with 2x m3.xlarge instances and am
checkpointing a fairly large RocksDB state to S3.

I've found that when the state size hits 10GB, the checkpoint takes around
6 minutes, according to the Flink dashboard. Originally my checkpoint
interval was 5 minutes for the job, but I've found that the YARN container
crashes (I guess because the checkpoint time is greater than the checkpoint
interval), so have now decreased the checkpoint frequency to every 10
minutes.

I was just wondering if anyone has any tips about how to reduce the
checkpoint time. Taking 6 minutes to checkpoint ~10GB state means it's
uploading at ~30MB/sec. I believe the m3.xlarge instances should have
around 125MB/sec network bandwidth each, so I think the bottleneck is S3.
Since there are 2 instances, I'm not sure if that means each instance is
uploading at 15MB/sec - do the state uploads get shared equally among the
instances, assuming the state is split equally between the task managers?

If the state upload is split between the instances, perhaps the only way to
speed up the checkpoints is to add more instances and task managers, and
split the state equally among the task managers?

Also just wondering - is there any chance the incremental checkpoints work
will be complete any time soon?

Thanks,
Josh


Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Josh
Ah ok great, thanks! I will try upgrading sometime this week then.

Cheers,
Josh

On Tue, Oct 11, 2016 at 5:37 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Josh!
>
> I think the master has gotten more stable with respect to that. The issue
> you mentioned should be fixed.
>
> Another big set of changes (the last big batch) is going in in the next
> days - this time for re-sharding timers (window operator) and other state
> that is not organized by key.
>
> If you want to be a bit conservative, give it a few days before jumping
> onto the latest master. If you are brave, give it a shot now ;-)
>
> Greetings,
> Stephan
>
>
> On Tue, Oct 11, 2016 at 5:43 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi Stephan,
>>
>> Thanks, that sounds good!
>>
>> I'm planning to upgrade to Flink 1.2-SNAPSHOT as soon as possible - I was
>> delaying upgrading due to the issues with restoring operator state you
>> mentioned on my other thread here:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Flink-job-fails-to-restore-RocksDB-state-after-
>> upgrading-to-1-2-SNAPSHOT-td9110.html
>>
>> Sorry to jump around but do you know if that's fixed in the latest
>> 1.2-SNAPSHOT? Was it resolved by Flink-4788?
>>
>> Thanks,
>> Josh
>>
>> On Tue, Oct 11, 2016 at 4:13 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Josh!
>>>
>>> There are two ways to improve the RocksDB / S3 behavior
>>>
>>> (1) Use the FullyAsync mode. It stores the data in one file, not in a
>>> directory. Since directories are the "eventual consistent" part of S3, this
>>> prevents many issues.
>>>
>>> (2) Flink 1.2-SNAPSHOT has some additional fixes that circumvent
>>> additional S3 issues.
>>>
>>> Hope that helps,
>>> Stephan
>>>
>>>
>>> On Tue, Oct 11, 2016 at 4:42 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> Yeah I'm using S3. Is this a known problem when using S3? Do you have
>>>> any ideas on how to restore my job from this state, or prevent it from
>>>> happening again?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>>
>>>> On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> you are using S3 to store the checkpoints, right? It might be that
>>>>> you're running into a problem with S3 "directory listings" not being
>>>>> consistent.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Tue, 11 Oct 2016 at 12:40 Josh <jof...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>>
>>>>> I just have a couple of questions about checkpointing and restoring state 
>>>>> from RocksDB.
>>>>>
>>>>>
>>>>> 1) In some cases, I find that it is impossible to restore a job from a 
>>>>> checkpoint, due to an exception such as the one pasted below[*]. In this 
>>>>> case, it appears that the last checkpoint is somehow corrupt. Does anyone 
>>>>> know why this might happen?
>>>>>
>>>>>
>>>>> 2) When the above happens, I have no choice but to cancel the job, as it 
>>>>> repeatedly attempts to restart and keeps getting the same exception. 
>>>>> Given that no savepoint was taken recently, is it possible for me to 
>>>>> restore the job from an older checkpoint (e.g. the second-last 
>>>>> checkpoint)?
>>>>>
>>>>>
>>>>> The version of Flink I'm using is Flink-1.1-SNAPSHOT, from mid-June.
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>> [*]The exception when restoring state:
>>>>>
>>>>> java.lang.Exception: Could not restore checkpointed state to operators 
>>>>> and functions
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>>>>>   at ja

Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Josh
Hi Stephan,

Thanks, that sounds good!

I'm planning to upgrade to Flink 1.2-SNAPSHOT as soon as possible - I was
delaying upgrading due to the issues with restoring operator state you
mentioned on my other thread here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-job-fails-to-restore-RocksDB-state-after-upgrading-to-1-2-SNAPSHOT-td9110.html

Sorry to jump around but do you know if that's fixed in the latest
1.2-SNAPSHOT? Was it resolved by Flink-4788?

Thanks,
Josh

On Tue, Oct 11, 2016 at 4:13 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Josh!
>
> There are two ways to improve the RocksDB / S3 behavior
>
> (1) Use the FullyAsync mode. It stores the data in one file, not in a
> directory. Since directories are the "eventual consistent" part of S3, this
> prevents many issues.
>
> (2) Flink 1.2-SNAPSHOT has some additional fixes that circumvent
> additional S3 issues.
>
> Hope that helps,
> Stephan
>
>
> On Tue, Oct 11, 2016 at 4:42 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi Aljoscha,
>>
>> Yeah I'm using S3. Is this a known problem when using S3? Do you have any
>> ideas on how to restore my job from this state, or prevent it from
>> happening again?
>>
>> Thanks,
>> Josh
>>
>>
>> On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> you are using S3 to store the checkpoints, right? It might be that
>>> you're running into a problem with S3 "directory listings" not being
>>> consistent.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 11 Oct 2016 at 12:40 Josh <jof...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>>
>>> I just have a couple of questions about checkpointing and restoring state 
>>> from RocksDB.
>>>
>>>
>>> 1) In some cases, I find that it is impossible to restore a job from a 
>>> checkpoint, due to an exception such as the one pasted below[*]. In this 
>>> case, it appears that the last checkpoint is somehow corrupt. Does anyone 
>>> know why this might happen?
>>>
>>>
>>> 2) When the above happens, I have no choice but to cancel the job, as it 
>>> repeatedly attempts to restart and keeps getting the same exception. Given 
>>> that no savepoint was taken recently, is it possible for me to restore the 
>>> job from an older checkpoint (e.g. the second-last checkpoint)?
>>>
>>>
>>> The version of Flink I'm using is Flink-1.1-SNAPSHOT, from mid-June.
>>>
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>>
>>> [*]The exception when restoring state:
>>>
>>> java.lang.Exception: Could not restore checkpointed state to operators and 
>>> functions
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error while restoring RocksDB state 
>>> from 
>>> /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>>> ... 3 more
>>> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>>> at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>>> at 
>>> org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
>>> ... 7 more
>>>
>>>
>>>
>>
>


Re: Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Josh
Hi Aljoscha,

Yeah I'm using S3. Is this a known problem when using S3? Do you have any
ideas on how to restore my job from this state, or prevent it from
happening again?

Thanks,
Josh


On Tue, Oct 11, 2016 at 1:58 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> you are using S3 to store the checkpoints, right? It might be that you're
> running into a problem with S3 "directory listings" not being consistent.
>
> Cheers,
> Aljoscha
>
> On Tue, 11 Oct 2016 at 12:40 Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
>
> I just have a couple of questions about checkpointing and restoring state 
> from RocksDB.
>
>
> 1) In some cases, I find that it is impossible to restore a job from a 
> checkpoint, due to an exception such as the one pasted below[*]. In this 
> case, it appears that the last checkpoint is somehow corrupt. Does anyone 
> know why this might happen?
>
>
> 2) When the above happens, I have no choice but to cancel the job, as it 
> repeatedly attempts to restart and keeps getting the same exception. Given 
> that no savepoint was taken recently, is it possible for me to restore the 
> job from an older checkpoint (e.g. the second-last checkpoint)?
>
>
> The version of Flink I'm using is Flink-1.1-SNAPSHOT, from mid-June.
>
>
> Thanks,
>
> Josh
>
>
> [*]The exception when restoring state:
>
> java.lang.Exception: Could not restore checkpointed state to operators and 
> functions
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while restoring RocksDB state 
> from 
> /mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
>   ... 3 more
> Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
>   at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
>   at 
> org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
>   ... 7 more
>
>
>


Exception when restoring state from RocksDB - how to recover?

2016-10-11 Thread Josh
Hi all,


I just have a couple of questions about checkpointing and restoring
state from RocksDB.


1) In some cases, I find that it is impossible to restore a job from a
checkpoint, due to an exception such as the one pasted below[*]. In
this case, it appears that the last checkpoint is somehow corrupt.
Does anyone know why this might happen?


2) When the above happens, I have no choice but to cancel the job, as
it repeatedly attempts to restart and keeps getting the same
exception. Given that no savepoint was taken recently, is it possible
for me to restore the job from an older checkpoint (e.g. the
second-last checkpoint)?


The version of Flink I'm using is Flink-1.1-SNAPSHOT, from mid-June.


Thanks,

Josh


[*]The exception when restoring state:

java.lang.Exception: Could not restore checkpointed state to operators
and functions
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:480)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:219)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error while restoring RocksDB
state from 
/mnt/yarn/usercache/hadoop/appcache/application_1476181294189_0001/flink-io-09ad1cb1-8dff-4f9a-9f61-6cae27ee6f1d/d236820a793043bd63360df6f175cae9/StreamFlatMap_9_8/dummy_state/dc5beab1-68fb-48b3-b3d6-272497d15a09/chk-1
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:537)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.injectKeyValueStateSnapshots(RocksDBStateBackend.java:489)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.restoreState(AbstractStreamOperator.java:204)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:154)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:472)
... 3 more
Caused by: org.rocksdb.RocksDBException: NotFound: Backup not found
at org.rocksdb.BackupEngine.restoreDbFromLatestBackup(Native Method)
at 
org.rocksdb.BackupEngine.restoreDbFromLatestBackup(BackupEngine.java:177)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.restoreFromSemiAsyncSnapshot(RocksDBStateBackend.java:535)
... 7 more


Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

2016-10-03 Thread Josh
Hi Stefan,

Sorry for the late reply - I was away last week.
I've just got round to retrying my above scenario (run my job, take a
savepoint, restore my job) using the latest Flink 1.2-SNAPSHOT  -- and am
now seeing a different exception when restoring the state:

10/03/2016 11:29:02 Job execution switched to status FAILING.
java.lang.RuntimeException: Could not deserialize NFA.
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

Any ideas what's going on here? Is the Kafka consumer state management
broken right now in Flink master?

Thanks,
Josh


On Thu, Sep 22, 2016 at 9:28 AM, Stefan Richter <s.rich...@data-artisans.com
> wrote:

> Hi,
>
> to me, this looks like you are running into the problem described under
> [FLINK-4603] : KeyedStateBackend cannot restore user code classes. I have
> opened a pull request (PR 2533) this morning that should fix this behavior
> as soon as it is merged into master.
>
> Best,
> Stefan
>
> Am 21.09.2016 um 23:49 schrieb Josh <jof...@gmail.com>:
>
> Hi Stephan,
>
> Thanks for the reply. I should have been a bit clearer but actually I was
> not trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from
> scratch (starting with no state), then took a savepoint and tried to
> restart it from the savepoint - and that's when I get this exception. If I
> do this with the same job using an older version of Flink (1.1-SNAPSHOT
> taken in June), the savepoint and restore works fine.
>
> I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use
> (improvements to Kinesis connector + the bucketing sink). Anyway for now I
> have things working with an older version of Flink - but it would be good
> to know what's changed recently that's causing the restore to break and if
> my job is not going to be compatible with future releases of Flink.
>
> Best,
> Josh
>
> On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Josh!
>>
>> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
>> now, in order to add the elasticity feature (change the parallelism of running
>> jobs while still maintaining exactly-once guarantees).
>> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
>> try and add compatibility towards 1.1 savepoints before the release of
>> version 1.2.
>>
>> I think the exception is probably caused by the fact that old savepoint
>> stored some serialized user code (the new one is not expected to) which
>> cannot be loaded.
>>
>> Adding Aljoscha and Stefan to this, to see if they can add anything.
>> In any case, this should have a much better error message.
>>
>> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
>> so not really recommended for general use.
>>
>> Does version 1.1 not work for you?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Sep 21, 2016 at 7:44 PM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink job which uses the RocksDBStateBackend, which has been
>>> running on a Flink 1.0 cluster.
>>>
>>> The job is written in Scala, and I previously made some changes to the
>>> job to ensure that state could be restored. For example, whenever I call
>>> `map` or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
>>> MyCustomFlatMapper())` instead of an anonymous function.
>>>
>>> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
>>> to restore state. I'm seeing exceptions which look like this when trying to
>>> restore from a savepoint:
>>>
>>> java.lang.RuntimeException: Could not initialize keyed state backend.
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>>> tor.open(Abstr

Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

2016-09-21 Thread Josh
Hi Stephan,

Thanks for the reply. I should have been a bit clearer but actually I was
not trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from
scratch (starting with no state), then took a savepoint and tried to
restart it from the savepoint - and that's when I get this exception. If I
do this with the same job using an older version of Flink (1.1-SNAPSHOT
taken in June), the savepoint and restore works fine.

I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use
(improvements to Kinesis connector + the bucketing sink). Anyway for now I
have things working with an older version of Flink - but it would be good
to know what's changed recently that's causing the restore to break and if
my job is not going to be compatible with future releases of Flink.

Best,
Josh

On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Josh!
>
> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
> now, in order to add the elasticity feature (change the parallelism of running
> jobs while still maintaining exactly-once guarantees).
> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
> try and add compatibility towards 1.1 savepoints before the release of
> version 1.2.
>
> I think the exception is probably caused by the fact that old savepoint
> stored some serialized user code (the new one is not expected to) which
> cannot be loaded.
>
> Adding Aljoscha and Stefan to this, to see if they can add anything.
> In any case, this should have a much better error message.
>
> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
> so not really recommended for general use.
>
> Does version 1.1 not work for you?
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 21, 2016 at 7:44 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Flink job which uses the RocksDBStateBackend, which has been
>> running on a Flink 1.0 cluster.
>>
>> The job is written in Scala, and I previously made some changes to the
>> job to ensure that state could be restored. For example, whenever I call
>> `map` or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
>> MyCustomFlatMapper())` instead of an anonymous function.
>>
>> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
>> to restore state. I'm seeing exceptions which look like this when trying to
>> restore from a savepoint:
>>
>> java.lang.RuntimeException: Could not initialize keyed state backend.
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
>> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
>>
>> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
>> DataStreams, so it looks like this exception is caused just from using
>> Scala functions like `filter`, `map`, `flatMap` on standard Scala
>> collections, within my class `MyCustomFlatMapper`.
>>
>> Are there any changes to the way Flink state is restored or to
>> RocksDBStateBackend, in the last 2-3 months, which could cause this to
>> happen?
>>
>> If so, any advice on fixing it?
>>
>> I'm hoping there's a better solution to this than rewriting the Flink job
>> in Java.
>>
>> Thanks,
>>
>> Josh
>>
>
>


Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

2016-09-21 Thread Josh
Hi,

I have a Flink job which uses the RocksDBStateBackend, which has been
running on a Flink 1.0 cluster.

The job is written in Scala, and I previously made some changes to the job
to ensure that state could be restored. For example, whenever I call `map`
or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
MyCustomFlatMapper())` instead of an anonymous function.
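
As a concrete sketch of that convention (Event and Update are placeholder types here), the flat mapper is a top-level named class, so its class name stays stable across recompiles instead of being a synthetic $$anon$N class:

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

case class Event(id: String, payload: String)    // placeholder types
case class Update(id: String, payload: String)

// Explicitly named, top-level class rather than an anonymous function.
class MyCustomFlatMapper extends FlatMapFunction[Event, Update] {
  override def flatMap(event: Event, out: Collector[Update]): Unit = {
    // job-specific logic producing zero or more updates per event
    out.collect(Update(event.id, event.payload))
  }
}

// stream.flatMap(new MyCustomFlatMapper())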

I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to
restore state. I'm seeing exceptions which look like this when trying to
restore from a savepoint:

java.lang.RuntimeException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)

I'm not passing any anonymous functions to `map` or `flatMap` on Flink
DataStreams, so it looks like this exception is caused just from using
Scala functions like `filter`, `map`, `flatMap` on standard Scala
collections, within my class `MyCustomFlatMapper`.

Are there any changes to the way Flink state is restored or to
RocksDBStateBackend, in the last 2-3 months, which could cause this to
happen?

If so, any advice on fixing it?

I'm hoping there's a better solution to this than rewriting the Flink job
in Java.

Thanks,

Josh


Re: Kinesis connector - Iterator expired exception

2016-08-26 Thread Josh
Hi Gordon,

My job only went down for around 2-3 hours, and I'm using the default
Kinesis retention of 24 hours. When I restored the job, it got this
exception after around 15 minutes (and then restarted again, and got the
same exception 15 minutes later etc) - but actually I found that after this
happened around 5 times the job fully caught up to the head of the stream
and started running smoothly again.

Thanks for looking into this!

Best,
Josh


On Fri, Aug 26, 2016 at 1:57 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Josh,
>
> Thank you for reporting this, I’m looking into it. There were some major
> changes to the Kinesis connector after mid-June, but the changes don’t seem
> to be related to the iterator timeout, so it may be a bug that had always
> been there.
>
> I’m not sure yet if it may be related, but may I ask how long was your
> Flink job down before restarting it again from the existing state? Was it
> longer than the retention duration of the Kinesis records (default is 24
> hours)?
>
> Regards,
> Gordon
>
>
> On August 26, 2016 at 7:20:59 PM, Josh (jof...@gmail.com) wrote:
>
> Hi all,
>
> I guess this is probably a question for Gordon - I've been using the
> Flink-Kinesis connector for a while now and seen this exception a couple of
> times:
>
> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator 
> expired. The iterator was created at time Fri Aug 26 10:47:47 UTC 2016 while 
> right now it is Fri Aug 26 11:05:40 UTC 2016 which is further in the future 
> than the tolerated delay of 30 milliseconds. (Service: AmazonKinesis; 
> Status Code: 400; Error Code: ExpiredIteratorException; Request ID: 
> d3db1d90-df97-912b-83e1-3954e766bbe0)
>
>
> It happens when my Flink job goes down for a couple of hours, then I restart 
> from the existing state and it needs to catch up on all the data that has 
> been put in the Kinesis stream in the hours when the job was down. The job then
> runs for ~15 mins and fails with this exception (and this happens repeatedly 
> - meaning I can't restore the job from the existing state).
>
>
> Any ideas what's causing this? It's possible that it's been fixed in recent 
> commits, as the version of the Kinesis connector I'm using is behind master - 
> I'm not sure exactly what commit I'm using (doh!) but it was built around mid 
> June.
>
>
> Thanks,
>
> Josh
>
>


Re: Reprocessing data in Flink / rebuilding Flink state

2016-08-01 Thread Josh
Cool, thanks - I've tried out the approach where we replay data from the
Kafka compacted log, then take a savepoint and switch to the live stream.

It works but I did have to add in a dummy operator for every operator that
was removed. Without doing this, I got an exception:
java.lang.IllegalStateException: Failed to rollback to savepoint Checkpoint
1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot map old
state for task 02ea922553bc7522bdea373f52a702d6 to the new program. This
indicates that the program has been changed in a non-compatible way  after
the savepoint.

I had a Kafka source and a flat mapper chained together when replaying, so
to make it work I had to add two dummy operators and assign the same UID I
used when replaying, like this:
stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
  .map(x => x).name("dummy-2")
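
Spelled out across the two phases, the setup looks roughly like this (env is the StreamExecutionEnvironment, and the source variables and StateWarmupMapper are placeholders, so treat it as a sketch only):

// Phase 1 - replay job: warm up state from the compacted topic, then take a
// savepoint. The replay source carries the uid "kafka-replay".
env.addSource(compactedKafkaSource).uid("kafka-replay")
  .flatMap(new StateWarmupMapper())
  // ... rest of the pipeline

// Phase 2 - live job, restored from that savepoint: the replay source and its
// chained flat map are gone, so two pass-through dummies stand in for them,
// the first one reusing the "kafka-replay" uid the savepoint expects.
val stream = env.addSource(liveKafkaSource)
stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
  .map(x => x).name("dummy-2")
  // ... rest of the pipeline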

I guess it would be nice if Flink could recover from removed
tasks/operators without needing to add dummy operators like this.

Josh

On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> I have to try this to verify but I think the approach works if you give
> the two sources different UIDs. The reason is that Flink will ignore state
> for which it doesn't have an operator to assign it to. Therefore, the state
> of the "historical Kafka source" should be silently discarded.
>
> Cheers,
> Aljoscha
>
> On Fri, 29 Jul 2016 at 18:12 Josh <jof...@gmail.com> wrote:
>
>> @Aljoscha - The N-input operator way sounds very nice, for now I think
>> I'll try and get something quick running the hacky way, then if we decide
>> to make this a permanent solution maybe I can work on the proper solution.
>> I was wondering about your suggestion for "warming up" the state and then
>> taking a savepoint and switching sources - since the Kafka sources are
>> stateful and are part of Flink's internal state, wouldn't this break when
>> trying to restore the job with a different source? Would I need to assign
>> the replay source a UID, and when switching from replay to live, remove the
>> replay source and replace it with a dummy operator with the same UID?
>>
>> @Jason - I see what you mean now, with the historical and live Flink
>> jobs. That's an interesting approach - I guess it's solving a slightly
>> different problem to my 'rebuilding Flink state upon starting job' - as
>> you're rebuilding state as part of the main job when it comes across events
>> that require historical data. Actually I think we'll need to do something
>> very similar in the future but right now I can probably get away with
>> something simpler!
>>
>> Thanks for the replies!
>>
>> Josh
>>
>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc@gmail.com>
>> wrote:
>>
>>> Aljoscha's approach is probably better, but to answer your questions...
>>>
>>> >How do you send a request from one Flink job to another?
>>> All of our different flink jobs communicate over kafka.  So the main
>>> flink job would be listening to both a "live" kafka source, and a
>>> "historical" kafka source.  The historical flink job would listen to a
>>> "request" kafka source.  When the main job gets an event that it does not
>>> have state for it writes to the "request" topic.  The historical job would
>>> read the request, grab the relevant old events from GCS, and write them to
>>> the "historical" kafka topic.  The "historical" source and the "live"
>>> source are merged and proceed through the main flink job as one stream.
>>>
>>> >How do you handle the switchover between the live stream and the
>>> historical stream? Do you somehow block the live stream source and detect
>>> when the historical data source is no longer emitting new elements?
>>> When the main job sends a request to the historical job, the main job
>>> starts storing any events that come in for that key.  As the historical
>>> events come in they are processed immediately.  The historical flink job
>>> flags the last event it sends.  When the main flink job sees the flagged
>>> event it knows it is caught up to where it was when it sent the request.
>>> You can then process the events that the main job stored, and when that is
>>> done you are caught up to the live stream, and can stop storing events for
>>> that key and just process them as normal.
>>>
>>> Keep in mind that this is the dangerous part that I was talking about,
>>> where memory in t

Re: Reprocessing data in Flink / rebuilding Flink state

2016-07-29 Thread Josh
@Aljoscha - The N-input operator way sounds very nice, for now I think I'll
try and get something quick running the hacky way, then if we decide to
make this a permanent solution maybe I can work on the proper solution. I
was wondering about your suggestion for "warming up" the state and then
taking a savepoint and switching sources - since the Kafka sources are
stateful and are part of Flink's internal state, wouldn't this break when
trying to restore the job with a different source? Would I need to assign
the replay source a UID, and when switching from replay to live, remove the
replay source and replace it with a dummy operator with the same UID?

@Jason - I see what you mean now, with the historical and live Flink jobs.
That's an interesting approach - I guess it's solving a slightly different
problem to my 'rebuilding Flink state upon starting job' - as you're
rebuilding state as part of the main job when it comes across events that
require historical data. Actually I think we'll need to do something very
similar in the future but right now I can probably get away with something
simpler!

Thanks for the replies!

Josh

On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc@gmail.com> wrote:

> Aljoscha's approach is probably better, but to answer your questions...
>
> >How do you send a request from one Flink job to another?
> All of our different flink jobs communicate over kafka.  So the main flink
> job would be listening to both a "live" kafka source, and a "historical"
> kafka source.  The historical flink job would listen to a "request" kafka
> source.  When the main job gets an event that it does not have state for it
> writes to the "request" topic.  The historical job would read the request,
> grab the relevant old events from GCS, and write them to the "historical"
> kafka topic.  The "historical" source and the "live" source are merged and
> proceed through the main flink job as one stream.
>
> >How do you handle the switchover between the live stream and the
> historical stream? Do you somehow block the live stream source and detect
> when the historical data source is no longer emitting new elements?
> When the main job sends a request to the historical job, the main job
> starts storing any events that come in for that key.  As the historical
> events come in they are processed immediately.  The historical flink job
> flags the last event it sends.  When the main flink job sees the flagged
> event it knows it is caught up to where it was when it sent the request.
> You can then process the events that the main job stored, and when that is
> done you are caught up to the live stream, and can stop storing events for
> that key and just process them as normal.
>
> Keep in mind that this is the dangerous part that I was talking about,
> where memory in the main job would continue to build until the "historical"
> events are all processed.
>
> >In my case I would want the Flink state to always contain the latest
> state of every item (except when the job first starts and there's a period
> of time where it's rebuilding its state from the Kafka log).
> You could absolutely do it by reading from the beginning of a kafka
> topic.  The reason we do it with GCS is it is really cheap storage, and we
> are not planning on storing forever on the kafka topic.
>
> >Since I would have everything needed to rebuild the state persisted in a
> Kafka topic, I don't think I would need a second Flink job for this?
> The reason for the second flink job in our case is that we didn't really
> want to block the flink task slot while a single key gets caught up.  We
> have a much larger key domain than we have task slots, so there
> would be multiple keys on single task slot.  If you go with the single job
> approach (which might be the right approach for you guys) any other keys on
> that task slot will be blocked until the one key is getting it's state
> built up.
>
> Hope that helps,
>
> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi Jason,
>>
>> Thanks for the reply - I didn't quite understand all of it though!
>>
>> > it sends a request to the historical flink job for the old data
>> How do you send a request from one Flink job to another?
>>
>> > It continues storing the live events until all the events from the
>> historical job have been processed, then it processes the stored events,
>> and finally starts processing the live stream again.
>> How do you handle the switchover between the live stream and the
>> historical stream? Do you somehow block the live stream source and detect
>> when the historical data source is n

Re: Reprocessing data in Flink / rebuilding Flink state

2016-07-29 Thread Josh
Hi Jason,

Thanks for the reply - I didn't quite understand all of it though!

> it sends a request to the historical flink job for the old data
How do you send a request from one Flink job to another?

> It continues storing the live events until all the events from the
historical job have been processed, then it processes the stored events,
and finally starts processing the live stream again.
How do you handle the switchover between the live stream and the historical
stream? Do you somehow block the live stream source and detect when the
historical data source is no longer emitting new elements?

> So in your case it looks like what you could do is send a request to the
"historical" job whenever you get an item that you don't yet have the
current state of.
In my case I would want the Flink state to always contain the latest state
of every item (except when the job first starts and there's a period of
time where it's rebuilding its state from the Kafka log). Since I would
have everything needed to rebuild the state persisted in a Kafka topic, I
don't think I would need a second Flink job for this?

Thanks,
Josh




On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc@gmail.com> wrote:

> Hey Josh,
>
> The way we replay historical data is we have a second Flink job that
> listens to the same live stream, and stores every single event in Google
> Cloud Storage.
>
> When the main Flink job that is processing the live stream gets a request
> for a specific data set that it has not been processing yet, it sends a
> request to the historical flink job for the old data.  The live job then
> starts storing relevant events from the live stream in state.  It continues
> storing the live events until all the events from the historical job have
> been processed, then it processes the stored events, and finally starts
> processing the live stream again.
>
> As long as it's properly keyed (we key on the specific data set) then it
> doesn't block anything, keeps everything ordered, and eventually catches
> up.  It also allows us to completely blow away state and rebuild it from
> scratch.
>
> So in your case it looks like what you could do is send a request to the
> "historical" job whenever you get an item that you don't yet have the
> current state of.
>
> The potential problems you may have are that it may not be possible to
> store every single historical event, and that you need to make sure there
> is enough memory to handle the ever increasing state size while the
> historical events are being replayed (and make sure to clear the state when
> it is done).
>
> It's a little complicated, and pretty expensive, but it works.  Let me
> know if something doesn't make sense.
>
>
> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> I was wondering what approaches people usually take with reprocessing
>> data with Flink - specifically the case where you want to upgrade a Flink
>> job, and make it reprocess historical data before continuing to process a
>> live stream.
>>
>> I'm wondering if we can do something similar to the 'simple rewind' or
>> 'parallel rewind' which Samza uses to solve this problem, discussed here:
>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>
>> Having used Flink over the past couple of months, the main issue I've had
>> involves Flink's internal state - from my experience it seems it is easy to
>> break the state when upgrading a job, or when changing the parallelism of
>> operators, plus there's no easy way to view/access an internal key-value
>> state from outside Flink.
>>
>> For an example of what I mean, consider a Flink job which consumes a
>> stream of 'updates' to items, and maintains a key-value store of items
>> within Flink's internal state (e.g. in RocksDB). The job also writes the
>> updated items to a Kafka topic:
>>
>> http://oi64.tinypic.com/34q5opf.jpg
>>
>> My worry with this is that the state in RocksDB could be lost or become
>> incompatible with an updated version of the job. If this happens, we need
>> to be able to rebuild Flink's internal key-value store in RocksDB. So I'd
>> like to be able to do something like this (which I believe is the Samza
>> solution):
>>
>> http://oi67.tinypic.com/219ri95.jpg
>>
>> Has anyone done something like this already with Flink? If so are there
>> any examples of how to do this replay & switchover (rebuild state by
>> consuming from a historical log, then switch over to processing the live
>> stream)?
>>
>> Thanks for any insights,
>> Josh
>>
>>
>
>
> --
> *Jason Brelloch* | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
> <http://www.bettercloud.com/>
> Subscribe to the BetterCloud Monitor
> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email_medium=email_signature_campaign=monitor_launch>
>  -
> Get IT delivered to your inbox
>


Reprocessing data in Flink / rebuilding Flink state

2016-07-28 Thread Josh
Hi all,

I was wondering what approaches people usually take with reprocessing data
with Flink - specifically the case where you want to upgrade a Flink job,
and make it reprocess historical data before continuing to process a live
stream.

I'm wondering if we can do something similar to the 'simple rewind' or
'parallel rewind' which Samza uses to solve this problem, discussed here:
https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html

Having used Flink over the past couple of months, the main issue I've had
involves Flink's internal state - from my experience it seems it is easy to
break the state when upgrading a job, or when changing the parallelism of
operators, plus there's no easy way to view/access an internal key-value
state from outside Flink.

For an example of what I mean, consider a Flink job which consumes a stream
of 'updates' to items, and maintains a key-value store of items within
Flink's internal state (e.g. in RocksDB). The job also writes the updated
items to a Kafka topic:

http://oi64.tinypic.com/34q5opf.jpg
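
In code, the stateful part of that topology might look roughly like this (Item, Update and the surrounding source/sink are placeholders, and the exact ValueStateDescriptor constructor varies a bit between Flink versions):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class Update(itemId: String, field: String, value: String)  // placeholders
case class Item(itemId: String, fields: Map[String, String])

class ApplyUpdate extends RichFlatMapFunction[Update, Item] {
  private var itemState: ValueState[Item] = _

  override def open(parameters: Configuration): Unit =
    itemState = getRuntimeContext.getState(
      new ValueStateDescriptor[Item]("item", classOf[Item], null))

  override def flatMap(update: Update, out: Collector[Item]): Unit = {
    val current = Option(itemState.value()).getOrElse(Item(update.itemId, Map.empty))
    val updated = current.copy(fields = current.fields + (update.field -> update.value))
    itemState.update(updated)   // kept in Flink's internal state (e.g. RocksDB)
    out.collect(updated)        // also emitted downstream to the Kafka sink
  }
}

// usage (source and sink elided):
// updates.keyBy(_.itemId).flatMap(new ApplyUpdate()).addSink(updatedItemsKafkaSink)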

My worry with this is that the state in RocksDB could be lost or become
incompatible with an updated version of the job. If this happens, we need
to be able to rebuild Flink's internal key-value store in RocksDB. So I'd
like to be able to do something like this (which I believe is the Samza
solution):

http://oi67.tinypic.com/219ri95.jpg

Has anyone done something like this already with Flink? If so are there any
examples of how to do this replay & switchover (rebuild state by consuming
from a historical log, then switch over to processing the live stream)?

Thanks for any insights,
Josh


Re: State in external db (dynamodb)

2016-07-25 Thread Josh
Never mind, I think I understand the point about partial writes. Is it that,
if we write out our buffer of updates to the external store and the batch
update is not atomic, the external store can end up in an inconsistent state
(with some state from the attempted checkpoint and some from the previous
checkpoint)?

I guess that means this solution will only work in the case where you only
write to the external store, and writes are idempotent - and it won't work
for my use case where I need to read from the external store, apply an
update and then write.

I wonder if it can be solved by storing each value in the external store as a
tuple: (state, previous_state, checkpoint_id). Then when reading from the
store, if checkpoint_id is in the future (i.e. ahead of the checkpoint the job
last completed or restored from), read previous_state; otherwise take the
current state.
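
A rough sketch of that read path (names are illustrative; lastCompletedCheckpointId stands for whichever checkpoint the job last completed or restored from):

case class VersionedValue[T](state: T, previousState: T, checkpointId: Long)

def readConsistent[T](record: VersionedValue[T], lastCompletedCheckpointId: Long): T =
  if (record.checkpointId > lastCompletedCheckpointId) record.previousState  // write never committed
  else record.state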


On Mon, Jul 25, 2016 at 3:20 PM, Josh <jof...@gmail.com> wrote:

> Hi Chen,
>
> Can you explain what you mean a bit more? I'm not sure I understand the
> problem.
>
> Does anyone know if the tooling discussed here has been merged into Flink
> already? Or if there's an example of what this custom sink would look like?
> I guess the sink would buffer updates in-memory between checkpoints. Then
> it would implement the Checkpointed interface and write to the external
> store in snapshotState(...)?
>
> Thanks,
> Josh
>
> On Sun, Jul 24, 2016 at 6:00 PM, Chen Qin <qinnc...@gmail.com> wrote:
>
>>
>>
>> On Jul 22, 2016, at 2:54 AM, Josh <jof...@gmail.com> wrote:
>>
>> Hi all,
>>
>> >(1)  Only write to the DB upon a checkpoint, at which point it is known
>> that no replay of that data will occur any more. Values from partially
>> successful writes will be overwritten with correct value. I assume that is
>> what you thought of when referring to the State Backend, because in some 
>> sense,
>> that is what that state backend would do.
>>
>>
>> I feel the problem is about how to commit all snapshots as a transaction.
>> Partial writes pose cleanup challenges when job restore.
>> A easy hack would be treat Rocksdb as cache and keep states updates
>> there. Aka aemanifest. do cleanup check before actual restore.
>>
>>
>>
>> >I think it is simpler to realize that in a custom sink, than developing
>>  a new state backend.  Another Flink committer (Chesnay) has developed
>> some nice tooling for that, to be merged into Flink soon.
>>
>> I am planning to implement something like this:
>>
>> Say I have a topology which looks like this: [source => operator =>
>> sink], I would like it to work like this:
>> 1. Upon receiving an element, the operator retrieves some state from an
>> external key-value store (would like to put an in-memory cache on top of
>> this with a TTL)
>> 2. The operator emits a new state (and updates its in-memory cache with
>> the new state)
>> 3. The sink batches up all the new states and upon checkpoint flushes
>> them to the external store
>>
>> Could anyone point me at the work that's already been done on this? Has
>> it already been merged into Flink?
>>
>> Thanks,
>> Josh
>>
>> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> regarding windows and incremental aggregation. This is already happening
>>> in Flink as of now. When you give a ReduceFunction on a window, which "sum"
>>> internally does, the result for a window is incrementally updated whenever
>>> a new element comes in. This incremental aggregation only happens when you
>>> specify a ReduceFunction or a FoldFunction, not for the general case of a
>>> WindowFunction, where all elements in the window are required.
>>>
>>> You are right about incremental snapshots. We mainly want to introduce
>>> them to reduce latency incurred by snapshotting. Right now, processing
>>> stalls when a checkpoint happens.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sca...@expedia.com> wrote:
>>>
>>>> Thanks very kindly for your response, Stephan!
>>>>
>>>> We will definitely use a custom sink for persistence of idempotent
>>>> mutations whenever possible. Exposing state as read-only to external
>>>> systems is a complication we will try to avoid. Also, we will definitely
>>>> only write to the DB upon checkpoint, and the write will be synchronous and
>>>> transactional (no possibility of partial success/failure).
>>>>
>>>> However, we do want Flink s

Re: State in external db (dynamodb)

2016-07-25 Thread Josh
Hi Chen,

Can you explain what you mean a bit more? I'm not sure I understand the
problem.

Does anyone know if the tooling discussed here has been merged into Flink
already? Or if there's an example of what this custom sink would look like?
I guess the sink would buffer updates in-memory between checkpoints. Then
it would implement the Checkpointed interface and write to the external
store in snapshotState(...)?

Thanks,
Josh

On Sun, Jul 24, 2016 at 6:00 PM, Chen Qin <qinnc...@gmail.com> wrote:

>
>
> On Jul 22, 2016, at 2:54 AM, Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> >(1)  Only write to the DB upon a checkpoint, at which point it is known
> that no replay of that data will occur any more. Values from partially
> successful writes will be overwritten with correct value. I assume that is
> what you thought of when referring to the State Backend, because in some 
> sense,
> that is what that state backend would do.
>
>
> I feel the problem is about how to commit all snapshots as a transaction.
> Partial writes pose cleanup challenges when job restore.
> A easy hack would be treat Rocksdb as cache and keep states updates there.
> Aka aemanifest. do cleanup check before actual restore.
>
>
>
> >I think it is simpler to realize that in a custom sink, than developing
>  a new state backend.  Another Flink committer (Chesnay) has developed
> some nice tooling for that, to be merged into Flink soon.
>
> I am planning to implement something like this:
>
> Say I have a topology which looks like this: [source => operator => sink],
> I would like it to work like this:
> 1. Upon receiving an element, the operator retrieves some state from an
> external key-value store (would like to put an in-memory cache on top of
> this with a TTL)
> 2. The operator emits a new state (and updates its in-memory cache with
> the new state)
> 3. The sink batches up all the new states and upon checkpoint flushes them
> to the external store
>
> Could anyone point me at the work that's already been done on this? Has it
> already been merged into Flink?
>
> Thanks,
> Josh
>
> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> regarding windows and incremental aggregation. This is already happening
>> in Flink as of now. When you give a ReduceFunction on a window, which "sum"
>> internally does, the result for a window is incrementally updated whenever
>> a new element comes in. This incremental aggregation only happens when you
>> specify a ReduceFunction or a FoldFunction, not for the general case of a
>> WindowFunction, where all elements in the window are required.
>>
>> You are right about incremental snapshots. We mainly want to introduce
>> them to reduce latency incurred by snapshotting. Right now, processing
>> stalls when a checkpoint happens.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sca...@expedia.com> wrote:
>>
>>> Thanks very kindly for your response, Stephan!
>>>
>>> We will definitely use a custom sink for persistence of idempotent
>>> mutations whenever possible. Exposing state as read-only to external
>>> systems is a complication we will try to avoid. Also, we will definitely
>>> only write to the DB upon checkpoint, and the write will be synchronous and
>>> transactional (no possibility of partial success/failure).
>>>
>>> However, we do want Flink state to be durable, we want it to be in
>>> memory when possible, and we want to avoid running out of memory due to the
>>> size of the state. For example, if you have a wide window that hasn't
>>> gotten an event for a long time, we want to evict that window state from
>>> memory. We're now thinking of using Redis (via AWS Elasticache) which also
>>> conveniently has TTL, instead of DynamoDB.
>>>
>>> I just wanted to check whether eviction of (inactive/quiet) state from
>>> memory is something that I should consider implementing, or whether Flink
>>> already had some built-in way of doing it.
>>>
>>> Along the same lines, I am also wondering whether Flink already has
>>> means of compacting the state of a window by applying an aggregation
>>> function to the elements so-far (eg. every time window is triggered)? For
>>> example, if you are only executing a sum on the contents of the window, the
>>> window state doesn't need to store all the individual items in the window,
>>> it only needs to store the sum. Aggregations other than "sum" might have
>>> th

Re: State in external db (dynamodb)

2016-07-22 Thread Josh
Hi all,

>(1)  Only write to the DB upon a checkpoint, at which point it is known
that no replay of that data will occur any more. Values from partially
successful writes will be overwritten with correct value. I assume that is
what you thought of when referring to the State Backend, because in some sense,
that is what that state backend would do.

>I think it is simpler to realize that in a custom sink, than developing a
new state backend.  Another Flink committer (Chesnay) has developed some
nice tooling for that, to be merged into Flink soon.

I am planning to implement something like this:

Say I have a topology which looks like this: [source => operator => sink],
I would like it to work like this:
1. Upon receiving an element, the operator retrieves some state from an
external key-value store (would like to put an in-memory cache on top of
this with a TTL)
2. The operator emits a new state (and updates its in-memory cache with the
new state)
3. The sink batches up all the new states and upon checkpoint flushes them
to the external store
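
A minimal sketch of what step 3 could look like, assuming a sink that implements Flink's Checkpointed interface and a hypothetical ExternalStoreClient (this is only an outline, not a tested implementation):

import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import scala.collection.mutable.ArrayBuffer

case class NewState(key: String, value: String)            // placeholder
trait ExternalStoreClient extends Serializable {
  def batchWrite(batch: Seq[NewState]): Unit                // placeholder
}

class BufferingStoreSink(client: ExternalStoreClient)
    extends RichSinkFunction[NewState]
    with Checkpointed[ArrayBuffer[NewState]] {

  private var buffer = ArrayBuffer[NewState]()

  // between checkpoints, just collect the updated states in memory
  override def invoke(value: NewState): Unit = buffer += value

  // when the checkpoint barrier reaches the sink, flush the batch
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): ArrayBuffer[NewState] = {
    client.batchWrite(buffer)       // not atomic in general - the partial-write caveat
    buffer = ArrayBuffer[NewState]()
    buffer                          // nothing left to put into the checkpoint itself
  }

  override def restoreState(state: ArrayBuffer[NewState]): Unit =
    buffer = state
}

One caveat, related to the partial-writes point quoted below: the flush happens when the sink takes its snapshot, not once the whole checkpoint is known to have completed, so the writes still need to be idempotent.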

Could anyone point me at the work that's already been done on this? Has it
already been merged into Flink?

Thanks,
Josh

On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> regarding windows and incremental aggregation. This is already happening
> in Flink as of now. When you give a ReduceFunction on a window, which "sum"
> internally does, the result for a window is incrementally updated whenever
> a new element comes in. This incremental aggregation only happens when you
> specify a ReduceFunction or a FoldFunction, not for the general case of a
> WindowFunction, where all elements in the window are required.
>
> You are right about incremental snapshots. We mainly want to introduce
> them to reduce latency incurred by snapshotting. Right now, processing
> stalls when a checkpoint happens.
>
> Cheers,
> Aljoscha
>
> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sca...@expedia.com> wrote:
>
>> Thanks very kindly for your response, Stephan!
>>
>> We will definitely use a custom sink for persistence of idempotent
>> mutations whenever possible. Exposing state as read-only to external
>> systems is a complication we will try to avoid. Also, we will definitely
>> only write to the DB upon checkpoint, and the write will be synchronous and
>> transactional (no possibility of partial success/failure).
>>
>> However, we do want Flink state to be durable, we want it to be in memory
>> when possible, and we want to avoid running out of memory due to the size
>> of the state. For example, if you have a wide window that hasn't gotten an
>> event for a long time, we want to evict that window state from memory.
>> We're now thinking of using Redis (via AWS Elasticache) which also
>> conveniently has TTL, instead of DynamoDB.
>>
>> I just wanted to check whether eviction of (inactive/quiet) state from
>> memory is something that I should consider implementing, or whether Flink
>> already had some built-in way of doing it.
>>
>> Along the same lines, I am also wondering whether Flink already has means
>> of compacting the state of a window by applying an aggregation function to
>> the elements so-far (eg. every time window is triggered)? For example, if
>> you are only executing a sum on the contents of the window, the window
>> state doesn't need to store all the individual items in the window, it only
>> needs to store the sum. Aggregations other than "sum" might have that
>> characteristic too. I don't know if Flink is already that intelligent or
>> whether I should figure out how to aggregate window contents myself when
>> possible with something like a window fold? Another poster (Aljoscha) was
>> talking about adding incremental snapshots, but it sounds like that would
>> only improve the write throughput not the memory usage.
>>
>> Thanks again!
>> Shannon Carey
>>
>>
>> From: Stephan Ewen <se...@apache.org>
>> Date: Wednesday, April 6, 2016 at 10:37 PM
>> To: <user@flink.apache.org>
>> Subject: Re: State in external db (dynamodb)
>>
>> Hi Shannon!
>>
>> Welcome to the Flink community!
>>
>> You are right, sinks need in general to be idempotent if you want
>> "exactly-once" semantics, because there can be a replay of elements that
>> were already written.
>>
>> However, what you describe later, overwriting of a key with a new value
>> (or the same value again) is pretty much sufficient. That means that when a
>> duplicate write happens during replay, the value for the key is simply
>> overwritten with the sam

Re: Dynamic partitioning for stream output

2016-07-11 Thread Josh
Hi guys,

I've been working on this feature as I needed something similar. Have a
look at my issue here https://issues.apache.org/jira/browse/FLINK-4190 and
changes here https://github.com/joshfg/flink/tree/flink-4190
The changes follow Kostas's suggestion in this thread.
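
In short, the change derives the bucket path per element, along these lines (sketch only; the element type and date handling here are placeholders, see the branch above for the real interface):

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

case class Record(topic: String, timestamp: Long)   // placeholder element type

// e.g. Record("topic1", 1463875200000L) -> "topic=topic1/date=20160522"
def bucketPath(record: Record): String = {
  val fmt = new SimpleDateFormat("yyyyMMdd")
  fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
  s"topic=${record.topic}/date=${fmt.format(new Date(record.timestamp))}"
}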

Thanks,
Josh


On Thu, May 26, 2016 at 3:27 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> while I think it would be possible to do it by creating a "meta sink" that
> contains several RollingSinks I think the approach of integrating it into
> the current RollingSink is better.
>
> I think it's mostly a question of style and architectural purity but also
> of resource consumption and maintainability. If there are several
> RollingSinks in one other sink instead of just one RollingSink then we
> duplicate all of the internal structures of RollingSink. For
> maintainability, we would have to be very careful when interacting with the
> nested sources to ensure that they really can behave as proper sources.
> (watermarks, checkpoints, closing/disposing come to mind now but this might
> grow in the future.)
>
> Cheers,
> Aljoscha
>
> On Wed, 25 May 2016 at 11:35 Kostas Kloudas <k.klou...@data-artisans.com>
> wrote:
>
>> Hi Juho,
>>
>> To be more aligned with the semantics in Flink, I would suggest a
>> solution with a single modified RollingSink that caches
>> multiple buckets (from the Bucketer) and flushes (some of) them to disk
>> whenever certain time or space criteria are met.
>>
>> I would say that it is worth modifying the rolling sink so that it can
>> support such use cases (different flushing policies).
>> Aljoscha, as the writer of the original Rolling Sink, what do you think?
>>
>> Kostas
>>
>> On May 25, 2016, at 8:21 AM, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks, indeed the desired behavior is to flush if bucket size exceeds a
>> limit but also if the bucket has been open long enough. Contrary to the
>> current RollingSink we don't want to flush all the time if the bucket
>> changes but have multiple buckets "open" as needed.
>>
>> In our case the date to use for partitioning comes from an event field,
>> but needs to be formatted, too. The partitioning feature should be generic,
>> allowing the user to pass a function that formats the bucket path for each tuple.
>>
>> Does it seem like a valid plan to create a sink that internally caches
>> multiple rolling sinks?
>>
>> On Tue, May 24, 2016 at 3:50 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Juho,
>>>
>>> If I understand correctly, you want a custom RollingSink that caches some
>>> buckets, one for each topic/date key, and whenever the volume of data
>>> buffered
>>> exceeds a limit, then it flushes to disk, right?
>>>
>>> If this is the case, then you are right that this is not currently
>>> supported
>>> out-of-the-box, but it would be interesting to update the RollingSink
>>> to support such scenarios.
>>>
>>> One clarification: when you say that you want partition by date,
>>> you mean the date of the event, right? Not the processing time.
>>>
>>> Kostas
>>>
>>> > On May 24, 2016, at 1:22 PM, Juho Autio <juho.au...@rovio.com> wrote:
>>> >
>>> > Could you suggest how to dynamically partition data with Flink
>>> streaming?
>>> >
>>> > We've looked at RollingSink, that takes care of writing batches to S3,
>>> but
>>> > it doesn't allow defining the partition dynamically based on the tuple
>>> > fields.
>>> >
>>> > Our data is coming from Kafka and essentially has the kafka topic and a
>>> > date, among other fields.
>>> >
>>> > We'd like to consume all topics (also automatically subscribe to new
>>> ones)
>>> > and write to S3 partitioned by topic and date, for example:
>>> >
>>> > s3://bucket/path/topic=topic2/date=20160522/
>>> > s3://bucket/path/topic=topic2/date=20160523/
>>> > s3://bucket/path/topic=topic1/date=20160522/
>>> > s3://bucket/path/topic=topic1/date=20160523/
>>> >
>>> > There are two problems with RollingSink as it is now:
>>> > - Only allows partitioning by date
>>> > - Flushes the batch every time the path changes. In our case the
>>> stream can
>>> > for example have a random mix of different topics and that would mean
>>> that
>>> > RollingSink isn't able to

Re: How to avoid breaking states when upgrading Flink job?

2016-07-01 Thread Josh
)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)


On Fri, Jul 1, 2016 at 10:21 AM, Josh <jof...@gmail.com> wrote:

> Thanks guys, that's very helpful info!
>
> @Aljoscha I thought I saw this exception on a job that was using the
> RocksDB state backend, but I'm not sure. I will do some more tests today to
> double check. If it's still a problem I'll try the explicit class
> definitions solution.
>
> Josh
>
> On Thu, Jun 30, 2016 at 1:27 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Also, you're using the FsStateBackend, correct?
>>
>> Reason I'm asking is that the problem should not occur for the RocksDB
>> state backend. There, we don't serialize any user code, only binary data. A
>> while back I wanted to change the FsStateBackend to also work like this.
>> Now might be a good time to actually do this. :-)
>>
>> On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Josh,
>>>
>>> you could also try to replace your anonymous classes by explicit class
>>> definitions. This should assign these classes a fixed name independent of
>>> the other anonymous classes. Then the class loader should be able to
>>> deserialize your serialized data.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi Josh,
>>>> I think in your case the problem is that Scala might choose different
>>>> names for synthetic/generated classes. This will trip up the code that is
>>>> trying to restore from a snapshot that was done with an earlier version of
>>>> the code where classes were named differently.
>>>>
>>>> I'm afraid I don't know how to solve this one right now, except by
>>>> switching to Java.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>> You have to assign UIDs to all operators to change the topology. Plus,
>>>>> you have to add dummy operators for all UIDs which you removed; this
>>>>> is a limitation currently because Flink will attempt to find all UIDs
>>>>> of the old job.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jof...@gmail.com> wrote:
>>>>> > Hi all,
>>>>> > Is there any information out there on how to avoid breaking saved
>>>>> > states/savepoints when making changes to a Flink job and redeploying
>>>>> it?
>>>>> >
>>>>> > I want to know how to avoid exceptions like this:
>>>>> >
>>>>> > java.lang.RuntimeException: Failed to deserialize state handle and
>>>>> setup
>>>>> > initial operator state.
>>>>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>>>>> >   at java.lang.Thread.run(Thread.java:745)
>>>>> > Caused by: java.lang.ClassNotFoundException:
>>>>> > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>>>>> >
>>>>> >
>>>>> > The best information I could find in the docs is here:
>>>>> >
>>>>> >
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>>>> >
>>>>> >
>>>>> > Having made the suggested changes to my job (i.e. giving a uid to
>>>>> every
>>>>> > stateful sink and map function), what changes to the job/topology
>>>>> are then
>>>>> > allowed/not allowed?
>>>>> >
>>>>> >
>>>>> > If I'm 'naming' my states by providing uids, why does Flink need to
>>>>> look for
>>>>> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>>>>> >
>>>>> >
>>>>> > Thanks for any advice,
>>>>> >
>>>>> > Josh
>>>>>
>>>>
>>>
>
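For illustration, Till's suggestion above of replacing anonymous classes with explicit class definitions might look like the Scala sketch below (hypothetical types and names, not taken from the job discussed in this thread):

import org.apache.flink.api.common.functions.MapFunction

case class Event(id: String, value: Long)
case class TaggedEvent(id: String, value: Long, source: String)

// A named top-level class keeps a stable class name across recompilations,
// unlike an inline anonymous MapFunction, which Scala compiles to a synthetic
// name such as MyJob$$anon$1 that can change when the surrounding code changes.
class TagEvent extends MapFunction[Event, TaggedEvent] {
  override def map(e: Event): TaggedEvent =
    TaggedEvent(e.id, e.value, "events-stream")
}

// usage: events.map(new TagEvent)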


Re: How to avoid breaking states when upgrading Flink job?

2016-07-01 Thread Josh
Thanks guys, that's very helpful info!

@Aljoscha I thought I saw this exception on a job that was using the
RocksDB state backend, but I'm not sure. I will do some more tests today to
double check. If it's still a problem I'll try the explicit class
definitions solution.

Josh

On Thu, Jun 30, 2016 at 1:27 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Also, you're using the FsStateBackend, correct?
>
> Reason I'm asking is that the problem should not occur for the RocksDB
> state backend. There, we don't serialize any user code, only binary data. A
> while back I wanted to change the FsStateBackend to also work like this.
> Now might be a good time to actually do this. :-)
>
> On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Josh,
>>
>> you could also try to replace your anonymous classes by explicit class
>> definitions. This should assign these classes a fixed name independent of
>> the other anonymous classes. Then the class loader should be able to
>> deserialize your serialized data.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi Josh,
>>> I think in your case the problem is that Scala might choose different
>>> names for synthetic/generated classes. This will trip up the code that is
>>> trying to restore from a snapshot that was done with an earlier version of
>>> the code where classes were named differently.
>>>
>>> I'm afraid I don't know how to solve this one right now, except by
>>> switching to Java.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <m...@apache.org> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> You have to assign UIDs to all operators to change the topology. Plus,
>>>> you have to add dummy operators for all UIDs which you removed; this
>>>> is a limitation currently because Flink will attempt to find all UIDs
>>>> of the old job.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jof...@gmail.com> wrote:
>>>> > Hi all,
>>>> > Is there any information out there on how to avoid breaking saved
>>>> > states/savepoints when making changes to a Flink job and redeploying
>>>> it?
>>>> >
>>>> > I want to know how to avoid exceptions like this:
>>>> >
>>>> > java.lang.RuntimeException: Failed to deserialize state handle and
>>>> setup
>>>> > initial operator state.
>>>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>>>> >   at java.lang.Thread.run(Thread.java:745)
>>>> > Caused by: java.lang.ClassNotFoundException:
>>>> > com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>>>> >
>>>> >
>>>> > The best information I could find in the docs is here:
>>>> >
>>>> >
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>>> >
>>>> >
>>>> > Having made the suggested changes to my job (i.e. giving a uid to
>>>> every
>>>> > stateful sink and map function), what changes to the job/topology are
>>>> then
>>>> > allowed/not allowed?
>>>> >
>>>> >
>>>> > If I'm 'naming' my states by providing uids, why does Flink need to
>>>> look for
>>>> > a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>>>> >
>>>> >
>>>> > Thanks for any advice,
>>>> >
>>>> > Josh
>>>>
>>>
>>


Re: Flink on YARN - how to resize a running cluster?

2016-06-30 Thread Josh
Hi Till,

Thanks, that's very helpful!
So I guess in that case, since it isn't possible to increase the job
parallelism later, it might be sensible to use say 10x the parallelism that
I need right now (even if only running on a couple of task managers) - so
that it's possible to scale the job in the future if I need to?

Josh

On Thu, Jun 30, 2016 at 11:17 AM, Till Rohrmann <trohrm...@apache.org>
wrote:

> Hi Josh,
>
> at the moment it is not possible to dynamically increase the parallelism
> of your job. The same holds true for restarting a job from a savepoint.
> But we're currently working on exactly this. So in order to change the
> parallelism of your job, you would have to restart the job from scratch.
>
> Adding task managers dynamically to your running Flink cluster is
> possible if you allocate new YARN containers and then start a TaskManager
> process manually with the current job manager address and port. You can
> either find the address and port out using the web dashboard under job
> manager configuration or you look up the .yarn-properties file which is
> stored in your temp directory on your machine. This file also contains the
> job manager address. But the easier way would be to stop your yarn session
> and then restart it with an increased number of containers. Because then,
> you wouldn't have to ship the lib directory, which might contain user code
> classes, manually.
>
> Cheers,
> Till
>
> On Wed, Jun 29, 2016 at 10:13 PM, Josh <jof...@gmail.com> wrote:
>
>> I'm running a Flink cluster as a YARN application, started by:
>> ./bin/yarn-session.sh -n 4 -jm 2048 -tm 4096 -d
>>
>> There are 2 worker nodes, so each is allocated 2 task managers. There is
>> a stateful Flink job running on the cluster with a parallelism of 2.
>>
>> If I now want to increase the number of worker nodes to 3, and add 2
>> extra task managers, and then increase the job parallelism, how should I do
>> this?
>>
>> I'm using EMR, so adding an extra worker node and making it available to
>> YARN is easy to do via the AWS console. But I haven't been able to find any
>> information in Flink docs about how to resize a running Flink cluster on
>> YARN. Is it possible to resize it while the YARN application is running, or
>> do I need to stop the YARN application and redeploy the cluster? Also do I
>> need to redeploy my Flink job from a savepoint to increase its parallelism,
>> or do I do this while the job is running?
>>
>> I tried redeploying the cluster having added a third worker node, via:
>>
>> > yarn application -kill myflinkcluster
>>
>> > ./bin/yarn-session.sh -n 6 -jm 2048 -tm 4096 -d
>>
>> (note increasing the number of task managers from 4 to 6)
>>
>> Surprisingly, this redeployed a Flink cluster with 4 task managers (not
>> 6!) and restored my job from the last checkpoint.
>>
>> Can anyone point me in the right direction?
>>
>> Thanks,
>>
>> Josh
>>
>>
>>
>>
>
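To make Till's "restart from scratch" route concrete, the sequence might look roughly like this (application id, class name and jar path are placeholders; the job parallelism is fixed at submission time with -p):

yarn application -kill <applicationId>
./bin/yarn-session.sh -n 6 -jm 2048 -tm 4096 -d
./bin/flink run -p 6 -c my.application.MainClass /path/to/YourApp.jar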


Flink on YARN - how to resize a running cluster?

2016-06-29 Thread Josh
I'm running a Flink cluster as a YARN application, started by:
./bin/yarn-session.sh -n 4 -jm 2048 -tm 4096 -d

There are 2 worker nodes, so each is allocated 2 task managers. There is a
stateful Flink job running on the cluster with a parallelism of 2.

If I now want to increase the number of worker nodes to 3, and add 2 extra
task managers, and then increase the job parallelism, how should I do this?

I'm using EMR, so adding an extra worker node and making it available to
YARN is easy to do via the AWS console. But I haven't been able to find any
information in Flink docs about how to resize a running Flink cluster on
YARN. Is it possible to resize it while the YARN application is running, or
do I need to stop the YARN application and redeploy the cluster? Also do I
need to redeploy my Flink job from a savepoint to increase its parallelism,
or do I do this while the job is running?

I tried redeploying the cluster having added a third worker node, via:

> yarn application -kill myflinkcluster

> ./bin/yarn-session.sh -n 6 -jm 2048 -tm 4096 -d

(note increasing the number of task managers from 4 to 6)

Surprisingly, this redeployed a Flink cluster with 4 task managers (not 6!)
and restored my job from the last checkpoint.

Can anyone point me in the right direction?

Thanks,

Josh


How to avoid breaking states when upgrading Flink job?

2016-06-29 Thread Josh
Hi all,
Is there any information out there on how to avoid breaking saved
states/savepoints when making changes to a Flink job and redeploying it?

I want to know how to avoid exceptions like this:

java.lang.RuntimeException: Failed to deserialize state handle and
setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
com.me.flink.MyJob$$anon$1$$anon$7$$anon$4


The best information I could find in the docs is here:

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html


Having made the suggested changes to my job (i.e. giving a uid to
every stateful sink and map function), what changes to the
job/topology are then allowed/not allowed?


If I'm 'naming' my states by providing uids, why does Flink need to
look for a specific class, like
com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?


Thanks for any advice,

Josh
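For reference, the uid assignment described in the savepoints docs looks roughly like the sketch below (a minimal, self-contained example with made-up names; the point is just that each stateful operator gets a fixed uid string):

import org.apache.flink.streaming.api.scala._

object UidExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("a", "b", "a")
      .map(s => (s, 1)).uid("to-pairs")        // fixed id instead of an auto-generated one
      .keyBy(_._1)
      .sum(1).uid("running-counts")            // the keyed aggregate state maps to this id
      .print()
    env.execute("uid-example")
  }
}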


Re: Kinesis connector classpath issue when running Flink 1.1-SNAPSHOT on YARN

2016-06-23 Thread Josh
Hi Aljoscha,
I opened an issue here https://issues.apache.org/jira/browse/FLINK-4115 and
submitted a pull request.
I'm not sure if my fix is the best way to resolve this, or if it's better
to just remove the verification checks completely.

Thanks,
Josh

On Thu, Jun 23, 2016 at 9:41 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Josh,
> do you maybe want to open an issue for that and contribute your fix for
> that?
>
> Cheers,
> Aljoscha
>
> On Fri, 17 Jun 2016 at 17:49 Josh <jof...@gmail.com> wrote:
>
>> Hi Aljoscha,
>>
>> Thanks! It looks like you're right. I've run it with the FsStateBackend
>> and everything works fine.
>>
>> I've also got it working with RocksDBStateBackend now, by rebuilding
>> Flink master with:
>> - the verify step in FsStateBackend skipped for URIs with s3 schemes.
>> - the initialisation of filesystem in the constructor commented out (not
>> sure why this is initialised in the constructor, since it seems to get
>> initialised later anyway)
>>
>> Josh
>>
>>
>>
>> On Fri, Jun 17, 2016 at 2:53 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> I think the problem with the missing Class
>>> com.amazon.ws.emr.hadoop.fs.EmrFileSystem is not specific to RocksDB. The
>>> exception is thrown in the FsStateBackend, which is internally used by the
>>> RocksDB backend to do snapshotting of non-partitioned state. The problem is
>>> that the FsStateBackend tries to verify that the checkpoint path exists in
>>> the constructor. The constructor is invoked in the client program, when not
>>> running in the Yarn context where the correct jars that hold the EMR
>>> FileSystem classes are available. This should be causing the exception.
>>>
>>> Just to verify, could you maybe run it with the FsStateBackend to see if
>>> you get the same exception. If yes, then we need to remove the verify step
>>> in the FsStateBackend or at least provide a way to bypass these steps.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 17 Jun 2016 at 15:40 Josh <jof...@gmail.com> wrote:
>>>
>>>> I found that I can still write to s3, using my Flink build of
>>>> 1.1-SNAPSHOT, for example if I run the word count example:
>>>> ./bin/flink run ./examples/batch/WordCount.jar --input
>>>> hdfs:///tmp/LICENSE --output s3://permutive-flink/wordcount-result.txt
>>>>
>>>> This works fine - it's just the RocksDBStateBackend which is erroring
>>>> with the s3 URI. I'm wondering if it could be an issue with
>>>> RocksDBStateBackend?
>>>>
>>>>
>>>> On Fri, Jun 17, 2016 at 12:09 PM, Josh <jof...@gmail.com> wrote:
>>>>
>>>>> Hi Gordon/Fabian,
>>>>>
>>>>> Thanks for helping with this! Downgrading the Maven version I was
>>>>> using to build Flink appears to have fixed that problem - I was using 
>>>>> Maven
>>>>> 3.3.3 before and have downgraded to 3.2.5.
>>>>>
>>>>> Just for reference, I printed the loaded class at runtime and found
>>>>> that when I was using Flink built with Maven 3.3.3, it was pulling in:
>>>>>
>>>>> jar:file:/opt/flink/flink-1.1-SNAPSHOT/lib/flink-dist_2.11-1.1-SNAPSHOT.jar!/org/apache/http/params/HttpConnectionParams.class
>>>>> But after building with the older Maven version, it pulled in the
>>>>> class from my jar:
>>>>>
>>>>> jar:file:/tmp/my-assembly-1.0.jar!/org/apache/http/params/HttpConnectionParams.class
>>>>>
>>>>>
>>>>> Unfortunately now that problem is fixed I've now got a different
>>>>> classpath issue. It started with:
>>>>>
>>>>> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
>>>>> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>>> at
>>>>> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>>>>> at
>>>>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>>>>> at
>>>>> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>>>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>>>>> at
>>>>> org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsSta

Re: Kinesis connector classpath issue when running Flink 1.1-SNAPSHOT on YARN

2016-06-17 Thread Josh
I found that I can still write to s3, using my Flink build of 1.1-SNAPSHOT,
for example if I run the word count example:
./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///tmp/LICENSE
--output s3://permutive-flink/wordcount-result.txt

This works fine - it's just the RocksDBStateBackend which is erroring with
the s3 URI. I'm wondering if it could be an issue with RocksDBStateBackend?


On Fri, Jun 17, 2016 at 12:09 PM, Josh <jof...@gmail.com> wrote:

> Hi Gordon/Fabian,
>
> Thanks for helping with this! Downgrading the Maven version I was using to
> build Flink appears to have fixed that problem - I was using Maven 3.3.3
> before and have downgraded to 3.2.5.
>
> Just for reference, I printed the loaded class at runtime and found that
> when I was using Flink built with Maven 3.3.3, it was pulling in:
>
> jar:file:/opt/flink/flink-1.1-SNAPSHOT/lib/flink-dist_2.11-1.1-SNAPSHOT.jar!/org/apache/http/params/HttpConnectionParams.class
> But after building with the older Maven version, it pulled in the class
> from my jar:
>
> jar:file:/tmp/my-assembly-1.0.jar!/org/apache/http/params/HttpConnectionParams.class
>
>
> Unfortunately now that problem is fixed I've now got a different classpath
> issue. It started with:
>
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> at
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
>
> This is strange because I used an s3:// checkpoint directory when running
> Flink 1.0.3 on EMR and it worked fine. (according to
> https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#provide-s3-filesystem-dependency
> no configuration should be needed to use S3 when running on EMR).
>
> Anyway I tried executing /etc/hadoop/conf/hadoop-env.sh before running my
> job, as this sets up the HADOOP_CLASSPATH env var. The exception then
> changed to:
> java.lang.NoClassDefFoundError: org/apache/hadoop/fs/common/Abortable
>
> I found that this class is related to a jar called s3-dist-cp, so then I
> tried copying that jar to Flink's lib directory from
> /usr/share/aws/emr/s3-dist-cp/lib/*
>
> And now I'm back to another Kinesis connector classpath error:
>
> java.lang.NoClassDefFoundError: org/apache/http/conn/ssl/SSLSocketFactory
> at
> com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
> at
> com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
> at
> com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:157)
> at
> com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:137)
> at
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:76)
> at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:166)
> at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:140)
> at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:123)
>
> I guess this is related to me adding a bunch of extra stuff to the
> classpath in an attempt to solve the EmrFileSystem error. Any ideas what
> caused that error in the first place?
>
> By the way, I built Flink with:
> mvn clean install -Pinclude-kinesis,vendor-repos -DskipTests
> -Dhadoop.version=2.7.1
>
> Josh
>
> On Fri, Jun 17, 2016 at 9:56 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Josh,
>>
>> I assume that you build the SNAPSHOT version yourself. I had similar
>> version conflicts for Apache HttpCore with Flink SNAPSHOT versions on EMR.
>> The problem is cause by a changed behavior in Maven 3.3 and following
>> versions.
>> Due to these changes, the dependency shading is not working correctly.
>> That's why we use Maven 3.2 to build the Flink release artifacts.
>>
>> Can you check whether you used Maven 3.3 and try to downgrade to 3.2 if
>> that was the case?
>>
>> Cheers, Fabian
>>

Re: Moving from single-node, Maven examples to cluster execution

2016-06-16 Thread Josh
Hi Prez,

You need to build a jar with all your dependencies bundled inside. With
maven you can use maven-assembly-plugin for this, or with SBT there's
sbt-assembly.

Once you've done this, you can login to the JobManager node of your Flink
cluster, copy the jar across and use the Flink command line tool to submit
jobs to the running cluster, e.g. (from the Flink root directory):

./bin/flink run -c my.application.MainClass /path/to/YourApp.jar

See https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html

You can also run the Flink command line tool locally and submit the jar to
a remote JobManager with the -m flag. Although I don't do this at the
moment because it doesn't work so easily if you're running Flink on AWS/EMR.
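For reference, submitting to a remote JobManager with -m looks roughly like this (host, class name and jar path are placeholders; 6123 is the default JobManager RPC port):

./bin/flink run -m <jobmanager-host>:6123 -c my.application.MainClass /path/to/YourApp.jar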

Josh

On Thu, Jun 16, 2016 at 10:51 PM, Prez Cannady <revp...@opencorrelate.org>
wrote:

> Having a hard time trying to get my head around how to deploy my Flink
> programs to a pre-configured, remote Flink cluster setup.
>
> My Mavenized setup uses Spring Boot (to simplify class path handling and
> generate pretty logs) to provision and execute a StreamExecutionEnvironment
> with Kafka sources and sinks. I can also run this quite effectively the
> standard way (`java -jar …`).  What I’m unclear on is how I might go about
> distributing this code to run on an existing Flink cluster setup.  Where do
> I drop the jars? Do I need to restart Flink to do so?
>
> class AppRunner extends CommandLineRunner {
>
>   val log = LoggerFactory.getLogger(classOf[AppRunner])
>
>   override def run(args: String*): Unit = {
>
>     val env: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.getExecutionEnvironment
>
>     val consumer = …
>     val producer = ...
>     val stream = env.addSource(consumer)
>
>     stream
>       …
>       // Do some stuff
>       …
>       .addSink(producer)
>
>     env.execute
>   }
> }
> …
>
> @SpringBootApplication
> object App {
>
>   @throws(classOf[Exception])
>   def main(args: Array[String]): Unit = {
>     SpringApplication.run(classOf[AppRunner], args: _*)
>   }
> }
>
>
> Try as I might, I couldn’t find any clear instructions on how to do this
> in the documentation.  The cluster documentation ends with starting it.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/cluster_setup.html#starting-flink
>
> The Wikiedits example doesn’t involve any third party dependencies, so I’m
> not clear on how to manage class path for it.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/quickstart/run_example_quickstart.html
>
> Any help in getting me on the right, preferably best practices path would
> be appreciated.
>
>
> Prez Cannady
> p: 617 500 3378
> e: revp...@opencorrelate.org
> GH: https://github.com/opencorrelate
> LI: https://www.linkedin.com/in/revprez
>
>
>
>
>
>
>
>
>
>


Kinesis connector classpath issue when running Flink 1.1-SNAPSHOT on YARN

2016-06-16 Thread Josh
Hey,

I've been running the Kinesis connector successfully now for a couple of
weeks, on a Flink cluster running Flink 1.0.3 on EMR 2.7.1/YARN.

Today I've been trying to get it working on a cluster running the current
Flink master (1.1-SNAPSHOT) but am running into a classpath issue when
starting the job. This only happens when running on EMR/YARN (it's fine
when running 1.1-SNAPSHOT locally, and when running 1.0.3 on EMR)


 The program finished with the following exception:

java.lang.NoSuchMethodError:
org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V
at
com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:187)
at
com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
at
com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:157)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:137)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:76)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:166)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:140)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:123)
---

Any ideas what's going on?

The job I'm deploying has httpclient 4.3.6 and httpcore 4.3.3 which I
believe are the libraries with the HttpConnectionParams class.

Thanks,
Josh


Re: Migrating from one state backend to another

2016-06-15 Thread Josh
Hi Aljoscha,
Thanks, that makes sense. I will start using RocksDB right away then.

Josh

On Wed, Jun 15, 2016 at 1:01 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> right now migrating from one state backend to another is not possible. I
> have it in the back of my head, however, that we should introduce a common
> serialized representation of state to make this possible in the future.
> (Both for checkpoints and savepoints, which use the same mechanism
> underneath.)
>
> Cheers,
> Aljoscha
>
> On Wed, 15 Jun 2016 at 00:04 Josh <jof...@gmail.com> wrote:
>
>> I'm trying to decide whether to set-up RocksDB now or later.
>>
>> The state for my Flink jobs right now will be able to fit into memory, so
>> I can use the filesystem state backend. In a few months time it is likely
>> the state will no longer fit into memory, so I will want to use the RocksDB
>> backend.
>>
>> I was just wondering if it's possible/easy to use savepoints to migrate
>> existing state from the filesystem backend to the RocksDB backend? As I
>> would not want to lose any job state when switching to RocksDB. If there's
>> a way to do it then I can worry about RocksDB later.
>>
>> Thanks!
>> Josh
>>
>


Migrating from one state backend to another

2016-06-14 Thread Josh
I'm trying to decide whether to set-up RocksDB now or later.

The state for my Flink jobs right now will be able to fit into memory, so I
can use the filesystem state backend. In a few months time it is likely the
state will no longer fit into memory, so I will want to use the RocksDB
backend.

I was just wondering if it's possible/easy to use savepoints to migrate
existing state from the filesystem backend to the RocksDB backend? As I
would not want to lose any job state when switching to RocksDB. If there's
a way to do it then I can worry about RocksDB later.

Thanks!
Josh


Re: Accessing StateBackend snapshots outside of Flink

2016-06-13 Thread Josh
Hello,
I have a follow-up question to this: since Flink doesn't support state
expiration at the moment (e.g. expiring state which hasn't been updated for
a certain amount of time), would it be possible to clear up old UDF states
by:
- store a 'last_updated" timestamp in the state value
- periodically (e.g. monthly) go through all the state values in RocksDB,
deserialize them using TypeSerializer and read the "last_updated" property
- delete the key from RocksDB if the state's "last_updated" property is
over a month ago

Is there any reason this approach wouldn't work, or anything to be careful
of?

Thanks,
Josh
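To sketch the idea (based on Aljoscha's description of the serialization format quoted below): if you know the value type of the state, reading one RocksDB value back might look roughly like this. The state class, the wrapper class name and the exact byte layout are assumptions that may differ between Flink versions, so treat this purely as an illustration.

import java.io.ByteArrayInputStream
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.DataInputViewStreamWrapper

case class MyState(lastUpdated: Long, count: Long)

object StateValueReader {
  // valueBytes: the raw bytes of a single RocksDB value for this named state
  def readValue(valueBytes: Array[Byte]): MyState = {
    val serializer = createTypeInformation[MyState].createSerializer(new ExecutionConfig)
    serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)))
  }
}

A clean-up tool could then scan the values, check the lastUpdated field and delete keys that are older than the cutoff.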


On Mon, Apr 18, 2016 at 8:23 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> key refers to the key extracted by your KeySelector. Right now, for every
> named state (i.e. the name in the StateDescriptor) there is an isolated
> RocksDB instance.
>
> Cheers,
> Aljoscha
>
> On Sat, 16 Apr 2016 at 15:43 Igor Berman <igor.ber...@gmail.com> wrote:
>
>> thanks a lot for the info, seems not too complex
>> I'll try to write a simple tool to read this state.
>>
>> Aljoscha, does the key reflect the unique id of the operator in some way? Or is the key
>> just a "name" that is passed to the ValueStateDescriptor?
>>
>> thanks in advance
>>
>>
>> On 15 April 2016 at 15:10, Stephan Ewen <se...@apache.org> wrote:
>>
>>> One thing to add is that you can always trigger a persistent checkpoint
>>> via the "savepoints" feature:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
>>>
>>>
>>>
>>> On Fri, Apr 15, 2016 at 10:24 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> for RocksDB we simply use a TypeSerializer to serialize the key and
>>>> value to a byte[] array and store that in RocksDB. For a ListState, we
>>>> serialize the individual elements using a TypeSerializer and store them in
>>>> a comma-separated list in RocksDB. The snapshots of RocksDB that we write
>>>> to HDFS are regular backups of a RocksDB database, as described here:
>>>> https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F. It
>>>> should be possible to read them from HDFS and restore them to a RocksDB
>>>> database as described in the linked documentation.
>>>>
>>>> tl;dr As long as you know the type of values stored in the state you
>>>> should be able to read them from RocksDB and deserialize the values using
>>>> TypeSerializer.
>>>>
>>>> One more bit of information: Internally the state is keyed by (key,
>>>> namespace) -> value where namespace can be an arbitrary type that has a
>>>> TypeSerializer. We use this to store window state that is scoped to both the key
>>>> and the current window. For state that you store in a user-defined function
>>>> the namespace will always be null and that will be serialized by a
>>>> VoidSerializer that simply always writes a "0" byte.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Fri, 15 Apr 2016 at 00:18 igor.berman <igor.ber...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> we are evaluating Flink for a new solution and several people raised
>>>>> concerns
>>>>> about coupling too much to Flink -
>>>>> 1. we understand that if we want to get full fault tolerance and best
>>>>> performance we'll need to use Flink managed state (probably RocksDB
>>>>> backend
>>>>> due to the volume of state)
>>>>> 2. but then if we later find that Flink doesn't answer our needs (for
>>>>> any
>>>>> reason) - we'll need to extract this state in some way (since it's the
>>>>> only
>>>>> source of consistent state)
>>>>> In general I'd like to be able to take a snapshot of the backend and try to
>>>>> read
>>>>> it... do you think it will be a trivial task?
>>>>> Say, if I'm holding list state per partitioned key, would it be easy to
>>>>> take
>>>>> the RocksDB file and open it?
>>>>>
>>>>> any thoughts regarding how I can convince people in our team?
>>>>>
>>>>> thanks in advance!
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-StateBackend-snapshots-outside-of-Flink-tp6116.html
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive at Nabble.com.
>>>>>
>>>>
>>>
>>


Re: Using Flink watermarks and a large window state for scheduling

2016-06-09 Thread Josh
Ok, thanks Aljoscha.

As an alternative to using Flink to maintain the schedule state, I could
take the (e, t2) stream and write to a external key-value store with a
bucket for each minute. Then have a separate service which polls the
key-value store every minute and retrieves the current bucket, and does the
final transformation.

I just thought there might be a nicer way to do it using Flink!

On Thu, Jun 9, 2016 at 2:23 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Josh,
> I'll have to think a bit about that one. Once I have something I'll get
> back to you.
>
> Best,
> Aljoscha
>
> On Wed, 8 Jun 2016 at 21:47 Josh <jof...@gmail.com> wrote:
>
>> This is just a question about a potential use case for Flink:
>>
>> I have a Flink job which receives tuples with an event id and a timestamp
>> (e, t) and maps them into a stream (e, t2) where t2 is a future timestamp
>> (up to 1 year in the future, which indicates when to schedule a
>> transformation of e). I then want to key by e and keep track of the max t2
>> for each e. Now the tricky bit: I want to periodically, say every minute
>> (in event time world) take all (e, t2) where t2 occurred in the last
>> minute, do a transformation and emit the result. It is important that the
>> final transformation happens after t2 (preferably as soon as possible, but
>> a delay of minutes is fine).
>>
>> Is it possible to use Flink's windowing and watermark mechanics to
>> achieve this? I want to maintain a large state for the (e, t2) window, e.g.
>> over a year (probably too large to fit in memory). And somehow use
>> watermarks to execute the scheduled transformations.
>>
>> If anyone has any views on how this could be done, (or whether it's even
>> possible/a good idea to do) with Flink then it would be great to hear!
>>
>> Thanks,
>>
>> Josh
>>
>


Using Flink watermarks and a large window state for scheduling

2016-06-08 Thread Josh
This is just a question about a potential use case for Flink:

I have a Flink job which receives tuples with an event id and a timestamp
(e, t) and maps them into a stream (e, t2) where t2 is a future timestamp
(up to 1 year in the future, which indicates when to schedule a
transformation of e). I then want to key by e and keep track of the max t2
for each e. Now the tricky bit: I want to periodically, say every minute
(in event time world) take all (e, t2) where t2 occurred in the last
minute, do a transformation and emit the result. It is important that the
final transformation happens after t2 (preferably as soon as possible, but
a delay of minutes is fine).

Is it possible to use Flink's windowing and watermark mechanics to achieve
this? I want to maintain a large state for the (e, t2) window, e.g. over a
year (probably too large to fit in memory). And somehow use watermarks to
execute the scheduled transformations.

If anyone has any views on how this could be done, (or whether it's even
possible/a good idea to do) with Flink then it would be great to hear!

Thanks,

Josh


Re: ClassCastException when redeploying Flink job on running cluster

2016-06-08 Thread Josh
Thanks Till, your suggestion worked!

I actually just created a new SpecificData for each
AvroDeserializationSchema instance, so I think it's still just as efficient.

Josh
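For anyone hitting the same issue, the change might look roughly like the sketch below. It assumes Avro's SpecificData(ClassLoader) constructor and the three-argument SpecificDatumReader constructor; the surrounding class and names are hypothetical rather than the exact code from the pastebin above.

import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecordBase}
import scala.reflect.{classTag, ClassTag}

// One SpecificData per instance, bound to the class loader that loaded T,
// instead of the JVM-wide SpecificData.get() singleton and its class cache.
class AvroReaderFactory[T <: SpecificRecordBase : ClassTag] extends Serializable {
  @transient private lazy val avroClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
  @transient private lazy val specificData = new SpecificData(avroClass.getClassLoader)
  @transient private lazy val schema = specificData.getSchema(avroClass)

  def newReader(): SpecificDatumReader[T] =
    new SpecificDatumReader[T](schema, schema, specificData)
}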

On Wed, Jun 8, 2016 at 4:41 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> The only thing I could think of is to not use the SpecificData singleton
> but instead creating a new SpecificData object for each SpecificDatumReader
> (you can pass it as a third argument to the constructor). This, of course,
> is not really efficient. But you could try it out to see whether it solves
> your problem.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 4:24 PM, Josh <jof...@gmail.com> wrote:
>
>> Sorry - I forgot to include my stack trace too. Here it is:
>>
>> The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>> at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
>> at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
>> at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.Exception: Could not forward element to next operator
>> at
>> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Could not forward element to next
>> operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>> at
>> 

Re: ClassCastException when redeploying Flink job on running cluster

2016-06-08 Thread Josh
Sorry - I forgot to include my stack trace too. Here it is:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Could not forward element to next operator
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next
operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
at
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerThread.run(ShardConsumerThread.java:141)
Caused by: java.lang.ClassCastException: com.me.avro.MyAvroType cannot be
cast to com.me.avro.MyAvroType
at com.me.flink.job.MyFlinkJob$$anonfun$1.apply(MyFlinkJob.scala:61)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
at
org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
at
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
... 3 more

On Wed, Jun 8, 2016 at 3:19 PM, Josh <jof...@gmail.com> wrote:

> Hi Till,
>
> Thanks for the reply! I see - yes it does sound very much like FLINK-1390.
>
> Please see my AvroDeserializationSchema implementation here:
> http://pastebin.com/mK7SfBQ8
>
> I think perhaps the problem is caused by this line:
> val readerSche

Re: ClassCastException when redeploying Flink job on running cluster

2016-06-08 Thread Josh
Hi Till,

Thanks for the reply! I see - yes it does sound very much like FLINK-1390.

Please see my AvroDeserializationSchema implementation here:
http://pastebin.com/mK7SfBQ8

I think perhaps the problem is caused by this line:
val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)

Looking at SpecificData, it contains a classCache which is a map of strings
to classes, similar to what's described in FLINK-1390.

I'm not sure how to change my AvroDeserializationSchema to prevent this
from happening though! Do you have any ideas?

Josh



On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Josh,
>
> the error message you've posted usually indicates that there is a class
> loader issue. When you first run your program the class
> com.me.avro.MyAvroType will be first loaded (by the user code class
> loader). I suspect that this class is now somewhere cached (e.g. the avro
> serializer) and when you run your program a second time, then there is a
> new user code class loader which has loaded the same class and now you want
> to convert an instance of the first class into the second class. However,
> these two classes are not identical since they were loaded by different
> class loaders.
>
> In order to find the culprit, it would be helpful to see the full stack
> trace of the ClassCastException and the code of the
> AvroDeserializationSchema. I suspect that something similar to
> https://issues.apache.org/jira/browse/FLINK-1390 is happening.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 10:38 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> Currently I have to relaunch my Flink cluster every time I want to
>> upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:
>>
>> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
>> com.me.avro.MyAvroType
>>
>> It's related to MyAvroType which is a class generated from an Avro
>> schema. The ClassCastException occurs every time I redeploy the job without
>> killing the Flink cluster (even if there have been no changes to the
>> job/jar).
>>
>> I wrote my own AvroDeserializationSchema in Scala which does something a
>> little strange to get the avro type information (see below), and I'm
>> wondering if that's causing the problem when the Flink job creates an
>> AvroDeserializationSchema[MyAvroType].
>>
>> Does anyone have any ideas?
>>
>> Thanks,
>> Josh
>>
>>
>>
>> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag]
>> extends DeserializationSchema[T] {
>>
>>   ...
>>
>>   private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]
>>
>>   private val typeInformation = TypeExtractor.getForClass(avroType)
>>
>>   ...
>>
>>   override def getProducedType: TypeInformation[T] = typeInformation
>>
>>   ...
>>
>> }
>>
>
>


ClassCastException when redeploying Flink job on running cluster

2016-06-08 Thread Josh
Hi all,

Currently I have to relaunch my Flink cluster every time I want to
upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:

java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
com.me.avro.MyAvroType

It's related to MyAvroType which is a class generated from an Avro schema.
The ClassCastException occurs every time I redeploy the job without killing
the Flink cluster (even if there have been no changes to the job/jar).

I wrote my own AvroDeserializationSchema in Scala which does something a
little strange to get the avro type information (see below), and I'm
wondering if that's causing the problem when the Flink job creates an
AvroDeserializationSchema[MyAvroType].

Does anyone have any ideas?

Thanks,
Josh



class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag] extends
DeserializationSchema[T] {

  ...

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  private val typeInformation = TypeExtractor.getForClass(avroType)

  ...

  override def getProducedType: TypeInformation[T] = typeInformation

  ...

}


Re: Combining streams with static data and using REST API as a sink

2016-05-25 Thread Josh
Hi Aljoscha,

That sounds exactly like the kind of feature I was looking for, since my
use-case fits the "Join stream with slowly evolving data" example. For now,
I will do an implementation similar to Max's suggestion. Of course it's not
as nice as the proposed feature, as there will be a delay in receiving
updates since the updates aren't being continuously ingested by Flink. But
it certainly sounds like it would be a nice feature to have!

Thanks,
Josh
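A minimal sketch of that kind of implementation (hypothetical types, and a loadItems() stub standing in for the real database/REST lookup; the refresh check is done inline in map(), as Max suggested, rather than in a separate thread):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

case class Order(itemId: String, quantity: Int)
case class EnrichedOrder(itemId: String, quantity: Int, itemName: String)

class EnrichWithItems extends RichMapFunction[Order, EnrichedOrder] {

  @transient private var items: Map[String, String] = _
  @transient private var lastRefreshMillis: Long = 0L
  private val refreshIntervalMillis = 10 * 60 * 1000L  // re-load every 10 minutes

  // Stand-in for the real lookup against the items database or REST endpoint.
  private def loadItems(): Map[String, String] =
    Map("item-1" -> "Widget", "item-2" -> "Gadget")

  override def open(parameters: Configuration): Unit = {
    items = loadItems()
    lastRefreshMillis = System.currentTimeMillis()
  }

  override def map(order: Order): EnrichedOrder = {
    val now = System.currentTimeMillis()
    if (now - lastRefreshMillis > refreshIntervalMillis) {  // occasional refresh
      items = loadItems()
      lastRefreshMillis = now
    }
    EnrichedOrder(order.itemId, order.quantity, items.getOrElse(order.itemId, "unknown"))
  }
}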

On Tue, May 24, 2016 at 1:48 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Josh,
> for the first part of your question you might be interested in our ongoing
> work of adding side inputs to Flink. I started this design doc:
> https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing
>
> It's still somewhat rough around the edges but could you see this being
> useful for your case? I also have some more stuff that I will shortly add
> to the document.
>
> Cheers,
> Aljoscha
>
> On Tue, 24 May 2016 at 14:34 Maximilian Michels <m...@apache.org> wrote:
>
>> Hi Josh,
>>
>> You can trigger an occasional refresh, e.g. on every 100 elements
>> received. Or, you could start a thread that does that every 100
>> seconds (possible with a lock involved to prevent processing in the
>> meantime).
>>
>> Cheers,
>> Max
>>
>> On Mon, May 23, 2016 at 7:36 PM, Josh <jof...@gmail.com> wrote:
>> >
>> > Hi Max,
>> >
>> > Thanks, that's very helpful re the REST API sink. For now I don't need
>> exactly once guarantees for the sink, so I'll just write a simple HTTP sink
>> implementation. But may need to move to the idempotent version in future!
>> >
>> > For 1), that sounds like a simple/easy solution, but how would I handle
>> occasional updates in that case, since I guess the open() function is only
>> called once? Do I need to periodically restart the job, or periodically
>> trigger tasks to restart and refresh their data? Ideally I would want this
>> job to be running constantly.
>> >
>> > Josh
>> >
>> > On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>> >>
>> >> Hi Josh,
>> >>
>> >> 1) Use a RichFunction which has an `open()` method to load data (e.g.
>> from a database) at runtime before the processing starts.
>> >>
>> >> 2) No that's fine. If you want your Rest API Sink to interplay with
>> checkpointing (for fault-tolerance), this is a bit tricky though depending
>> on the guarantees you want to have. Typically, you would have "at least
>> once" or "exactly once" semantics on the state. In Flink, this is easy to
>> achieve, it's a bit harder for outside systems.
>> >>
>> >> "At Least Once"
>> >>
>> >> For example, if you increment a counter in a database, this count will
>> be off if you recover your job in the case of a failure. You can checkpoint
>> the current value of the counter and restore this value on a failure (using
>> the Checkpointed interface). However, your counter might decrease
>> temporarily when you resume from a checkpoint (until the counter has caught
>> up again).
>> >>
>> >> "Exactly Once"
>> >>
>> >> If you want "exactly once" semantics on outside systems (e.g. Rest
>> API), you'll need idempotent updates. An idempotent variant of this would
>> be a count with a checkpoint id (cid) in your database.
>> >>
>> >> | cid | count |
>> >> |-+---|
>> >> |   0 | 3 |
>> >> |   1 |11 |
>> >> |   2 |20 |
>> >> |   3 |   120 |
>> >> |   4 |   137 |
>> >> |   5 |   158 |
>> >>
>> >> You would then always read the newest cid value for presentation. You
>> would only write to the database once you know you have completed the
>> checkpoint (CheckpointListener). You can still fail while doing that, so
>> you need to keep the confirmation around in the checkpoint such that you
>> can confirm again after restore. It is important that confirmation can be
>> done multiple times without affecting the result (idempotent). On recovery
>> from a checkpoint, you want to delete all rows higher with a cid higher
>> than the one you resume from. For example, if you fail after checkpoint 3
>> has been created, you'll confirm 3 (because you might have failed before
>> you could confirm) and then delete 4 and 5 before starting the computa

Re: Combining streams with static data and using REST API as a sink

2016-05-23 Thread Josh
Hi Max,

Thanks, that's very helpful re the REST API sink. For now I don't need
exactly once guarantees for the sink, so I'll just write a simple HTTP sink
implementation. But may need to move to the idempotent version in future!
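A bare-bones version of such a sink might look like the sketch below: a RichSinkFunction that sends each (customerId, count) pair to a hypothetical REST endpoint using plain java.net.HttpURLConnection. There are no retries and no checkpoint integration here, so it gives best-effort delivery only; HttpURLConnection also has no native PATCH support, hence the method-override header, which the server would need to honour.

import java.io.OutputStreamWriter
import java.net.{HttpURLConnection, URL}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class RestPatchSink(baseUrl: String) extends RichSinkFunction[(String, Long)] {

  override def invoke(value: (String, Long)): Unit = {
    val (customerId, count) = value
    val conn = new URL(s"$baseUrl/customers/$customerId")
      .openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("POST")
      conn.setRequestProperty("X-HTTP-Method-Override", "PATCH")  // workaround: no native PATCH
      conn.setRequestProperty("Content-Type", "application/json")
      conn.setDoOutput(true)
      val writer = new OutputStreamWriter(conn.getOutputStream)
      writer.write(s"""{"item_order_count_delta": $count}""")
      writer.close()
      conn.getResponseCode  // force the request; response body ignored in this sketch
    } finally {
      conn.disconnect()
    }
  }
}

It would then be attached with something like counts.addSink(new RestPatchSink("https://api.example.com")).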

For 1), that sounds like a simple/easy solution, but how would I handle
occasional updates in that case, since I guess the open() function is only
called once? Do I need to periodically restart the job, or periodically
trigger tasks to restart and refresh their data? Ideally I would want this
job to be running constantly.

Josh

On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Josh,
>
> 1) Use a RichFunction which has an `open()` method to load data (e.g. from
> a database) at runtime before the processing starts.
>
> 2) No that's fine. If you want your Rest API Sink to interplay with
> checkpointing (for fault-tolerance), this is a bit tricky though depending
> on the guarantees you want to have. Typically, you would have "at least
> once" or "exactly once" semantics on the state. In Flink, this is easy to
> achieve, it's a bit harder for outside systems.
>
> "At Least Once"
>
> For example, if you increment a counter in a database, this count will be
> off if you recover your job in the case of a failure. You can checkpoint
> the current value of the counter and restore this value on a failure (using
> the Checkpointed interface). However, your counter might decrease
> temporarily when you resume from a checkpoint (until the counter has caught
> up again).
>
> "Exactly Once"
>
> If you want "exactly once" semantics on outside systems (e.g. Rest API),
> you'll need idempotent updates. An idempotent variant of this would be a
> count with a checkpoint id (cid) in your database.
>
> | cid | count |
> |-+---|
> |   0 | 3 |
> |   1 |11 |
> |   2 |20 |
> |   3 |   120 |
> |   4 |   137 |
> |   5 |   158 |
>
> You would then always read the newest cid value for presentation. You
> would only write to the database once you know you have completed the
> checkpoint (CheckpointListener). You can still fail while doing that, so
> you need to keep the confirmation around in the checkpoint such that you
> can confirm again after restore. It is important that confirmation can be
> done multiple times without affecting the result (idempotent). On recovery
> from a checkpoint, you want to delete all rows higher with a cid higher
> than the one you resume from. For example, if you fail after checkpoint 3
> has been created, you'll confirm 3 (because you might have failed before
> you could confirm) and then delete 4 and 5 before starting the computation
> again.
>
> You see, strong consistency guarantees can be a bit tricky. If you
> don't need strong guarantees and undercounting is ok for you, implement a
> simple checkpointing for "at least once" using the Checkpointed interface
> or the KeyValue state if your counter is scoped by key.
>
> Cheers,
> Max
>
>
> On Mon, May 23, 2016 at 3:22 PM, Josh <jof...@gmail.com> wrote:
> > Hi all,
> >
> > I am new to Flink and have a couple of questions which I've had trouble
> > finding answers to online. Any advice would be much appreciated!
> >
> > What's a typical way of handling the scenario where you want to join
> > streaming data with a (relatively) static data source? For example, if I
> > have a stream 'orders' where each order has an 'item_id', and I want to
> join
> > this stream with my database of 'items'. The database of items is mostly
> > static (with perhaps a few new items added every day). The database can
> be
> > retrieved either directly from a standard SQL database (postgres) or via
> a
> > REST call. I guess one way to handle this would be to distribute the
> > database of items with the Flink tasks, and to redeploy the entire job if
> > the items database changes. But I think there's probably a better way to
> do
> > it?
> > I'd like my Flink job to output state to a REST API. (i.e. using the REST
> > API as a sink). Updates would be incremental, e.g. the job would output
> > tumbling window counts which need to be added to some property on a REST
> > resource, so I'd probably implement this as a PATCH. I haven't found much
> > evidence that anyone else has used a REST API as a Flink sink - is there
> a
> > reason why this might be a bad idea?
> >
> > Thanks for any advice on these,
> >
> > Josh
>
>


Re: Combining streams with static data and using REST API as a sink

2016-05-23 Thread Josh
Hi Rami,

Thanks for the fast reply.

   1. In your solution, would I need to create a new stream for 'item
   updates', and add it as a source of my Flink job? Then I would need to
   ensure item updates get broadcast to all nodes that are running my job and
   use them to update the in-memory items database? This sounds like it might
   be a good solution, but I'm not sure how the broadcast would work - it
   sounds like I'd need Flink broadcast variables, but it looks like there's
   no support for changing datasets at the moment:
   https://issues.apache.org/jira/browse/FLINK-3514
   2. I don't understand why an HTTP sink isn't possible. Say the output of
   my job is 'number of items ordered per customer', then for each output I
   want to update a 'customer' in my database, incrementing their
   'item_order_count'. What's wrong with doing that update in the Flink job
   via an HTTP REST call (updating the customer resource), rather than writing
   directly to a database? The reason I'd like to do it this way is to
   decouple the underlying database from Flink.

Josh

On Mon, May 23, 2016 at 2:35 PM, Al-Isawi Rami <rami.al-is...@comptel.com>
wrote:

> Hi Josh,
>
> I am no expert in Flink yet, but here are my thoughts on this:
>
> 1. What about streaming an event to Flink every time the DB of items has
> an update? Then in some background thread you get the new data from the DB,
> be it through REST (if there are only a few updates a day), then load the
> results in memory and there is your updated static data.
>
> 2. REST APIs are over HTTP, so how could one be a sink? It does not
> sound like a Flink job's role at all to serve HTTP requests. Simply sink the results
> to some DB and have some component read from the DB and serve it as a REST API.
>
> -Rami
>
> On 23 May 2016, at 16:22, Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> I am new to Flink and have a couple of questions which I've had trouble
> finding answers to online. Any advice would be much appreciated!
>
>1. What's a typical way of handling the scenario where you want to
>join streaming data with a (relatively) static data source? For example, if
>I have a stream 'orders' where each order has an 'item_id', and I want to
>join this stream with my database of 'items'. The database of items is
>mostly static (with perhaps a few new items added every day). The database
>can be retrieved either directly from a standard SQL database (postgres) or
>via a REST call. I guess one way to handle this would be to distribute the
>database of items with the Flink tasks, and to redeploy the entire job if
>the items database changes. But I think there's probably a better way to do
>it?
>2. I'd like my Flink job to output state to a REST API. (i.e. using
>the REST API as a sink). Updates would be incremental, e.g. the job would
>output tumbling window counts which need to be added to some property on a
>REST resource, so I'd probably implement this as a PATCH. I haven't found
>much evidence that anyone else has used a REST API as a Flink sink - is
>there a reason why this might be a bad idea?
>
> Thanks for any advice on these,
>
> Josh
>
>
> Disclaimer: This message and any attachments thereto are intended solely
> for the addressed recipient(s) and may contain confidential information. If
> you are not the intended recipient, please notify the sender by reply
> e-mail and delete the e-mail (including any attachments thereto) without
> producing, distributing or retaining any copies thereof. Any review,
> dissemination or other use of, or taking of any action in reliance upon,
> this information by persons or entities other than the intended
> recipient(s) is prohibited. Thank you.
>


Combining streams with static data and using REST API as a sink

2016-05-23 Thread Josh
Hi all,

I am new to Flink and have a couple of questions which I've had trouble
finding answers to online. Any advice would be much appreciated!

   1. What's a typical way of handling the scenario where you want to join
   streaming data with a (relatively) static data source? For example, if I
   have a stream 'orders' where each order has an 'item_id', and I want to
   join this stream with my database of 'items'. The database of items is
   mostly static (with perhaps a few new items added every day). The database
   can be retrieved either directly from a standard SQL database (postgres) or
   via a REST call. I guess one way to handle this would be to distribute the
   database of items with the Flink tasks, and to redeploy the entire job if
   the items database changes. But I think there's probably a better way to do
   it?
   2. I'd like my Flink job to output state to a REST API (i.e. using the
   REST API as a sink). Updates would be incremental, e.g. the job would
   output tumbling window counts which need to be added to some property on a
   REST resource, so I'd probably implement this as a PATCH. I haven't found
   much evidence that anyone else has used a REST API as a Flink sink - is
   there a reason why this might be a bad idea? (A sketch of one possible
   HTTP sink appears below.)

Thanks for any advice on these,

Josh
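Nothing in Flink prevents a sink from making HTTP calls; the main caveat is that a restart can re-send the same update (see the exactly-once discussion in the threads below). As a rough, non-authoritative sketch of the PATCH idea, assuming Apache HttpClient 4.x on the classpath, a hypothetical /customers/{id} endpoint and a hypothetical CustomerWindowCount result type:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

// Hypothetical per-window result type (would normally live in its own file).
class CustomerWindowCount {
    public String customerId;
    public long count;
    public long windowEnd;
}

public class RestPatchSink extends RichSinkFunction<CustomerWindowCount> {

    private final String baseUrl;                     // e.g. "https://api.example.com"
    private transient CloseableHttpClient httpClient;

    public RestPatchSink(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    @Override
    public void open(Configuration parameters) {
        httpClient = HttpClients.createDefault();
    }

    @Override
    public void invoke(CustomerWindowCount value) throws Exception {
        // PATCH /customers/{id} with the window's count. If the job restarts
        // and replays, the same PATCH may be sent twice; unless the endpoint
        // deduplicates (e.g. by windowEnd), increments are at-least-once.
        HttpPatch patch = new HttpPatch(baseUrl + "/customers/" + value.customerId);
        String json = "{\"item_order_count_delta\":" + value.count
                + ",\"window_end\":" + value.windowEnd + "}";
        patch.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));

        try (CloseableHttpResponse response = httpClient.execute(patch)) {
            int status = response.getStatusLine().getStatusCode();
            if (status < 200 || status >= 300) {
                // Failing the sink makes Flink restart and replay from the last
                // checkpoint instead of silently dropping the update.
                throw new RuntimeException("PATCH failed with HTTP " + status);
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (httpClient != null) {
            httpClient.close();
        }
    }
}

It would be attached like any other sink, e.g. windowCounts.addSink(new RestPatchSink("https://api.example.com")).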


Re: External DB as sink - with processing guarantees

2016-03-12 Thread Josh
Hi Fabian,

Thanks, that's very helpful. Actually, most of my writes will be idempotent, so
I guess that means I'll get the exactly-once guarantee using the Hadoop output
format!

Thanks,
Josh
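The idempotent-vs-not distinction Fabian draws below is easy to see in code. A minimal sketch with a hypothetical key-value client standing in for DynamoDB (none of these calls are the AWS SDK): the idempotent version keys each write by customer and window and overwrites, so a replayed record rewrites the same value instead of incrementing it again.

// Hypothetical client interface standing in for DynamoDB or any KV store.
interface KeyValueClient {
    void put(String key, long value);          // blind overwrite
    void increment(String key, long delta);    // read-modify-write
}

public class IdempotencyExample {

    // NOT idempotent: if Flink replays this record after a failure, the same
    // delta is applied twice and the stored count drifts upwards
    // (at-least-once result).
    static void writeAtLeastOnce(KeyValueClient client, String customerId,
                                 long windowCount) {
        client.increment("customer/" + customerId, windowCount);
    }

    // Idempotent: the key encodes the customer AND the window, and the value
    // is the complete count for that window. Replaying the record rewrites
    // the same key with the same value, so the stored state reflects
    // exactly-once semantics.
    static void writeIdempotent(KeyValueClient client, String customerId,
                                long windowEnd, long windowCount) {
        client.put("customer/" + customerId + "/window/" + windowEnd, windowCount);
    }
}

The per-customer total is then derived on the read side (or by a single reader that can deduplicate), rather than by incrementing a shared counter from the sink.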

> On 12 Mar 2016, at 09:14, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Josh,
> 
> Flink can guarantee exactly-once processing within its data flow given that 
> the data sources allow to replay data from a specific position in the stream. 
> For example, Flink's Kafka Consumer supports exactly-once.
> 
> Flink achieves exactly-once processing by resetting operator state to a 
> consistent state and replaying data. This means that data might actually be 
> processed more than once, but the operator state will reflect exactly-once 
> semantics because it was reset. Ensuring exactly-once end-to-end is
> difficult, because Flink does not control (and cannot reset) the state of the 
> sinks. By default, data can be sent more than once to a sink resulting in 
> at-least-once semantics at the sink.
> 
> This issue can be addressed, if the sink provides transactional writes 
> (previous writes can be undone) or if the writes are idempotent (applying 
> them several times does not change the result). Transactional support would 
> need to be integrated with Flink's SinkFunction. This is not the case for 
> Hadoop OutputFormats. I am not familiar with the details of DynamoDB, but you 
> would need to implement a SinkFunction with transactional support or use 
> idempotent writes if you want to achieve exactly-once results.
> 
> Best, Fabian
> 
> 2016-03-12 9:57 GMT+01:00 Josh <jof...@gmail.com>:
>> Thanks Nick, that sounds good. I would still like to have an understanding 
>> of what determines the processing guarantee though. Say I use a DynamoDB 
>> Hadoop OutputFormat with Flink, how do I know what guarantee I have? And if 
>> it's at-least-once, is there a way to adapt it to achieve exactly-once?
>> 
>> Thanks,
>> Josh
>> 
>>> On 12 Mar 2016, at 02:46, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>> 
>>> Pretty much anything you can write to from a Hadoop MapReduce program can 
>>> be a Flink destination. Just plug in the OutputFormat and go.
>>> 
>>> Re: output semantics, your mileage may vary. Flink should do you fine for 
>>> at-least-once.
>>> 
>>>> On Friday, March 11, 2016, Josh <jof...@gmail.com> wrote:
>>>> Hi all,
>>>> 
>>>> I want to use an external data store (DynamoDB) as a sink with Flink. It 
>>>> looks like there's no connector for Dynamo at the moment, so I have two 
>>>> questions:
>>>> 
>>>> 1. Is it easy to write my own sink for Flink and are there any docs around 
>>>> how to do this?
>>>> 2. If I do this, will I still be able to have Flink's processing 
>>>> guarantees? I.e. Can I be sure that every tuple has contributed to the 
>>>> DynamoDB state either at-least-once or exactly-once?
>>>> 
>>>> Thanks for any advice,
>>>> Josh
> 


Re: External DB as sink - with processing guarantees

2016-03-12 Thread Josh
Thanks Nick, that sounds good. I would still like to have an understanding of 
what determines the processing guarantee though. Say I use a DynamoDB Hadoop 
OutputFormat with Flink, how do I know what guarantee I have? And if it's 
at-least-once, is there a way to adapt it to achieve exactly-once?

Thanks,
Josh

> On 12 Mar 2016, at 02:46, Nick Dimiduk <ndimi...@gmail.com> wrote:
> 
> Pretty much anything you can write to from a Hadoop MapReduce program can be 
> a Flink destination. Just plug in the OutputFormat and go.
> 
> Re: output semantics, your mileage may vary. Flink should do you fine for
> at-least-once.
> 
>> On Friday, March 11, 2016, Josh <jof...@gmail.com> wrote:
>> Hi all,
>> 
>> I want to use an external data store (DynamoDB) as a sink with Flink. It 
>> looks like there's no connector for Dynamo at the moment, so I have two 
>> questions:
>> 
>> 1. Is it easy to write my own sink for Flink and are there any docs around 
>> how to do this?
>> 2. If I do this, will I still be able to have Flink's processing guarantees? 
>> I.e. Can I be sure that every tuple has contributed to the DynamoDB state 
>> either at-least-once or exactly-once?
>> 
>> Thanks for any advice,
>> Josh


External DB as sink - with processing guarantees

2016-03-11 Thread Josh
Hi all,

I want to use an external data store (DynamoDB) as a sink with Flink. It looks 
like there's no connector for Dynamo at the moment, so I have two questions:

1. Is it easy to write my own sink for Flink, and are there any docs around how
to do this? (A minimal sink skeleton appears below.)
2. If I do this, will I still be able to have Flink's processing guarantees? 
I.e. Can I be sure that every tuple has contributed to the DynamoDB state 
either at-least-once or exactly-once?

Thanks for any advice,
Josh
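On question 1: a sink is essentially an implementation of SinkFunction; extending RichSinkFunction additionally gives open()/close() hooks for connection handling. A minimal, non-authoritative skeleton follows; MyRecord and MyDbClient are placeholders rather than the AWS DynamoDB SDK, and the write is a plain overwrite so that replays after a failure are harmless (see the exactly-once discussion above).

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical record type.
class MyRecord {
    public String key;
    public long value;
}

// In-memory stand-in for a real database client (e.g. the DynamoDB SDK).
class MyDbClient {
    private final Map<String, Long> table = new ConcurrentHashMap<>();
    static MyDbClient connect(String tableName) { return new MyDbClient(); }
    void put(String key, long value) { table.put(key, value); } // idempotent overwrite
    void close() { }
}

public class MyDatabaseSink extends RichSinkFunction<MyRecord> {

    private transient MyDbClient client;

    @Override
    public void open(Configuration parameters) {
        // Called once per parallel sink instance: open connections here.
        client = MyDbClient.connect("orders-by-customer");
    }

    @Override
    public void invoke(MyRecord record) throws Exception {
        // Called for every element. Throwing here fails the task, so Flink
        // restarts from the last checkpoint and the element may be sent again:
        // at-least-once, unless the write itself is idempotent as it is here.
        client.put(record.key, record.value);
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}

Attaching it is a one-liner on the stream, e.g. stream.addSink(new MyDatabaseSink()).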