Re: ProcessingTimeTimer in ProcessFunction after a savepoint

2017-03-20 Thread Florian König
@Aljoscha Thank you for the pointer to ProcessFunction. That looks like a 
better approach with less code and other overhead.

After restoring, the job is both reading new elements and emitting some, but 
nowhere near as many as expected. I’ll investigate further after switching to 
ProcessFunction. I suspect that there is some problem with my code. I’ll let 
you know if any unexplained discrepancy remains.

> On 20.03.2017 at 14:15, Aljoscha Krettek wrote:
> 
> As a general remark, I think the ProcessFunction 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html)
>  could be better suited for implementing such a use case.
> 
> I did run tests on Flink 1.2 and master with a simple processing-time 
> windowing job. After performing a savepoint and waiting a few minutes I 
> restored and the windows that were still there immediately fired.
> 
> In your case, after restoring, is the Job also reading new elements or did 
> you try with just restoring without any new input?
> 
>> On 19 Mar 2017, at 13:15, Florian König  wrote:
>> 
>> @Aljoscha: We’re using 1.2.
>> 
>> The intention of our code is as follows: The events that flow through Flink 
>> represent scheduling decisions, i.e. they contain the ID of a target entity, 
>> a description of an action that should be performed on that entity by some 
>> other job, and a timestamp of when that should happen.
>> 
>> We’re using the windowing mechanism to delay those events until they should 
>> be forwarded (and trigger the corresponding action). Furthermore, the 
>> schedule can be moved closer to the current point in time: subsequent 
>> schedule events for an entity (identified by its ID) can set the trigger 
>> time to an earlier instant. If the trigger time is in the past or very 
>> shortly (e.g., 100 ms) after now, the action should be triggered 
>> immediately. Actions scheduled for an instant after the currently planned 
>> one should be ignored; i.e. the schedule cannot be moved to the future.
>> 
>> exemplary event stream:
>>  time … (ID, action, trigger time)   // intended reaction
>>  0  … (1, 'foo', 10)   // trigger action 'foo' on entity 1 at time 10
>>  3  … (2, 'bar', 15)   // trigger action 'bar' on entity 2 at time 15
>>  4  … (1, 'foo', 7)    // move trigger back to time 7
>>  9  … (1, 'foo', 12)   // ignore
>>  15 … (2, 'bar', 15)   // trigger immediately
>> 
>> resulting stream:
>>  (1, 'foo', 7)   // at time 7
>>  (2, 'bar', 15)  // at time 15
>> 
>> To implement this, we have written a custom trigger that’s called from the 
>> following Flink code:
>> 
>> …
>> schedules.keyBy(schedule -> schedule.entityId)
>>  .window(GlobalWindows.create())
>>  .trigger(DynamicEarliestWindowTrigger.create())
>>  .fold((Schedule) null, (folded, schedule) -> schedule)
>>  .map( /* process schedules */ )
>> …
>> 
>> We fold the scheduling events 'to themselves', because only the latest event 
>> in each period is relevant. The custom trigger is implemented as follows 
>> (only Flink-relevant parts and syntax):
>> 
>> class DynamicEarliestWindowTrigger extends Trigger {
>> 
>>     ValueStateDescriptor<Long> windowEnd =
>>             new ValueStateDescriptor<>("windowEnd", Long.class);
>> 
>>     TriggerResult onElement(T element, long timestamp, W window,
>>             TriggerContext ctx) throws Exception {
>>         val windowEndState = ctx.getPartitionedState(windowEnd);
>>         val windowEndsAt = windowEndState.value();
>>         val newEnd = element.getTimestamp();
>> 
>>         // no timer set yet, or intention to trigger earlier
>>         if (windowEndsAt == null || newEnd <= windowEndsAt) {
>>             deleteCurrentTimer(ctx);
>> 
>>             // trigger time far enough from now => schedule timer
>>             if (newEnd > System.currentTimeMillis() + 100) {
>>                 ctx.registerProcessingTimeTimer(newEnd);
>>                 windowEndState.update(newEnd);
>>             } else {
>>                 return TriggerResult.FIRE;  // close enough => fire immediately
>>             }
>>         }
>> 
>>         // ignore events that should be triggered in the future
>>         return TriggerResult.CONTINUE;
>>     }
>> 
>>     // fire when timer has reached pre-set time
>>     TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
>>             throws Exception {
>>         return TriggerResult.FIRE_AND_PURGE;
>>     }
>> 
>>     // noop
>>     TriggerResult onEventTime(long time, W window, TriggerContext ctx)
>>             throws Exception {
>>         return TriggerResult.CONTINUE;
>>     }
>>  
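
The scheduling rule described in the quoted message (the earliest requested trigger time per entity wins, later requests are ignored, and requests that are already due fire immediately) can be checked against the example event stream with a plain-Java simulation. This is only a sketch and not Flink code: the class and method names are hypothetical, time is an abstract counter rather than processing time, the 100 ms slack is left out, and keeping the planned time after a timer has fired is an assumption made so that the event at time 9 is ignored as in the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of the "earliest schedule wins" rule; not Flink code. */
final class EarliestScheduleSketch {

    /** One scheduling event: entity ID, action, and requested trigger time. */
    static final class Schedule {
        final int entityId;
        final String action;
        final long triggerTime;

        Schedule(int entityId, String action, long triggerTime) {
            this.entityId = entityId;
            this.action = action;
            this.triggerTime = triggerTime;
        }

        @Override
        public String toString() {
            return "(" + entityId + ", '" + action + "', " + triggerTime + ")";
        }
    }

    // Timers that are registered but have not fired yet, by entity ID.
    private final Map<Integer, Schedule> pending = new HashMap<>();
    // Earliest trigger time accepted so far per entity (assumption: kept after firing).
    private final Map<Integer, Long> planned = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    /** Fires every pending timer whose trigger time lies strictly before now. */
    private void fireTimersBefore(long now) {
        List<Schedule> due = new ArrayList<>();
        for (Schedule s : pending.values()) {
            if (s.triggerTime < now) {
                due.add(s);
            }
        }
        due.sort(Comparator.comparingLong((Schedule s) -> s.triggerTime));
        for (Schedule s : due) {
            pending.remove(s.entityId);
            emitted.add(s + " at time " + s.triggerTime);
        }
    }

    /** Handles one event that arrives at time now. */
    void onEvent(long now, Schedule s) {
        fireTimersBefore(now);
        Long current = planned.get(s.entityId);
        if (current != null && s.triggerTime > current) {
            return; // later than the planned time: ignore
        }
        planned.put(s.entityId, s.triggerTime);
        pending.remove(s.entityId); // replaces a timer that is still waiting
        if (s.triggerTime > now) {
            pending.put(s.entityId, s); // wait for the trigger time
        } else {
            emitted.add(s + " at time " + now); // already due: fire immediately
        }
    }

    /** Fires all remaining timers and returns everything emitted, in order. */
    List<String> finish() {
        fireTimersBefore(Long.MAX_VALUE);
        return emitted;
    }

    /** Replays the example event stream from the quoted message. */
    static List<String> runExample() {
        EarliestScheduleSketch sim = new EarliestScheduleSketch();
        sim.onEvent(0, new Schedule(1, "foo", 10));
        sim.onEvent(3, new Schedule(2, "bar", 15));
        sim.onEvent(4, new Schedule(1, "foo", 7));
        sim.onEvent(9, new Schedule(1, "foo", 12));
        sim.onEvent(15, new Schedule(2, "bar", 15));
        return sim.finish();
    }

    public static void main(String[] args) {
        runExample().forEach(System.out::println);
    }
}
```

In a ProcessFunction-based job, the two maps would roughly correspond to keyed state and the pending entries to registered processing-time timers, but that mapping is not shown here.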

Re: Job fails to start with S3 savepoint

2017-03-20 Thread Bajaj, Abhinav
Hi Ufuk,

Realized I sent an incomplete mail. Continuing my previous reply here.

Thanks for the hint on the region. The bucket is in the eu-west-1 region.

The Flink configuration for checkpoints and savepoints is as follows:
state.backend.fs.checkpointdir: s3://flink-bucket/flink-checkpoints
state.savepoints.dir:   s3://flink-bucket/flink-savepoints

No region is specified in the S3 URLs above, but the savepoint was created 
successfully.

When using the monitoring REST API, the cancel-with-savepoint API returned the 
savepoint path – “s3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2”.
I used the same savepoint path while submitting a new job.

I am wondering why there would be a difference in behavior between creating and 
reading the savepoint.

In the meantime, I will try updating the S3 URLs to reflect the region and 
report back here.

Thanks,
Abhinav








Re: Job fails to start with S3 savepoint

2017-03-20 Thread Bajaj, Abhinav
Hi Ufuk,

Thanks for replying.
The savepoint path is correct and it exists.
FYI, I used the monitoring REST APIs to cancel the job with savepoint.






From: Ufuk Celebi 
Reply-To: "user@flink.apache.org" 
Date: Monday, March 20, 2017 at 2:41 AM
To: "user@flink.apache.org" 
Subject: Re: Job fails to start with S3 savepoint

Hey Abhinav,

the Exception is thrown if the S3 object does not exist.

Can you double check that it actually does exist (no typos, etc.)?

Could this be related to accessing a different region than expected?

– Ufuk


On Mon, Mar 20, 2017 at 9:38 AM, Timo Walther wrote:
Hi Abhinav,

can you check if you have configured your AWS setup correctly? The S3 
configuration might be missing.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#missing-s3-filesystem-configuration

Regards,
Timo


On 18/03/17 at 01:33, Bajaj, Abhinav wrote:
Hi,

I am trying to explore using S3 for storing checkpoints and savepoints.
I can get Flink to store the checkpoints and savepoints in S3.

However, when I try to submit the same job using the stored savepoint, it fails 
with the exception below.
I am using Flink 1.2 and submitted the job from the UI dashboard.

Can anyone guide me through this issue?

Thanks,
Abhinav

Jobmanager logs with exception –

2017-03-18 00:10:09,193 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.client.JobClient - Checking and uploading JAR files
2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
2017-03-18 00:10:09,501 INFO  org.apache.flink.yarn.YarnJobManager - Submitting job 4425245091bea9ad103dd3ff338244bb (Session Counter Example).
2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for 4425245091bea9ad103dd3ff338244bb.
2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Running initialization on master for job Session Counter Example (4425245091bea9ad103dd3ff338244bb).
2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Successfully ran initialization on master in 0 ms.
2017-03-18 00:10:09,503 INFO  org.apache.flink.yarn.YarnJobManager - Starting job from savepoint 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
2017-03-18 00:10:09,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (4425245091bea9ad103dd3ff338244bb) switched from state CREATED to FAILING.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1369)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Invalid path 

Re: load balancing of keys to operators

2017-03-20 Thread Sonex
Thanks for your response.

When using time windows, doesn't Flink know the load per window? I have
observed this behavior in windows as well.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/load-balancing-of-keys-to-operators-tp12303p12308.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


shaded version of legacy kafka connectors

2017-03-20 Thread Gwenhael Pasquiers
Hi,

Before doing it myself I thought it would be better to ask.
We need to consume from Kafka 0.8 and produce to Kafka 0.10 in a Flink app.
I guess there will be class and package name conflicts for a lot of 
dependencies of both connectors.

The obvious solution is to make a “shaded” version of the Kafka 0.8 connector 
so that it can coexist with the 0.10 version.

Does one already exist?


Re: load balancing of keys to operators

2017-03-20 Thread Timo Walther

Hi,

When using keyBy, Flink ensures that every set of records with the same key is 
sent to the same operator; otherwise it would not be possible to process 
them as a whole. It depends on your use case whether it is also OK that 
another operator processes parts of this set of records. You can 
implement your own partitioning strategy to split your data more evenly 
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#physical-partitioning). 
But this depends on your knowledge of key spaces and load; Flink cannot 
know this in advance.
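
Why a key stays on one operator can be illustrated with a small sketch: the target operator is computed from the key alone, so the same key always yields the same index, whatever the load. This is a simplified illustration under stated assumptions and not Flink's implementation. The class and method names are hypothetical, and the hash is a plain stand-in (as far as I know, Flink applies its own hash function to `key.hashCode()` before mapping the resulting key group to a parallel instance).

```java
/** Simplified illustration of deterministic key-to-operator assignment. */
final class KeyToOperatorSketch {

    /** Stand-in for hashing a key into one of maxParallelism key groups. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of one of the parallel operator instances. */
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** The assignment depends only on the key and the two parallelism settings. */
    static int operatorIndexForKey(Object key, int parallelism, int maxParallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }

    public static void main(String[] args) {
        int parallelism = 2;
        int maxParallelism = 128;
        for (int key = 1; key <= 4; key++) {
            System.out.println("key " + key + " -> operator "
                    + operatorIndexForKey(key, parallelism, maxParallelism));
        }
    }
}
```

With this stand-in hash, the keys 1 to 4 all land on operator 0. A different hash would spread them differently, but the mapping would still not react to a burst on one key, which is why rebalancing needs a custom partitioning strategy.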


I hope that helps.

Regards,
Timo


On 20/03/17 at 15:29, Sonex wrote:

I am using a simple streaming job where I use keyBy on the stream to process
events per key. The keys may vary in number (few keys to thousands). I have
noticed a behavior of Flink and I need clarification on that. When we use
keyBy on the stream, Flink assigns keys to parallel operators so each
operator can handle events per key independently. Once a key is assigned to
an operator, can the key change the operator to which it is assigned? From
what I've seen the answer is no.

For example, let's assume that keys 1 and 2 are assigned to operator A and
keys 3 and 4 are assigned to operator B. If there is a burst of data for key
1 at some later point in time, but keys 2, 3 and 4 have only little data, will
key 2 be assigned to operator B to balance the load? If not, is there a way to
do that? And if not, why does Flink not do that?








Re: Data+control stream from kafka + window function - not working

2017-03-20 Thread Aljoscha Krettek
What do you get from the System.out printing in CoFlatMapFunImpl? Could it be 
that all the elements are being processed before the control input element 
arrives and that they are therefore dropped?

> On 17 Mar 2017, at 09:14, Tarandeep Singh  wrote:
> 
> Hi Gordon,
> 
> When I use getInput (input created via collection), then watermarks are 
> always Long.MAX_VALUE: 
> WM: Watermark @ 9223372036854775807
> 
> This is understandable as input source has finished so a watermark of value 
> Long.MAX_VALUE is emitted.
> 
> When I use getKafkaInput, I get this watermark:
> WM: Watermark @ 1489532509000
> 
> This corresponds to Tue Mar 14 2017 16:01:49, which seems right (last 
> record's timestamp: 2017-03-14 16:01:50 minus 1 sec due to maxOutOfOrder 
> value).
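
The watermark value quoted above can be verified by hand. The following sketch recomputes it from the last record's timestamp and the 1 second max-out-of-order allowance. The UTC-7 offset is an assumption inferred from the numbers (the thread does not state a time zone), and the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Recomputes the watermark as "last event timestamp minus allowed lateness". */
final class WatermarkArithmetic {

    static long watermarkFor(String lastTimestamp, ZoneOffset offset, long maxOutOfOrderMillis) {
        DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // Interpret the record's local timestamp at the assumed UTC offset.
        long eventMillis = LocalDateTime.parse(lastTimestamp, format)
                .toInstant(offset)
                .toEpochMilli();
        return eventMillis - maxOutOfOrderMillis;
    }

    public static void main(String[] args) {
        // Last record in the thread: 2017-03-14 16:01:50, allowance: 1 second.
        long watermark = watermarkFor("2017-03-14 16:01:50", ZoneOffset.ofHours(-7), 1000L);
        System.out.println(watermark); // prints 1489532509000
    }
}
```

The result matches the reported `WM: Watermark @ 1489532509000`, which supports the reading that watermarks progress correctly on the Kafka input.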
> 
> If I *don't* use control stream, I also get correct watermark and this time 
> window function is called and correct aggregated values are generated.
> 
> Thanks,
> Tarandeep
> 
> 
> 
> On Thu, Mar 16, 2017 at 10:25 PM, Tzu-Li (Gordon) Tai wrote:
> Hi Tarandeep,
> 
> Thanks for clarifying.
> 
> For the next step, I would recommend taking a look at 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html
> and trying to find out what exactly is wrong with the watermark progression. 
> Flink 1.2 exposes watermarks as a metric, and that should help in figuring 
> out why the windows aren’t firing.
> 
> Also, I see you have added a “WatermarkDebugger” in your job. Have you 
> checked whether or not the watermarks printed there are identical (using 
> getInput v.s. getKafkaInput)?
> 
> Cheers,
> Gordon
> 
> 
> On March 17, 2017 at 12:32:51 PM, Tarandeep Singh (tarand...@gmail.com) wrote:
> 
>> Anyone?
>> Any suggestions what could be going wrong or what I am doing wrong?
>> 
>> Thanks,
>> Tarandeep
>> 
>> 
>> On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh wrote:
>> Data is read from Kafka and yes I use different group id every time I run 
>> the code. I have put break points and print statements to verify that.
>> 
>> Also, if I don't connect with control stream the window function works. 
>> 
>> - Tarandeep
>> 
>> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai wrote:
>> 
>>> Hi Tarandeep,
>>> 
>>> I haven’t looked at the rest of the code yet, but my first guess is that 
>>> you might not be reading any data from Kafka at all:
>>> 
 private static DataStream<String> readKafkaStream(String topic,
         StreamExecutionEnvironment env) throws IOException {

     Properties properties = new Properties();
     properties.setProperty("bootstrap.servers", "localhost:9092");
     properties.setProperty("zookeeper.connect", "localhost:2181");
     properties.setProperty("group.id", "group-0009");
     properties.setProperty("auto.offset.reset", "smallest");
     return env.addSource(new FlinkKafkaConsumer08<>(topic,
             new SimpleStringSchema(), properties));
 }
>>> 
>>> 
>>> Have you tried using a different “group.id” every time you’re re-running 
>>> the job?
>>> Note that the “auto.offset.reset” value is only respected when there aren’t 
>>> any offsets for the group committed in Kafka.
>>> So you might not actually be reading the complete “small_input.csv” 
>>> dataset, unless you use a different group.id every time.
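
One way to follow this advice while testing is to generate a new consumer group on every run, so that no committed offsets exist and "auto.offset.reset" takes effect. A minimal sketch, assuming the same local broker and ZooKeeper addresses as in the snippet above. The class name, the method name and the prefix are hypothetical, and only `java.util` is used so that the fragment runs on its own. The returned `Properties` would be passed to the Kafka consumer as before.

```java
import java.util.Properties;
import java.util.UUID;

/** Builds Kafka 0.8 consumer properties with a group.id that is new on every call. */
final class FreshGroupIdProperties {

    static Properties consumerProperties(String groupPrefix) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        // A random suffix means the group has no committed offsets yet.
        properties.setProperty("group.id", groupPrefix + "-" + UUID.randomUUID());
        // Only respected because the group is new: start from the earliest offset.
        properties.setProperty("auto.offset.reset", "smallest");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = consumerProperties("debug-run");
        System.out.println(properties.getProperty("group.id"));
    }
}
```

This is meant for debugging only. A long-running job normally keeps a stable group.id so that it can resume from its committed offsets.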
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:
>>> 
 Hi,
 
 I am using flink-1.2 and reading data stream from Kafka (using 
 FlinkKafkaConsumer08). I want to connect this data stream with another 
 stream (read control stream) so as to do some filtering on the fly. After 
 filtering, I am applying window function (tumbling/sliding event window) 
 along with fold function. However, the window function does not get called.
 
 Any help to debug/fix this is greatly appreciated!
 
 Below is a reproducible code that one can run in IDE like IntelliJ or on 
 flink cluster. You will need to have a running Kafka cluster (local or 
 otherwise).
 Create a topic and add test data points-
 
 $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper 
 localhost:2181 --replication-factor 1 --partitions 1
 $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 
 --topic test < small_input.csv
 
 where small_input.csv contains the following lines-
 
 p1,10.0f,2017-03-14 16:01:01
 p1,10.0f,2017-03-14 16:01:02
 p1,10.0f,2017-03-14 16:01:03

Re: Telling if a job has caught up with Kafka

2017-03-20 Thread Bruno Aranda
Hi,

Thanks! The proposal sounds very good to us too.

Bruno

On Sun, 19 Mar 2017 at 10:57 Florian König wrote:

> Thanks Gordon for the detailed explanation! That makes sense and explains
> the expected behaviour.
>
> The JIRA for the new metric also sounds very good. Can’t wait to have this
> in the Flink GUI (KafkaOffsetMonitor has some problems and stops working
> after 1-2 days, don’t know the reason yet).
>
> All the best,
> Florian
>
>
> > On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai wrote:
> >
> > @Florian
> > the 0.9 / 0.10 version and 0.8 version behave a bit differently right
> > now for the offset committing.
> >
> > In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable”
> > etc. settings will be completely ignored and overwritten before being used
> > to instantiate the internal Kafka clients, hence committing will only
> > happen on Flink checkpoints.
> >
> > In 0.8, this isn’t the case. Both automatic periodic committing and
> > committing on checkpoints can take place. That’s perhaps why you’re
> > observing the 0.8 consumer to be committing more frequently.
> >
> > FYI: This behaviour will be unified in Flink 1.3.0. If you’re
> > interested, you can take a look at
> > https://github.com/apache/flink/pull/3527.
> >
> > - Gordon
> >
> >
> > On March 17, 2017 at 6:07:38 PM, Florian König
> > (florian.koe...@micardo.com) wrote:
> >
> >> Why is that so? The checkpoint contains the Kafka offset and would be
> >> able to start reading wherever it left off, regardless of any offset
> >> stored in Kafka or Zookeeper. Why is the offset not committed regularly,
> >> independently from the checkpointing? Or did I misconfigure anything?
>
>
>


load balancing of keys to operators

2017-03-20 Thread Sonex
I am using a simple streaming job where I use keyBy on the stream to process
events per key. The keys may vary in number (few keys to thousands). I have
noticed a behavior of Flink and I need clarification on that. When we use
keyBy on the stream, Flink assigns keys to parallel operators so each
operator can handle events per key independently. Once a key is assigned to
an operator, can the key change the operator to which it is assigned? From
what I've seen the answer is no.

For example, let's assume that keys 1 and 2 are assigned to operator A and
keys 3 and 4 are assigned to operator B. If there is a burst of data for key
1 at some later point in time, but keys 2, 3 and 4 have only little data, will
key 2 be assigned to operator B to balance the load? If not, is there a way to
do that? And if not, why does Flink not do that?





Re: accessing flink HA cluster with scala shell/zeppelin notebook

2017-03-20 Thread Alexis Gendronneau
Hello users,

Like Maciek, I'm currently trying to get Apache Zeppelin 0.7 working with
Flink. I have two versions of Flink available (1.1.2 and 1.2.0). Each one
is running in high-availability mode.

When running jobs from Zeppelin in Flink local mode, everything works fine.
But when trying to submit a job to a remote host (no matter which version is
involved), the job is stuck in the submitting phase until it reaches
akka.client.timeout.

I tried to increase the timeout (as suggested in the error raised in Zeppelin),
but it only increases the time before the error is finally raised (tested with
600s).

On the Flink side, nothing appears but:

2017-03-20 11:19:31,675 WARN
org.apache.flink.runtime.jobmanager.JobManager - Discard message
LeaderSessionMessage(null,SubmitJob(JobGraph(jobId:
8af3a91762a171b04c4be0efe540f3d4) ,EXECUTION_RESULT_AND_STATE_CHANGES))
because the expected leader session ID
Some(f955760c-d80d-4992-a148-5968026ca6e4) did not equal the received
leader session ID None.


On the Zeppelin interpreter side, we get the following stack trace:

bestCarrier: org.apache.flink.api.scala.DataSet[CarrierFlightsCount] = org.apache.flink.api.scala.DataSet@669fc812
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:409)
  at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:382)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:369)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:344)
  at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
  at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
  at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
  at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
  at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637)
  at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
  ... 36 elided
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
  at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
  ... 46 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
  at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
  at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
  at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
  at akka.dispatch.Mailbox.run(Mailbox.scala:221)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It looks like we have to add parameters on the Zeppelin side, but I can't see
what's missing here. Any clue appreciated.

Regards,

2017-01-24 17:13 GMT+01:00 Aljoscha Krettek :

> +Till Rohrmann, do you know what can be used to
> access an HA cluster from that setting?
>
> Adding Till since he probably knows the HA stuff best.
>
> On Sun, 22 Jan 2017 at 15:58 Maciek Próchniak  wrote:
>
>> Hi,
>>
>> I have standalone Flink cluster configured with HA setting (i.e. with
>> zookeeper recovery). How should I access 

Re: SQL + flatten (or .*) quality docs location?

2017-03-20 Thread Fabian Hueske
Hi Stu,

thanks for reporting back.
I tried to reproduce the "method flatten() not found" error but did not
succeed.

It would be great if you could open a JIRA issue and describe how to
reproduce the problem.

Thank you,
Fabian

2017-03-17 16:42 GMT+01:00 Stu Smith :

> Thank you! Just in case someone else stumbles onto this, I figured out what
> was giving me trouble.
> The object I wanted to flatten happened to be null at times, at which
> point it would error out and give some exception along the lines of:
>
> "method flatten() not found"
>
> (Sorry, I'll try to follow up with the actual trace to help people with
> their searches later)
>
> which made it sound more like I was using it incorrectly altogether,
> rather than that the object was null... I think even just letting the NPE
> through would have been a little more helpful...
>
> I figured it out, btw, by trying equivalent programs in Flink and Esper
> (doing an eval of both). Esper gave a clearer error, and then I went back
> and fixed the Flink program.
>
> Take care,
>   -stu
>
> On Thu, Mar 16, 2017 at 3:27 AM, Fabian Hueske  wrote:
>
>> Hi Stu,
>>
>> there is only one page of documentation for the Table API and SQL [1].
>> I agree the structure could be improved and split into multiple pages.
>>
>> Regarding the flattening of a POJO, have a look at the "Built-In Functions"
>> section [2].
>> If you select "SQL" and head to the "Value access functions", you'll find
>>
>> > tableName.compositeType.* : Converts a Flink composite type (such as
>> > Tuple, POJO, etc.) and all of its direct subtypes into a flat
>> > representation where every subtype is a separate field.
>>
>>
>> The following program works and returns the correct result:
>>
>> // POJO definition
>> class MyPojo(var x: Int, var y: Int) {
>>   def this() = this(0, 0)
>> }
>>
>> // SQL query
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val tEnv = TableEnvironment.getTableEnvironment(env, config)
>>
>> val ds = env.fromElements(
>>   (0, new MyPojo(1, 2)), (1, new MyPojo(2, 3)), (2, new MyPojo(3, 4)))
>> tEnv.registerDataSet("Pojos", ds, 'id, 'pojo)
>>
>> // you need to include the table name to flatten a Pojo
>> val result = tEnv.sql("SELECT id, Pojos.pojo.* FROM Pojos")
>>
>> val results = result.toDataSet[Row].collect()
>> println(results.mkString("\n"))
>>
>> // Result
>> 0,1,2
>> 1,2,3
>> 2,3,4
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html#built-in-functions
>>
>> 2017-03-15 21:31 GMT+01:00 Stu Smith :
>>
>>> The documentation seems to indicate that there is a flatten method
>>> available in the sql language interface (in the table of available
>>> methods), or, alternatively using the '*' character somehow (in the text
>>> above the table).
>>>
>>> Yet I cannot flatten a POJO type, nor can I find any sufficient
>>> documentation in the official docs, by searching the mailing list via
>>> markmail, by looking through the examples in the source, or by looking
>>> through the SQL tests in the source.
>>>
>>> Can someone point me to the correct location for some solid flink SQL
>>> examples and docs?
>>>
>>> Take care,
>>>   -stu
>>>
>>
>>
>


Re: Job fails to start with S3 savepoint

2017-03-20 Thread Ufuk Celebi
Hey Abhinav,

the Exception is thrown if the S3 object does not exist.

Can you double check that it actually does exist (no typos, etc.)?

Could this be related to accessing a different region than expected?

– Ufuk


On Mon, Mar 20, 2017 at 9:38 AM, Timo Walther  wrote:

> Hi Abhinav,
>
> can you check if you have configured your AWS setup correctly? The S3
> configuration might be missing.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#missing-s3-filesystem-configuration
>
> Regards,
> Timo
>
>
> On 18/03/17 at 01:33, Bajaj, Abhinav wrote:
>
> Hi,
>
>
>
> I am trying to explore using S3 for storing checkpoints and savepoints.
>
> I can get Flink to store the checkpoints and savepoints in s3.
>
>
>
> However, when I try to submit the same job using the stored savepoint, it
> fails with the exception below.
>
> I am using Flink 1.2 and submitted the job from the UI dashboard.
>
>
>
> Can anyone guide me through this issue?
>
>
>
> Thanks,
>
> Abhinav
>
>
>
> *Jobmanager logs with exception* –
>
>
>
> 2017-03-18 00:10:09,193 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
>
> 2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.client.JobClient - Checking and uploading JAR files
>
> 2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
>
> 2017-03-18 00:10:09,501 INFO  org.apache.flink.yarn.YarnJobManager - Submitting job 4425245091bea9ad103dd3ff338244bb (Session Counter Example).
>
> 2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for 4425245091bea9ad103dd3ff338244bb.
>
> 2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Running initialization on master for job Session Counter Example (4425245091bea9ad103dd3ff338244bb).
>
> 2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Successfully ran initialization on master in 0 ms.
>
> 2017-03-18 00:10:09,503 INFO  org.apache.flink.yarn.YarnJobManager - Starting job from savepoint 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
>
> 2017-03-18 00:10:09,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (4425245091bea9ad103dd3ff338244bb) switched from state CREATED to FAILING.
>
> org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1369)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Invalid path 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
>     at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createFsInputStream(SavepointStore.java:182)
>     at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepoint(SavepointStore.java:131)
>     at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:64)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1348)
>     ... 10 more
>
> 2017-03-18 00:10:09,638 INFO  org.apache.flink.runtime.
> 

Re: Return of Flink shading problems in 1.2.0

2017-03-20 Thread Robert Metzger
Here is the JIRA: https://issues.apache.org/jira/browse/FLINK-6125
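
Until the issue tracked above is resolved, a build script could at least detect the affected Maven line before building. The following is a minimal sketch, not an official Flink script: `is_maven_33` and `build_flink` are hypothetical names, and `-DskipTests` is an optional assumption that only shortens the build. Note that, according to this thread, rebuilding `flink-dist` did not restore shading for Flink 1.2.0 under Maven 3.3.x, while a build with Maven 3.0.5 did shade correctly, so the rebuild step here reflects the documented workaround rather than a confirmed fix.

```shell
#!/bin/sh
# is_maven_33: reads `mvn -version` output on stdin and succeeds (exit 0)
# only if the first line reports a Maven 3.3.x release.
is_maven_33() {
  head -n 1 | grep -Eq '^Apache Maven 3\.3\.'
}

# build_flink (hypothetical helper): run from the Flink source root.
# Builds once from the root and, on Maven 3.3.x, once more inside flink-dist.
build_flink() {
  mvn clean install -DskipTests || return 1
  if mvn -version | is_maven_33; then
    echo "Maven 3.3.x detected: rebuilding flink-dist for shading" >&2
    (cd flink-dist && mvn clean install) || return 1
  fi
}
```

Nothing is built when the file is sourced; call `build_flink` explicitly from the source root.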

On Mon, Mar 20, 2017 at 10:27 AM, Robert Metzger 
wrote:

> Hi Craig,
>
> I was able to reproduce the issue with maven 3.3 in Flink 1.2. I'll look
> into it.
>
> On Fri, Mar 17, 2017 at 11:56 PM, Foster, Craig 
> wrote:
>
>> Ping. So I’ve built with 3.0.5 and it does give proper shading. So it
>> does get me yet another workaround where my only recourse is to use a max
>> version of Maven. Still, I feel there should be a long-term fix at some
>> point in time.
>>
>>
>>
>> I also believe there is a regression in Flink 1.2.0 for Maven 3.3.x with
>> the process as documented, so hoping someone can at least duplicate or let
>> me know of a new workaround for 3.3.x.
>>
>>
>>
>> Thanks!
>>
>> Craig
>>
>>
>>
>> *From: *"Foster, Craig" 
>> *Reply-To: *"user@flink.apache.org" 
>> *Date: *Friday, March 17, 2017 at 7:23 AM
>> *To: *"user@flink.apache.org" 
>> *Cc: *Ufuk Celebi , Robert Metzger ,
>> Stephan Ewen 
>> *Subject: *Re: Return of Flink shading problems in 1.2.0
>>
>>
>>
>> Hey Stephan:
>>
>> I am building twice in every case described in my previous mail. Well,
>> building then rebuilding the flink-dist submodule.
>>
>>
>>
>> This was fixed in BigTop but I started seeing this issue again with Flink
>> 1.2.0. I was wondering if there's something else in the environment that
>> could prevent the shading from working, because it isn't working now even
>> with the workaround.
>>
>>
>> On Mar 17, 2017, at 4:08 AM, Stephan Ewen  wrote:
>>
>> Hi Craig!
>>
>>
>>
>> Maven 3.3.x has a shading problem. You need to build two times, once from
>> root, once inside "flink-dist". Have a look here:
>>
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading
>>
>>
>>
>> Maybe that was missed in BigTop?
>>
>>
>>
>> I am wondering if we should actually throw an error if building with
>> Maven 3.3.x - too many users run into that issue.
>>
>>
>>
>> Stephan
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Mar 17, 2017 at 8:14 AM, Ufuk Celebi  wrote:
>>
>> Pulling in Robert and Stephan who know the project's shading setup the
>> best.
>>
>>
>> On Fri, Mar 17, 2017 at 6:52 AM, Foster, Craig 
>> wrote:
>> > Hi:
>> >
>> > A few months ago, I was building Flink and ran into shading issues for
>> > flink-dist as described in your docs. We resolved this in BigTop by adding
>> > the correct way to build flink-dist in the do-component-build script and
>> > everything was fine after that.
>> >
>> >
>> >
>> > Now, I’m running into issues doing the same now in Flink 1.2.0 and I’m
>> > trying to figure out what’s changed and how to fix it. Here’s how the
>> > flink-dist jar looks with proper shading:
>> >
>> >
>> >
>> > jar -tvf /usr/lib/flink/lib/flink-dist_2.10-1.1.4.jar | grep HttpConnectionParams
>> > 2485 Tue Jan 01 00:00:00 UTC 1980 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
>> > 3479 Tue Jan 01 00:00:00 UTC 1980 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
>> >
>> >
>> >
>> > When I build Flink 1.2.0 in BigTop, here’s shading for the jar found in the
>> > RPM:
>> >
>> > jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
>> > 2392 Tue Jan 01 00:00:00 GMT 1980 org/apache/commons/httpclient/params/HttpConnectionParams.class
>> > 2485 Tue Jan 01 00:00:00 GMT 1980 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
>> > 3479 Tue Jan 01 00:00:00 GMT 1980 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
>> > 2868 Tue Jan 01 00:00:00 GMT 1980 org/apache/http/params/HttpConnectionParams.class
>> >
>> >
>> >
>> > I thought maybe it was some strange thing going on with BigTop, so then I
>> > tried just straight building Flink 1.2.0 (outside BigTop) and get the same
>> > shading:
>> >
>> > jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
>> >   2485 Fri Mar 17 05:41:16 GMT 2017 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
>> >   3479 Fri Mar 17 05:41:16 GMT 2017 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
>> >   2392 Fri Mar 17 05:41:24 GMT 2017 org/apache/commons/httpclient/params/HttpConnectionParams.class
>> >   2868 Fri Mar 17 05:41:24 GMT 2017 org/apache/http/params/HttpConnectionParams.class
>> >
>> >
>> >
>> > And, yes, this is after going into flink-dist and running mvn clean install
>> > again since I am using Maven 3.3.x.
>> >
>> >
>> >
>> > Here’s a snippet from my Maven version:
>> >
>> > mvn -version
>> >
>> > Apache 

Re: Return of Flink shading problems in 1.2.0

2017-03-20 Thread Robert Metzger
Hi Craig,

I was able to reproduce the issue with maven 3.3 in Flink 1.2. I'll look
into it.

On Fri, Mar 17, 2017 at 11:56 PM, Foster, Craig  wrote:

> Ping. So I’ve built with 3.0.5 and it does give proper shading. So it does
> get me yet another workaround where my only recourse is to use a max
> version of Maven. Still, I feel there should be a long-term fix at some
> point in time.
>
>
>
> I also believe there is a regression in Flink 1.2.0 for Maven 3.3.x with
> the process as documented, so hoping someone can at least duplicate or let
> me know of a new workaround for 3.3.x.
>
>
>
> Thanks!
>
> Craig
>
>
>
> *From: *"Foster, Craig" 
> *Reply-To: *"user@flink.apache.org" 
> *Date: *Friday, March 17, 2017 at 7:23 AM
> *To: *"user@flink.apache.org" 
> *Cc: *Ufuk Celebi , Robert Metzger ,
> Stephan Ewen 
> *Subject: *Re: Return of Flink shading problems in 1.2.0
>
>
>
> Hey Stephan:
>
> I am building twice in every case described in my previous mail. Well,
> building then rebuilding the flink-dist submodule.
>
>
>
> This was fixed in BigTop but I started seeing this issue again with Flink
> 1.2.0. I was wondering if there's something else in the environment that
> could prevent the shading from working, because it isn't working now even
> with the workaround.
>
>
> On Mar 17, 2017, at 4:08 AM, Stephan Ewen  wrote:
>
> Hi Craig!
>
>
>
> Maven 3.3.x has a shading problem. You need to build two times, once from
> root, once inside "flink-dist". Have a look here:
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading
>
>
>
> Maybe that was missed in BigTop?
>
>
>
> I am wondering if we should actually throw an error if building with Maven
> 3.3.x - too many users run into that issue.
>
>
>
> Stephan
>
>
>
>
>
>
>
> On Fri, Mar 17, 2017 at 8:14 AM, Ufuk Celebi  wrote:
>
> Pulling in Robert and Stephan who know the project's shading setup the
> best.
>
>
> On Fri, Mar 17, 2017 at 6:52 AM, Foster, Craig 
> wrote:
> > Hi:
> >
> > A few months ago, I was building Flink and ran into shading issues for
> > flink-dist as described in your docs. We resolved this in BigTop by adding
> > the correct way to build flink-dist in the do-component-build script and
> > everything was fine after that.
> >
> >
> >
> > Now, I’m running into issues doing the same now in Flink 1.2.0 and I’m
> > trying to figure out what’s changed and how to fix it. Here’s how the
> > flink-dist jar looks with proper shading:
> >
> >
> >
> > jar -tvf /usr/lib/flink/lib/flink-dist_2.10-1.1.4.jar | grep HttpConnectionParams
> > 2485 Tue Jan 01 00:00:00 UTC 1980 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
> > 3479 Tue Jan 01 00:00:00 UTC 1980 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
> >
> >
> >
> > When I build Flink 1.2.0 in BigTop, here’s shading for the jar found in the
> > RPM:
> >
> > jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
> > 2392 Tue Jan 01 00:00:00 GMT 1980 org/apache/commons/httpclient/params/HttpConnectionParams.class
> > 2485 Tue Jan 01 00:00:00 GMT 1980 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
> > 3479 Tue Jan 01 00:00:00 GMT 1980 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
> > 2868 Tue Jan 01 00:00:00 GMT 1980 org/apache/http/params/HttpConnectionParams.class
> >
> >
> >
> > I thought maybe it was some strange thing going on with BigTop, so then I
> > tried just straight building Flink 1.2.0 (outside BigTop) and get the same
> > shading:
> >
> > jar -tvf flink-dist_2.10-1.2.0.jar | grep HttpConnectionParams
> >   2485 Fri Mar 17 05:41:16 GMT 2017 org/apache/flink/hadoop/shaded/org/apache/commons/httpclient/params/HttpConnectionParams.class
> >   3479 Fri Mar 17 05:41:16 GMT 2017 org/apache/flink/hadoop/shaded/org/apache/http/params/HttpConnectionParams.class
> >   2392 Fri Mar 17 05:41:24 GMT 2017 org/apache/commons/httpclient/params/HttpConnectionParams.class
> >   2868 Fri Mar 17 05:41:24 GMT 2017 org/apache/http/params/HttpConnectionParams.class
> >
> >
> >
> > And, yes, this is after going into flink-dist and running mvn clean install
> > again since I am using Maven 3.3.x.
> >
> >
> >
> > Here’s a snippet from my Maven version:
> >
> > mvn -version
> >
> > Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5;
> > 2015-11-10T16:41:47+00:00)
> >
> > Maven home: /usr/local/apache-maven
> >
> > Java version: 1.8.0_121, vendor: Oracle Corporation
> >
> >
> >
> > Any ideas on what my problem might be here?
> >
> >
> >
> > Thanks,
> >
> > Craig
> >
> >
>
>
>
>
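
The manual check used in this thread (grepping `jar -tvf` output for `HttpConnectionParams` and looking for copies outside the shaded package) can be turned into a small filter. This is a sketch under assumptions: `list_unshaded` is a hypothetical name, and the prefix `org/apache/flink/hadoop/shaded/` is taken from the listings quoted above. It only inspects the text it is given, so it says nothing about why shading failed.

```shell
#!/bin/sh
# list_unshaded: reads `jar -tvf` output on stdin and prints every entry for
# the given class name that sits outside the shaded package prefix.
# Usage: jar -tvf <jar> | list_unshaded <SimpleClassName>
list_unshaded() {
  awk -v suffix="/${1}.class" -v prefix="org/apache/flink/hadoop/shaded/" '
    {
      path = $NF   # the entry path is the last whitespace-separated field
      # true when path ends with "/<ClassName>.class"
      ends = (length(path) >= length(suffix)) &&
             (substr(path, length(path) - length(suffix) + 1) == suffix)
      # index(...) != 1 means the path does not start with the shaded prefix
      if (ends && index(path, prefix) != 1) print path
    }'
}

# Example call against the jar discussed in the thread:
#   jar -tvf flink-dist_2.10-1.2.0.jar | list_unshaded HttpConnectionParams
```

An empty result matches the properly shaded 1.1.4 listing; any printed path corresponds to the unshaded entries reported for 1.2.0.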


Re: Job fails to start with S3 savepoint

2017-03-20 Thread Timo Walther

Hi Abhinav,

can you check if you have configured your AWS setup correctly? The S3 
configuration might be missing.


https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#missing-s3-filesystem-configuration

Regards,
Timo


On 18/03/17 at 01:33, Bajaj, Abhinav wrote:


Hi,

I am trying to explore using S3 for storing checkpoints and savepoints.

I can get Flink to store the checkpoints and savepoints in s3.

However, when I try to submit the same job using the stored savepoint, 
it fails with the exception below.


I am using Flink 1.2 and submitted the job from the UI dashboard.

Can anyone guide me through this issue?

Thanks,

Abhinav

_Jobmanager logs with exception_–

2017-03-18 00:10:09,193 INFO org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager

2017-03-18 00:10:09,348 INFO org.apache.flink.runtime.client.JobClient - Checking and uploading JAR files

2017-03-18 00:10:09,348 INFO org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager

2017-03-18 00:10:09,501 INFO org.apache.flink.yarn.YarnJobManager - Submitting job 4425245091bea9ad103dd3ff338244bb (Session Counter Example).

2017-03-18 00:10:09,502 INFO org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for 4425245091bea9ad103dd3ff338244bb.

2017-03-18 00:10:09,502 INFO org.apache.flink.yarn.YarnJobManager - Running initialization on master for job Session Counter Example (4425245091bea9ad103dd3ff338244bb).

2017-03-18 00:10:09,502 INFO org.apache.flink.yarn.YarnJobManager - Successfully ran initialization on master in 0 ms.

2017-03-18 00:10:09,503 INFO org.apache.flink.yarn.YarnJobManager - Starting job from savepoint 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.

2017-03-18 00:10:09,636 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (4425245091bea9ad103dd3ff338244bb) switched from state CREATED to FAILING.

org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1369)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Invalid path 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createFsInputStream(SavepointStore.java:182)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepoint(SavepointStore.java:131)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:64)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1348)
    ... 10 more

2017-03-18 00:10:09,638 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map (1/1) (f7e8f6c8d2030f5773f9d162d9ac2797) switched from CREATED to CANCELED.

2017-03-18 00:10:09,639 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(TumblingProcessingTimeWindows(15000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f0aadd59}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: Unnamed (1/1) (7d1917621cf923445ab904bb60c62bfd) switched from CREATED to CANCELED.

2017-03-18 00:10:09,639 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Session Counter Example (4425245091bea9ad103dd3ff338244bb) if no longer possible.

2017-03-18 00:10:09,639 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (4425245091bea9ad103dd3ff338244bb) switched from state FAILING to FAILED.