Ingesting data from an API

2018-11-11 Thread Aarti Gupta
Hi,

I have an API that emits output that I want to use as a data source for
Flink.

I have written a custom source function that is as follows -

public class DynamicRuleSource extends AlertingRuleSource {
    private ArrayList<Rule> rules = new ArrayList<>();

    public void run(SourceContext<Rule> ctx) throws Exception {
        System.out.println("In run ");
        while (true) {
            while (!rules.isEmpty()) {
                Rule rule = rules.remove(0);
                ctx.collectWithTimestamp(rule, 0);
                ctx.emitWatermark(new Watermark(0));
            }
            Thread.sleep(1000);
        }
    }

    public void addRule(Rule rule) {
        rules.add(rule);
    }

    @Override
    public void cancel() {
    }
}


When the API is invoked, it calls the addRule method in my CustomSource
function.

The run method in CustomSource polls for any data to be ingested.

The same object instance is shared between the API and the Flink execution
environment; however, the output of the API does not get ingested into the
Flink DataStream.
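
(Side note: a minimal sketch of the same hand-off written against a thread-safe queue. It assumes a serializable Rule POJO and, crucially, that the SourceFunction instance Flink actually executes is the same instance the API calls addRule() on, which is exactly what the question below is about.)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class QueueBackedRuleSource implements SourceFunction<Rule> {
    // Thread-safe hand-off between the API thread (producer) and the source thread (consumer).
    private final BlockingQueue<Rule> rules = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Rule> ctx) throws Exception {
        while (running) {
            // Block for up to a second instead of busy-waiting.
            Rule rule = rules.poll(1, TimeUnit.SECONDS);
            if (rule != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(rule);
                }
            }
        }
    }

    // Called by the API handler.
    public void addRule(Rule rule) {
        rules.add(rule);
    }

    @Override
    public void cancel() {
        running = false;
    }
}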

Is this the right pattern to use, or is Kafka the recommended way of
streaming data into Flink?

--Aarti

-- 
Aarti Gupta 
Director, Engineering, Correlation


aagu...@qualys.com
Qualys, Inc.





RE: RichInputFormat working differently in Eclipse and in a Flink cluster

2018-11-11 Thread Teena Kappen // BPRISE
Hi Till,

We are using 1.4.0. We have not tried this on any other release.

We will try this on 1.6.2 and see what happens.

Thank you.

Regards,
Teena

From: Till Rohrmann 
Sent: 07 November 2018 20:23
To: Teena Kappen // BPRISE 
Cc: user 
Subject: Re: RichInputFormat working differently in Eclipse and in a Flink cluster

Hi Teena,

which Flink version are you using? Have you tried whether this happens with the 
latest release 1.6.2 as well?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <teena.kap...@bprise.com> wrote:
Hi all,

I have implemented RichInputFormat for reading the result of aggregation queries in
Elasticsearch. There are around 10 buckets, which are JSON arrays.
Note: this is a one-time response.

My idea here is to iterate over these arrays in parallel. Here is the pseudocode.

public class MyInputDataSetInputFormat extends RichInputFormat<Bounce, ResponseInputSplit> {

    public void configure(Configuration parameters) {
        System.out.println("configure");
    }

    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
    }

    public ResponseInputSplit[] createInputSplits(int minNumSplits) {
        System.out.println("createInputSplits");

        // read from elastic
        // add buckets to array
    }

    public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
        // this is default
        System.out.println("getInputSplitAssigner");
        return new DefaultInputSplitAssigner(inputSplits);
    }

    public void open(ResponseInputSplit split) {
        // read buckets
    }

    public boolean reachedEnd() {
        System.out.println("reachedEnd");
    }

    public Bounce nextRecord(Bounce reuse) {
    }

    public void close() {
    }
}

// my main method
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());

When running in Eclipse, it executes createInputSplits and the results look
fine. The logs are given below.
Output is -->
configure
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] 
with leader session id...
configure
createInputSplits

When submitting the job to a Flink cluster, it doesn't execute the 'configure' and
'createInputSplits' methods; instead it goes directly to the nextRecord function.
Logs are given below.
Output is -->
Starting execution of program
configure
Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job 
completion.
Connected to JobManager at Actor[akka.tcp://flink@:xxx 
/user/jobmanager#1219973491] with leader session id...
10/26/2018 15:05:57 Job execution switched to status RUNNING.
10/26/2018 15:05:57 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
10/26/2018 15:05:57 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
10/26/2018 15:06:00 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
10/26/2018 15:06:00 DataSource (at 
createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
java.lang.NullPointerException
   at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)

Regards,
Teena



Re: Any examples on invoking the Flink REST API POST method?

2018-11-11 Thread vino yang
Hi Henry,

Maybe Gary can help you, ping him for you.

Thanks, vino.

徐涛  wrote on Mon, Nov 12, 2018 at 12:45 PM:

> Hi Experts,
> I am trying to trigger a savepoint from the Flink REST API on version 1.6; in
> the documentation it shows that I need to pass a JSON as the request body
> {
>   "type" : "object",
>   "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
>   "properties" : {
>     "target-directory" : { "type" : "string" },
>     "cancel-job" : { "type" : "boolean" }
>   }
> }
> So I send the following JSON:
> {
>   "type": "object",
>   "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
>   "properties": {
>     "target-directory": "hdfs:///flinkDsl",
>     "cancel-job": false
>   }
> }
>
> And I use okhttp to send the request:
> val MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8")
> val body = RequestBody.create(MEDIA_TYPE_JSON, postBody)
> val request = new Request.Builder()
>   .url(url)
>   .post(body)
>   .build()
> client.newCall(request).execute()
>
>
> but get an error: {"errors":["Request did not match expected format
> SavepointTriggerRequestBody."]}
> Would anyone give an example of how to invoke the POST REST API of Flink?
> Thanks a lot.
>
> Best
> Henry
>


Any examples on invoking the Flink REST API POST method?

2018-11-11 Thread 徐涛
Hi Experts,
I am trying to trigger a savepoint from the Flink REST API on version 1.6;
in the documentation it shows that I need to pass a JSON as the request body
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
  "properties" : {
    "target-directory" : { "type" : "string" },
    "cancel-job" : { "type" : "boolean" }
  }
}

So I send the following JSON:
{
  "type": "object",
  "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
  "properties": {
    "target-directory": "hdfs:///flinkDsl",
    "cancel-job": false
  }
}

And I use okhttp to send the request:
val MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8")
val body = RequestBody.create(MEDIA_TYPE_JSON, postBody)
val request = new Request.Builder()
  .url(url)
  .post(body)
  .build()
client.newCall(request).execute()


but get an error: {"errors":["Request did not match expected format
SavepointTriggerRequestBody."]}
Would anyone give an example of how to invoke the POST REST API of
Flink?
Thanks a lot.

Best
Henry
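
(For what it's worth, a minimal sketch — not a confirmed fix — assuming the JSON shown in the docs is a schema describing the shape of the body, so only the properties themselves are posted to /jobs/:jobid/savepoints. Written with okhttp in Java; the job id, JobManager host and target directory are placeholders.)

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
        // Only the two properties, not the surrounding schema.
        String postBody = "{\"target-directory\": \"hdfs:///flinkDsl\", \"cancel-job\": false}";
        String jobId = "<your-job-id>";
        Request request = new Request.Builder()
                .url("http://<jobmanager-host>:8081/jobs/" + jobId + "/savepoints")
                .post(RequestBody.create(jsonType, postBody))
                .build();
        try (Response response = new OkHttpClient().newCall(request).execute()) {
            // On success this should return a trigger id that can be polled under
            // /jobs/:jobid/savepoints/:triggerid.
            System.out.println(response.body().string());
        }
    }
}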

Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Hao Sun
Thanks, I'll check it out.

On Sun, Nov 11, 2018 at 10:52 AM Ufuk Celebi  wrote:

> I think there should be a log message, but I don't know what the exact
> format is (you would need to look through it and search for something
> related to CompletedCheckpointStore).
>
> An alternative is the web UI checkpointing tab. It shows the latest
> checkpoint used for restore of the job. You should see your savepoint
> there.
>
> Best,
>
> Ufuk
>
>
> On Sun, Nov 11, 2018 at 7:45 PM Hao Sun  wrote:
> >
> > This is great, I will try option 3 and let you know.
> > Can I log some message so I know job is recovered from the latest
> savepoint?
> >
> > On Sun, Nov 11, 2018 at 10:42 AM Ufuk Celebi  wrote:
> >>
> >> Hey Hao and Paul,
> >>
> >> 1) Fetch checkpoint info manually from ZK (problematic, not recommended)
> >> - As Paul pointed out, this is problematic as the node is a serialized
> >> pointer (StateHandle) to a CompletedCheckpoint in the HA storage
> >> directory and not a path [1].
> >> - I would not recommend this approach at the moment
> >>
> >> 2) Using the HTTP API to fetch the latest savepoint pointer (possible,
> >> but cumbersome)
> >> - As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
> >> checkpoint statistics about the latest savepoint
> >> - The latest savepoint is available as a separate entry under
> >> `latest.savepoint` (If I'm reading the docs [2] correctly)
> >> - You would need to manually do this before shutting down (requires
> >> custom tooling to automate)
> >>
> >> 3) Use cancelWithSavepoint
> >> - If you keep `high-availability.cluster-id` consistent between
> >> executions of your job cluster, using cancelWithSavepoint [3] should
> >> add the savepoint to ZK before cancelling the job
> >> - On the next execution of your job cluster, Flink should
> >> automatically pick it up (no need to attach a savepoint restore path
> >> manually)
> >>
> >> I've not tried 3) myself yet, but believe it should work. If you have
> >> time to try it out, I'd be happy to hear whether it works as expected
> >> for you.
> >>
> >> – Ufuk
> >>
> >> [1] I believe this is overly complicated and should be simplified in
> the future.
> >> [2] Search /jobs/:jobid/checkpoints in
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> 
> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
> 
> >>
> >> On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
> >> >
> >> > Can we add an option to allow job cluster mode to start from the
> latest save point? Otherwise I have to somehow get the info from ZK, before
> job cluster's container started by K8s.
> >> >
> >> > On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
> >> >>
> >> >> Hi Hao,
> >> >>
> >> >> The savepoint path is stored in ZK, but it’s in binary format, so in
> order to retrieve the path you have to deserialize it back to some Flink
> internal object.
> >> >>
> >> >> A better approach would be using REST api to get the path. You could
> find it here[1].
> >> >>
> >> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> 
> >> >>
> >> >> Best,
> >> >> Paul Lam
> >> >>
> >> >>
> >> >> On Nov 9, 2018, at 13:55, Hao Sun  wrote:
> >> >>
> >> >> Since this save point path is very useful to application updates,
> where is this information stored? Can we keep it in ZK or S3 for retrieval?
> >> >>
> >> >> 
> >> >>
> >> >>
>


Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Ufuk Celebi
I think there should be a log message, but I don't know what the exact
format is (you would need to look through it and search for something
related to CompletedCheckpointStore).

An alternative is the web UI checkpointing tab. It shows the latest
checkpoint used for restore of the job. You should see your savepoint
there.

Best,

Ufuk

On Sun, Nov 11, 2018 at 7:45 PM Hao Sun  wrote:
>
> This is great, I will try option 3 and let you know.
> Can I log some message so I know job is recovered from the latest savepoint?
>
> On Sun, Nov 11, 2018 at 10:42 AM Ufuk Celebi  wrote:
>>
>> Hey Hao and Paul,
>>
>> 1) Fetch checkpoint info manually from ZK (problematic, not recommended)
>> - As Paul pointed out, this is problematic as the node is a serialized
>> pointer (StateHandle) to a CompletedCheckpoint in the HA storage
>> directory and not a path [1].
>> - I would not recommend this approach at the moment
>>
>> 2) Using the HTTP API to fetch the latest savepoint pointer (possible,
>> but cumbersome)
>> - As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
>> checkpoint statistics about the latest savepoint
>> - The latest savepoint is available as a separate entry under
>> `latest.savepoint` (If I'm reading the docs [2] correctly)
>> - You would need to manually do this before shutting down (requires
>> custom tooling to automate)
>>
>> 3) Use cancelWithSavepoint
>> - If you keep `high-availability.cluster-id` consistent between
>> executions of your job cluster, using cancelWithSavepoint [3] should
>> add the savepoint to ZK before cancelling the job
>> - On the next execution of your job cluster, Flink should
>> automatically pick it up (no need to attach a savepoint restore path
>> manually)
>>
>> I've not tried 3) myself yet, but believe it should work. If you have
>> time to try it out, I'd be happy to hear whether it works as expected
>> for you.
>>
>> – Ufuk
>>
>> [1] I believe this is overly complicated and should be simplified in the 
>> future.
>> [2] Search /jobs/:jobid/checkpoints in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
>>
>> On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
>> >
>> > Can we add an option to allow job cluster mode to start from the latest 
>> > save point? Otherwise I have to somehow get the info from ZK, before job 
>> > cluster's container started by K8s.
>> >
>> > On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
>> >>
>> >> Hi Hao,
>> >>
>> >> The savepoint path is stored in ZK, but it’s in binary format, so in 
>> >> order to retrieve the path you have to deserialize it back to some Flink 
>> >> internal object.
>> >>
>> >> A better approach would be using REST api to get the path. You could find 
>> >> it here[1].
>> >>
>> >> [1] 
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>> >>
>> >> Best,
>> >> Paul Lam
>> >>
>> >>
>> >> On Nov 9, 2018, at 13:55, Hao Sun  wrote:
>> >>
>> >> Since this save point path is very useful to application updates, where 
>> >> is this information stored? Can we keep it in ZK or S3 for retrieval?
>> >>
>> >> 
>> >>
>> >>


Re: java.io.IOException: NSS is already initialized

2018-11-11 Thread Ufuk Celebi
Hey Hao,

1) Regarding Hadoop S3: are you using the repackaged Hadoop S3
dependency from the /opt folder of the Flink distribution? Or the
actual Hadoop implementation? If the latter, would you mind also running
it with the one that comes packaged with Flink? For this you can
remove all Hadoop-related configuration in your flink-conf.yaml and
copy the Hadoop S3 dependency from /opt to /lib and configure it [1].

2) Could you please share your complete Flink configuration for when
you tried to run with Presto S3? If you don't want to share this
publicly, feel free to share it privately with me. I'm curious to see
whether we can reproduce this.

– Ufuk

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
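
(Rough sketch of the /opt-to-/lib setup mentioned in 1), assuming Flink 1.6.x and the shaded flink-s3-fs-hadoop jar; the exact jar name and credential keys should be double-checked against [1].)

cp ./opt/flink-s3-fs-hadoop-1.6.2.jar ./lib/

# flink-conf.yaml — the shaded filesystem reads these directly, no Hadoop config needed
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
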
On Sat, Nov 10, 2018 at 4:07 PM Hao Sun  wrote:
>
> Hi Ufuk, thanks for checking. I am using OpenJDK 1.8_171, and I still have the
> same issue with Presto.
>
> - Why is the checkpoint not starting from 1? Old checkpoints stored in ZK caused it;
> I cleaned them up, but that was not very helpful.
> - I switched to Flink + Hadoop28 and used Hadoop S3; with no other changes,
> checkpointing is working with the Hadoop flavour.
>
> On Fri, Nov 9, 2018 at 2:02 PM Ufuk Celebi  wrote:
>>
>> Hey Hao Sun,
>>
>> - Is this an intermittent failure or permanent? The logs indicate that
>> some checkpoints completed before the error occurs (e.g. checkpoint
>> numbers are greater than 1).
>>
>> - Which Java versions are you using? And which Java image? I've
>> Googled similar issues that seem to be related to the JVM, e.g. [1].
>>
>> Best,
>>
>> Ufuk
>>
>> [1] 
>> https://dev.lucee.org/t/could-not-initialize-class-sun-security-ssl-sslcontextimp/3972
>>
>>
>> On Thu, Nov 8, 2018 at 8:55 PM Hao Sun  wrote:
>> >
>> > Thanks, any insight/help here is appreciated.
>> >
>> > On Thu, Nov 8, 2018 at 4:38 AM Dawid Wysakowicz  
>> > wrote:
>> >>
>> >> Hi Hao,
>> >>
>> >> I am not sure, what might be wrong, but I've cc'ed Gary and Kostas who 
>> >> were recently working with S3, maybe they will have some ideas.
>> >>
>> >> Best,
>> >>
>> >> Dawid
>> >>
>> >> On 03/11/2018 03:09, Hao Sun wrote:
>> >>
>> >> Same environment, new error.
>> >>
>> >> I can run the same docker image with my local Mac, but on K8S, this gives 
>> >> me this error.
>> >> I can not think of any difference between local Docker and K8S Docker.
>> >>
>> >> Any hint will be helpful. Thanks
>> >>
>> >> 
>> >>
>> >> 2018-11-02 23:29:32,981 INFO 
>> >> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job 
>> >> ConnectedStreams maxwell.accounts () 
>> >> switched from state RUNNING to FAILING.
>> >> AsynchronousException{java.lang.Exception: Could not materialize 
>> >> checkpoint 235 for operator Source: KafkaSource(maxwell.accounts) -> 
>> >> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> >> FixedDelayWatermark(maxwell.accounts) -> 
>> >> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> >> influxdbSink(maxwell.accounts) (1/1).}
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >> at 
>> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >> at 
>> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: java.lang.Exception: Could not materialize checkpoint 235 for 
>> >> operator Source: KafkaSource(maxwell.accounts) -> 
>> >> MaxwellFilter->Maxwell(maxwell.accounts) -> 
>> >> FixedDelayWatermark(maxwell.accounts) -> 
>> >> MaxwellFPSEvent->InfluxDBData(maxwell.accounts) -> Sink: 
>> >> influxdbSink(maxwell.accounts) (1/1).
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>> >> ... 6 more
>> >> Caused by: java.util.concurrent.ExecutionException: 
>> >> java.lang.NoClassDefFoundError: Could not initialize class 
>> >> sun.security.ssl.SSLSessionImpl
>> >> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> >> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> >> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>> >>
>> >> at 
>> >> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>> >>
>> >> at 
>> >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>> >> ... 5 more
>> >> Caused 

Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Hao Sun
This is great, I will try option 3 and let you know.
Can I log some message so I know the job is recovered from the latest savepoint?

On Sun, Nov 11, 2018 at 10:42 AM Ufuk Celebi  wrote:

> Hey Hao and Paul,
>
> 1) Fetch checkpoint info manually from ZK (problematic, not recommended)
> - As Paul pointed out, this is problematic as the node is a serialized
> pointer (StateHandle) to a CompletedCheckpoint in the HA storage
> directory and not a path [1].
> - I would not recommend this approach at the moment
>
> 2) Using the HTTP API to fetch the latest savepoint pointer (possible,
> but cumbersome)
> - As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
> checkpoint statistics about the latest savepoint
> - The latest savepoint is available as a separate entry under
> `latest.savepoint` (If I'm reading the docs [2] correctly)
> - You would need to manually do this before shutting down (requires
> custom tooling to automate)
>
> 3) Use cancelWithSavepoint
> - If you keep `high-availability.cluster-id` consistent between
> executions of your job cluster, using cancelWithSavepoint [3] should
> add the savepoint to ZK before cancelling the job
> - On the next execution of your job cluster, Flink should
> automatically pick it up (no need to attach a savepoint restore path
> manually)
>
> I've not tried 3) myself yet, but believe it should work. If you have
> time to try it out, I'd be happy to hear whether it works as expected
> for you.
>
> – Ufuk
>
> [1] I believe this is overly complicated and should be simplified in the
> future.
> [2] Search /jobs/:jobid/checkpoints in
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> 
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
> 
>
> On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
> >
> > Can we add an option to allow job cluster mode to start from the latest
> save point? Otherwise I have to somehow get the info from ZK, before job
> cluster's container started by K8s.
> >
> > On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
> >>
> >> Hi Hao,
> >>
> >> The savepoint path is stored in ZK, but it’s in binary format, so in
> order to retrieve the path you have to deserialize it back to some Flink
> internal object.
> >>
> >> A better approach would be using REST api to get the path. You could
> find it here[1].
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
> 
> >>
> >> Best,
> >> Paul Lam
> >>
> >>
> >> On Nov 9, 2018, at 13:55, Hao Sun  wrote:
> >>
> >> Since this save point path is very useful to application updates, where
> is this information stored? Can we keep it in ZK or S3 for retrieval?
> >>
> >> 
> >>
> >>
>


Re: Where is the "Latest Savepoint" information saved?

2018-11-11 Thread Ufuk Celebi
Hey Hao and Paul,

1) Fetch checkpoint info manually from ZK (problematic, not recommended)
- As Paul pointed out, this is problematic as the node is a serialized
pointer (StateHandle) to a CompletedCheckpoint in the HA storage
directory and not a path [1].
- I would not recommend this approach at the moment

2) Using the HTTP API to fetch the latest savepoint pointer (possible,
but cumbersome)
- As Paul proposed, you could use /jobs/:jobid/checkpoints to fetch
checkpoint statistics about the latest savepoint
- The latest savepoint is available as a separate entry under
`latest.savepoint` (If I'm reading the docs [2] correctly)
- You would need to manually do this before shutting down (requires
custom tooling to automate)

3) Use cancelWithSavepoint
- If you keep `high-availability.cluster-id` consistent between
executions of your job cluster, using cancelWithSavepoint [3] should
add the savepoint to ZK before cancelling the job
- On the next execution of your job cluster, Flink should
automatically pick it up (no need to attach a savepoint restore path
manually)

I've not tried 3) myself yet, but believe it should work. If you have
time to try it out, I'd be happy to hear whether it works as expected
for you.

– Ufuk

[1] I believe this is overly complicated and should be simplified in the future.
[2] Search /jobs/:jobid/checkpoints in
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html#job-cancellation
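
(Editorial sketch of option 2, under the same caveat about reading the docs correctly: fetch the checkpoint statistics and pick out the latest savepoint's external path before shutting down. The host, port, job id and exact JSON field names below are assumptions to verify against [2].)

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class LatestSavepointLookup {
    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>";
        URL url = new URL("http://<jobmanager-host>:8081/jobs/" + jobId + "/checkpoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        StringBuilder response = new StringBuilder();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line);
            }
        } finally {
            conn.disconnect();
        }
        // The statistics should contain a "latest" section with a "savepoint" entry; its
        // "external_path" is the pointer to restore from. Parse it with any JSON library.
        System.out.println(response);
    }
}
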
On Fri, Nov 9, 2018 at 5:03 PM Hao Sun  wrote:
>
> Can we add an option to allow job cluster mode to start from the latest save 
> point? Otherwise I have to somehow get the info from ZK, before job cluster's 
> container started by K8s.
>
> On Fri, Nov 9, 2018, 01:29 Paul Lam  wrote:
>>
>> Hi Hao,
>>
>> The savepoint path is stored in ZK, but it’s in binary format, so in order 
>> to retrieve the path you have to deserialize it back to some Flink internal 
>> object.
>>
>> A better approach would be using REST api to get the path. You could find it 
>> here[1].
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
>>
>> Best,
>> Paul Lam
>>
>>
>> On Nov 9, 2018, at 13:55, Hao Sun  wrote:
>>
>> Since this save point path is very useful to application updates, where is 
>> this information stored? Can we keep it in ZK or S3 for retrieval?
>>
>> 
>>
>>


Best practice to write data from a stream to a non-relational, distributed database (HBase)

2018-11-11 Thread Marke Builder
Hi,

What is the preferred way to write streaming data to HBase?
Rolling File Sink or Streaming File Sink?
How can I configure this (opening the connection with a conf, and the write
handling of (key, data))?
What do I have to consider about the partitions? I prefer a write per
partition.

Thanks!
Marke
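
(A minimal, unverified sketch of the custom-sink route the question points at — opening the HBase connection once per parallel sink instance in open() and writing per record — using the plain HBase client API. The table name, column family and the (key, data) tuple type are placeholders.)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: one connection per parallel sink instance, created in open() and reused for
// every record, so each partition of the stream writes through its own Table handle.
public class HBaseSink extends RichSinkFunction<Tuple2<String, String>> {
    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(hbaseConf);
        table = connection.getTable(TableName.valueOf("my_table"));  // placeholder table name
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(value.f0));                   // row key
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("data"),     // placeholder family/qualifier
                Bytes.toBytes(value.f1));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
}

Hooked up as stream.addSink(new HBaseSink()), each parallel subtask then keeps its own connection, which roughly corresponds to one writer per partition of the stream.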