Re: Flink 1.7.1 uses Kryo version 2.24.0

2019-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

Currently Flink uses Kryo as the default serializer for data types that
Flink's type serialization stack doesn't support [1].
This also includes serializers being used for managed state registered by
users.

Because of this, it's not easy to upgrade the Kryo version at the moment,
since Kryo is known to be binary incompatible across major versions [2].
Upgrading Kryo would therefore break backwards compatibility of Flink's
savepoints across Flink minor version releases [3], and that compatibility is
something the community decided to maintain as part of Flink's backward
compatibility policy.
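
(A small illustration added for the archive, not part of Gordon's reply: a hypothetical type that
Flink's own type extraction cannot handle falls back to Kryo; registering it only pins a Kryo
registration ID and does not change the bundled Kryo version.)

import org.apache.flink.api.scala.ExecutionEnvironment

// Hypothetical class: no default constructor, so Flink treats it as a
// generic type and serializes it with Kryo.
class MyPojo(var name: String, var tags: java.util.ArrayList[String])

val env = ExecutionEnvironment.getExecutionEnvironment
// Registering the type up front gives it a stable Kryo registration ID;
// the Kryo version underneath is still the 2.24.0 shipped with Flink.
env.getConfig.registerKryoType(classOf[MyPojo])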

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#flinks-typeinformation-class

[2]  https://github.com/EsotericSoftware/kryo#kryo-versioning-and-upgrading

[3]
https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table



On Sat, Mar 16, 2019 at 4:55 AM anaray  wrote:

> Hi ,
> Flink 1.7 still uses kryo-2.24.0. Is there any specific reason for not
> upgrading kryo?
>
> Thanks,
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Flink Table API orderBy: can it only sort by one column, not by multiple columns?

2019-03-15 Thread 刘 文

The output is sorted only by id in descending order; it is not additionally sorted by value1 in ascending order.







package com.opensourceteams.module.bigdata.flink.example.tableapi.operation.orderBy

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  def main(args: Array[String]): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

env.setParallelism(1)

val dataSet = env.fromElements( (1,"a",10),(2,"b",20),(20,"f",200),(3,"c",30) )



//convert the DataSet into a Table
val table = tableEnv.fromDataSet(dataSet)

//register the table
tableEnv.registerTable("user1",table)


//query all data of the table
tableEnv.scan("user1").as('id,'name,'value1)
  //.orderBy('id.asc)  //sort by the id column, ascending (note: sorting is applied per partition)
  .orderBy('id.desc)
  .orderBy('value1.asc)

  .first(1000)

  //print the output (acts as the sink)
  .print()


/**
  * Output:
  * 
  * 20,f,200
  * 3,c,30
  * 2,b,20
  * 1,a,10
  */



  }

}
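
(A hedged note for readers of the archive, not part of the original message: chaining two orderBy
calls does not combine them into a single multi-column sort; passing both fields to one orderBy
call should. Replacing the two calls in the code above with something like the following is worth trying:)

tableEnv.scan("user1").as('id, 'name, 'value1)
  .orderBy('id.desc, 'value1.asc)  // primary sort: id descending; secondary: value1 ascending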



Re: Migrating Existing TTL State to 1.8

2019-03-15 Thread Ning Shi
Hi Stefan,

Thank you for the confirmation.

Doing a one-time cleanup with a full snapshot and upgrading to Flink 1.8
could work. However, in our case, the state is quite large (TBs).
Taking a savepoint takes over an hour, during which we have to pause
the job or it may process more events.

The JavaDoc of `cleanupFullSnapshot` [1] says "Cleanup expired state
in full snapshot on checkpoint.". My understanding is that the only
way to take a full snapshot with RocksDB backend is to take a
savepoint. Is there another way to take a full checkpoint?

I noticed that Flink 1.8 also added an incremental cleanup strategy
[2] by iterating through several keys at a time for each state access.
If I combine this with the new compaction filter cleanup strategy,
will it eventually remove all expired state without taking a full
snapshot for upgrade?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupFullSnapshot--
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/StateTtlConfig.Builder.html#cleanupIncrementally-int-boolean-
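
(A hedged sketch added for the archive, not from the original mail: combining the two Flink 1.8
cleanup strategies mentioned above; the TTL duration and cleanup size are placeholders, and if I
remember correctly the compaction filter additionally has to be enabled via
state.backend.rocksdb.ttl.compaction.filter.enabled in 1.8.)

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(7))
  .cleanupIncrementally(10, false)    // visit up to 10 entries on each state access
  .cleanupInRocksdbCompactFilter()    // drop expired entries during RocksDB compaction
  .build()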

Thanks,

Ning


On Wed, Mar 13, 2019 at 11:22 AM Stefan Richter  wrote:
>
> Hi,
>
> If you are worried about old state, you can combine the compaction filter 
> based TTL with other cleanup strategies (see docs). For example, setting 
> `cleanupFullSnapshot` when you take a savepoint it will be cleared of any 
> expired state and you can then use it to bring it into Flink 1.8.
>
> Best,
> Stefan


Flink 1.7.1 uses Kryo version 2.24.0

2019-03-15 Thread anaray
Hi,
Flink 1.7 still uses kryo-2.24.0. Is there any specific reason for not
upgrading kryo? 

Thanks,




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: What should I take care if I enable object reuse

2019-03-15 Thread Elias Levy
That's certainly the safe thing to do, but if you do not mutate the object,
a copy is not strictly necessary.
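
(A hedged illustration added for the archive, not from the original thread: the copy-before-buffering
pattern Kurt describes below, using a hypothetical Event type.)

import org.apache.flink.api.common.functions.RichMapFunction

case class Event(id: Long, payload: String)  // hypothetical element type

// With ExecutionConfig#enableObjectReuse(), the runtime may hand the same
// Event instance to successive map() calls, so copy before keeping a
// reference that outlives the current call.
class BufferingMap extends RichMapFunction[Event, Event] {
  private val buffer = scala.collection.mutable.ArrayBuffer[Event]()

  override def map(value: Event): Event = {
    buffer += value.copy()  // store a private copy, not the reused instance
    value                   // forwarding the incoming object unchanged is fine
  }
}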



On Thu, Mar 14, 2019 at 9:19 PM Kurt Young  wrote:

> Keep one thing in mind: if you want the element to remain valid after the
> function call ends (map(), flatMap(), or whatever you are using),
> you should copy the element.
> Typical scenarios includes:
> 1. Save the elements into some collection like array, list, map for later
> usage, you should copy it explicitly.
> 2. Pass the element into some async calls, you should copy it.
>
> Best,
> Kurt
>
>
> On Fri, Mar 15, 2019 at 8:45 AM yinhua.dai 
> wrote:
>
>> Hi Elias,
>>
>> Thanks.
>> Would it be good enough as long as we always use a different object when
>> calling the Collector.collect() method in the operator?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Confusing exception info in Flink-SQL

2019-03-15 Thread 徐涛
Hi Experts,
When I am using the following expression in Flink SQL:
if(item_name='xxx', u.user_id, null)
the following exception is thrown, which is a bit confusing,
because it is actually caused by the fact that there is no IF function in Flink SQL. I think
it would be clearer to simply point out that there is no IF function, wouldn't it?

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 90 to line 1, column 93: Illegal use of 'NULL'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:783)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:768)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4759)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1727)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1804)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1804)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1804)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:456)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4023)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3286)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:967)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:943)
at 
org.apache.calcite.sql.validate.SetopNamespace.validateImpl(SetopNamespace.java:102)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:967)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:943)
at 
org.apache.calcite.sql.validate.SetopNamespace.validateImpl(SetopNamespace.java:102)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:967)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:943)
at 
org.apache.calcite.sql.SqlSetOperator.validateCall(SqlSetOperator.java:90)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5187)
at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:115)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:918)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:628)
at 
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
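
(A hedged note for readers of the archive, not part of the original message: this version of Flink SQL
has no IF function; the standard CASE expression is the usual substitute, with the NULL branch cast
to a concrete type so Calcite can infer it. The table name and column types below are assumptions.)

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// "users" is a hypothetical registered table with item_name and user_id columns.
val result = tableEnv.sqlQuery(
  """SELECT CASE WHEN item_name = 'xxx' THEN user_id
    |            ELSE CAST(NULL AS BIGINT) END AS maybe_user_id
    |FROM users""".stripMargin)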


Best
Henry

Re: flink tableapi inner join exception

2019-03-15 Thread Xingcan Cui
Hi,

As the message says, the two tables share the same column names. You could first rename
the columns of one table with the `as` operation [1].
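
(A minimal sketch of that renaming, added for the archive and based on the code quoted below; the
new column names are arbitrary.)

// Rename the columns of the second table before joining so that no names clash,
// then reference the renamed key in the join predicate.
val table2Renamed = table2.as('id2, 'name2, 'value2)
val joined = table
  .join(table2Renamed)
  .where('id === 'id2)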

Best,
Xingcan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#scan-projection-and-filter
 

> On Mar 15, 2019, at 9:03 AM, 刘 文  wrote:
> 
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> join relations with ambiguous names: id, name, value
>   at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
>   at 
> org.apache.flink.table.plan.logical.Join.validate(operators.scala:463)
>   at org.apache.flink.table.api.Table.join(table.scala:589)
>   at org.apache.flink.table.api.Table.join(table.scala:397)
>   at 
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin.Run$.main(Run.scala:26)
>   at 
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin.Run.main(Run.scala)
> 
> 
> 
> 
> 
> 
> package 
> com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin
> 
> import org.apache.flink.api.scala.{ExecutionEnvironment, _}
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> 
> object Run {
> 
> 
>   def main(args: Array[String]): Unit = {
> 
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tableEnv = TableEnvironment.getTableEnvironment(env)
> 
> val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
> val dataSet2 = env.fromElements( (1,"a",10),(20,"b",20), (30,"c",30) )
> 
> 
> 
> //convert the DataSets into Tables
> val table = tableEnv.fromDataSet(dataSet,'id,'name,'value)
> val table2 = tableEnv.fromDataSet(dataSet2,'id,'name,'value)
> 
> 
> 
> table.join(table2).where(" id = id ").first(1000).print()
> 
> 
> 
> 
> 
> 
>   }
> 
> }
> 
> 
> 
> 
> 
> 
> Best,
> thinktothings
> 
> 
> 



Re: Expressing Flink array aggregation using Table / SQL API

2019-03-15 Thread Piyush Narang
Hi Kurt,

Thanks for getting back and explaining this. The behavior in this case makes
more sense now after your explanation and after reading the dynamic tables article. I
was able to hook up the scoped aggregation like you suggested, so I have a
workaround for now. The part I'm still trying to figure out is whether there's
any way to express the query I had so that it can be sunk to an append sink
(apart from this custom aggregation). I tried including the time window in the
outer query as well, but I ran into errors there. Or, in such scenarios, would you
typically go the route of either a retractable sink or a sink that
can update partial results by key?
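
(For readers of the archive: a hedged, hypothetical sketch of such a "scoped" aggregate function.
The accumulator, result type and per-client logic are placeholders, not Piyush's actual code.)

import org.apache.flink.table.functions.AggregateFunction

// Accumulator keeps one partial result per clientId, so the query only needs
// GROUP BY userId + window and the output stays append-only.
class ClientScopedAcc {
  val perClient = new java.util.HashMap[String, String]()
}

class ClientScopedAgg
    extends AggregateFunction[java.util.Map[String, String], ClientScopedAcc] {

  override def createAccumulator(): ClientScopedAcc = new ClientScopedAcc

  override def getValue(acc: ClientScopedAcc): java.util.Map[String, String] =
    acc.perClient

  // Resolved by reflection; the signature must match the SQL call site:
  // some_aggregation(clientId, eventType, `timestamp`, dataField)
  def accumulate(acc: ClientScopedAcc,
                 clientId: String,
                 eventType: String,
                 ts: java.sql.Timestamp,
                 dataField: String): Unit = {
    // placeholder per-client logic; a real UDAF would fold into a proper value
    acc.perClient.put(clientId, s"$eventType@$ts:$dataField")
  }
}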

Thanks,

-- Piyush


From: Kurt Young 
Date: Tuesday, March 12, 2019 at 11:51 PM
To: Piyush Narang 
Cc: "user@flink.apache.org" 
Subject: Re: Expressing Flink array aggregation using Table / SQL API

Hi Piyush,

I think your second SQL is correct, but the problem you have encountered is that the
outer aggregation (GROUP BY userId & COLLECT(client_custom_aggregated)) will
emit a result immediately when receiving results from the inner aggregation.
Hence Flink needs the sink to
1. either have the ability to retract the previously emitted result, i.e. the sink should be
a `RetractStreamTableSink`, or
2. have something like a primary key and be able to update results by key. In
your case, userId should be the key.

I think you are trying to emit the result to an `AppendStreamTableSink`, which
is why you see that error.

Best,
Kurt


On Tue, Mar 12, 2019 at 9:46 PM Piyush Narang <p.nar...@criteo.com> wrote:
Thanks for getting back Kurt. Yeah this might be an option to try out. I was 
hoping there would be a way to express this directly in the SQL though ☹.

-- Piyush


From: Kurt Young <ykt...@gmail.com>
Date: Tuesday, March 12, 2019 at 2:25 AM
To: Piyush Narang <p.nar...@criteo.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Expressing Flink array aggregation using Table / SQL API

Hi Piyush,

Could you try to add clientId into your aggregate function, track the
map of <clientId, aggregated result> inside your new aggregate
function, and assemble whatever result you need when emitting?
The SQL will looks like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`, dataField)
FROM my_kafka_stream_table
GROUP BY userId,  HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

Kurt


On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang <p.nar...@criteo.com> wrote:
Hi folks,

I’m getting started with Flink and trying to figure out how to express 
aggregating some rows into an array to finally sink data into an 
AppendStreamTableSink.
My data looks something like this:
userId, clientId, eventType, timestamp, dataField

I need to compute some custom aggregations using a UDAF while grouping by 
userId, clientId over a sliding window (10 mins, triggered every 1 min). My 
first attempt is:
SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField) as 
custom_aggregated
FROM my_kafka_stream_table
GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' 
HOUR)

This query works as I expect it to. In every time window I end up with inserts 
for unique userId + clientId combinations. What I want to do though, is 
generate a single row per userId in each time window and this is what I’m 
struggling with expressing along with the restriction that I want to sink this 
to an AppendStreamTableSink. I was hoping to do something like this:

SELECT userId, COLLECT(client_custom_aggregated)
FROM
(
  SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`, 
dataField) as custom_aggregated] as client_custom_aggregated
  FROM my_kafka_stream_table
  GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' 
HOUR)
) GROUP BY userId

Unfortunately when I try this (and a few other variants), I run into the error, 
“AppendStreamTableSink requires that Table has only insert changes”. Does 
anyone know if there’s a way for me to compute my collect aggregation to 
produce one row per userId for a given time window?

Thanks,

-- Piyush



flink tableapi inner join exception

2019-03-15 Thread 刘 文
Exception in thread "main" org.apache.flink.table.api.ValidationException: join 
relations with ambiguous names: id, name, value
at 
org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
at 
org.apache.flink.table.plan.logical.Join.validate(operators.scala:463)
at org.apache.flink.table.api.Table.join(table.scala:589)
at org.apache.flink.table.api.Table.join(table.scala:397)
at 
com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin.Run$.main(Run.scala:26)
at 
com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin.Run.main(Run.scala)






package 
com.opensourceteams.module.bigdata.flink.example.tableapi.operation.innerJoin

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Run {


  def main(args: Array[String]): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val dataSet = env.fromElements( (1,"a",10),(2,"b",20), (3,"c",30) )
val dataSet2 = env.fromElements( (1,"a",10),(20,"b",20), (30,"c",30) )



//convert the DataSets into Tables
val table = tableEnv.fromDataSet(dataSet,'id,'name,'value)
val table2 = tableEnv.fromDataSet(dataSet2,'id,'name,'value)



   table.join(table2).where(" id = id ").first(1000).print()






  }

}






Best,
thinktothings





Re: No resource available error while testing HA

2019-03-15 Thread Averell
Hi Gary,

Thanks for the answer. I missed your most recent answer in this thread too.
However, my last question 

Averell wrote
> How about changing the configuration of the Flink job itself during
> runtime?
> What I have to do now is to take a savepoint, stop the job, change the
> configuration, and then restore the job from the save point.

was about changing job configuration (like parallelism, checkpoint
locations, checkpoint period,...), not about logback.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Where does the logs in Flink GUI's Exception tab come from?

2019-03-15 Thread Averell
Hi Gary,

Thanks a lot for the explanation, and sorry for missing your earlier
message.
I am clear now.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Set partition number of Flink DataSet

2019-03-15 Thread qi luo
Hi Fabian,

I understand this is by-design behavior, since Flink was built for
streaming first. Supporting batch shuffles and custom partition numbers in Flink could be
compelling for batch processing.

Could you explain a bit more which work needs to be done so that
Flink can support custom partition numbers? We would be willing to help
improve this area.

Thanks,
Qi

> On Mar 15, 2019, at 4:25 PM, Fabian Hueske  wrote:
> 
> Hi,
> 
> Flink works a bit differently than Spark.
> By default, Flink uses pipelined shuffles which push results of the sender 
> immediately to the receivers (btw. this is one of the building blocks for 
> stream processing).
> However, pipelined shuffles require that all receivers are online. Hence,
> the number of partitions determines the number of running tasks.
> There is also a batch shuffle mode, but it needs to be explicitly enabled and 
> AFAIK does not resolve the dependency of number of partitions and task 
> parallelism.
> 
> However, the community is currently working on many improvements for batch 
> processing, including scheduling and fault-tolerance. 
> Batched shuffles are an important building block for this and there might be 
> better support for your use case in the future.
> 
> Best, Fabian
> 
> On Fri, Mar 15, 2019 at 03:56, qi luo  wrote:
> Hi Ken,
> 
> That looks awesome! I’ve implemented something similar to your bucketing 
> sink, but using multiple internal writers rather than multiple internal 
> output.
> 
> Besides this, I’m also curious whether Flink can achieve this like Spark: 
> allow user to specify partition number in partitionBy() method (so no 
> multiple output formats are needed). But this seems to need non-trivial 
> changes in Flink core.
> 
> Thanks,
> Qi
> 
>> On Mar 15, 2019, at 2:36 AM, Ken Krugler  wrote:
>> 
>> Hi Qi,
>> 
>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working
>> version of a bucketing sink.
>> 
>> — Ken
>> 
>> 
>>> On Mar 13, 2019, at 7:46 PM, qi luo  wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Agree. I will try partitonBy() to reducer the number of parallel sinks, and 
>>> may also try sortPartition() so each sink could write files one by one. 
>>> Looking forward to your solution. :)
>>> 
>>> Thanks,
>>> Qi
>>> 
 On Mar 14, 2019, at 2:54 AM, Ken Krugler >>> > wrote:
 
 Hi Qi,
 
> On Mar 13, 2019, at 1:26 AM, qi luo  > wrote:
> 
> Hi Ken,
> 
> Do you mean that I can create a batch sink which writes to N files? 
 
 Correct.
 
> That sounds viable, but since our data size is huge (billions of records 
> & thousands of files), the performance may be unacceptable. 
 
 The main issue with performance (actually memory usage) is how many 
 OutputFormats do you need to have open at the same time.
 
 If you partition by the same key that’s used to define buckets, then the 
 max number is less, as each parallel instance of the sink only gets a 
 unique subset of all possible bucket values.
 
 I’m actually dealing with something similar now, so I might have a 
 solution to share soon.
 
 — Ken
 
 
> I will check Blink and give it a try anyway.
> 
> Thank you,
> Qi
> 
>> On Mar 12, 2019, at 11:58 PM, Ken Krugler > > wrote:
>> 
>> Hi Qi,
>> 
>> If I understand what you’re trying to do, then this sounds like a 
>> variation of a bucketing sink.
>> 
>> That typically uses a field value to create a directory path or a file 
>> name (though the filename case is only viable when the field is also 
>> what’s used to partition the data)
>> 
>> But I don’t believe Flink has built-in support for that, in batch mode 
>> (see BucketingSink 
>> 
>>  for streaming).
>> 
>> Maybe Blink has added that? Hoping someone who knows that codebase can 
>> chime in here.
>> 
>> Otherwise you’ll need to create a custom sink to implement the desired 
>> behavior - though abusing a MapPartitionFunction 
>> 
>>  would be easiest, I think.
>> 
>> — Ken
>> 
>> 
>> 
>>> On Mar 12, 2019, at 2:28 AM, qi luo >> > wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Thanks for your reply. I may not make myself clear: our problem is not 
>>> about reading but rather writing. 
>>> 
>>> We need to write to N files based 

RE: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-15 Thread Papadopoulos, Konstantinos
Yes, we are submitting more than one job and we choose which one is going to be 
executed depending on the first program argument (i.e., ‘job’ argument).

From: Chesnay Schepler 
Sent: Friday, 15 March 2019 12:53 PM
To: Papadopoulos, Konstantinos ; 
user@flink.apache.org
Subject: Re: ProgramInvocationException when trying to submit a job by running 
a jar using Monitoring REST API

In your jar, are you submitting multiple jobs in parallel?

On 15.03.2019 10:05, Papadopoulos, Konstantinos wrote:
We had some progress since the job seems to be submitted and its execution has 
been started, but, now, I am getting a ProgramAbortException as follows:

05:01:01.788 [ERROR] SpringApplication – Application run failed
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException: 
null
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:54)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
com.iri.aa.etl.lmx.entity.service.AbstractLmxService.getCampaigns(AbstractLmxService.java:105)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.entity.service.MediaSpentServiceImpl.transform(MediaSpentServiceImpl.java:25)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.MediaSpentJob.execute(MediaSpentJob.java:58) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.AbstractLmxJob.abstractExecute(AbstractLmxJob.java:27) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.MediaSpentJob.execute(MediaSpentJob.java:38) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.MediaSpentJob.executeDry(MediaSpentJob.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.job.JobExecutor.lambda$executeDryRunners$6(JobExecutor.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at java.util.ArrayList.forEach(Unknown Source) ~[?:1.8.0_201]
at 
com.iri.aa.etl.job.JobExecutor.executeDryRunners(JobExecutor.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at com.iri.aa.etl.job.JobExecutor.run(JobExecutor.java:35) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:324) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at com.iri.aa.etl.EtlApplication.main(EtlApplication.java:21) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_201]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
~[?:1.8.0_201]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) ~[?:1.8.0_201]
at java.lang.reflect.Method.invoke(Unknown Source) 
~[?:1.8.0_201]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
  

Re: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-15 Thread Chesnay Schepler

In your jar, are you submitting multiple jobs in parallel?

On 15.03.2019 10:05, Papadopoulos, Konstantinos wrote:


We had some progress since the job seems to be submitted and its 
execution has been started, but, now, I am getting a 
ProgramAbortException as follows:


05:01:01.788 [ERROR] SpringApplication – Application run failed

org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException: 
null


at 
org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:54) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
com.iri.aa.etl.lmx.entity.service.AbstractLmxService.getCampaigns(AbstractLmxService.java:105) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
com.iri.aa.etl.lmx.entity.service.MediaSpentServiceImpl.transform(MediaSpentServiceImpl.java:25) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at com.iri.aa.etl.lmx.MediaSpentJob.execute(MediaSpentJob.java:58) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
com.iri.aa.etl.lmx.AbstractLmxJob.abstractExecute(AbstractLmxJob.java:27) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at com.iri.aa.etl.lmx.MediaSpentJob.execute(MediaSpentJob.java:38) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at com.iri.aa.etl.lmx.MediaSpentJob.executeDry(MediaSpentJob.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
com.iri.aa.etl.job.JobExecutor.lambda$executeDryRunners$6(JobExecutor.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at java.util.ArrayList.forEach(Unknown Source) ~[?:1.8.0_201]

at 
com.iri.aa.etl.job.JobExecutor.executeDryRunners(JobExecutor.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at com.iri.aa.etl.job.JobExecutor.run(JobExecutor.java:35) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:324) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at com.iri.aa.etl.EtlApplication.main(EtlApplication.java:21) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]


at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_201]


at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
~[?:1.8.0_201]


at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
~[?:1.8.0_201]


at java.lang.reflect.Method.invoke(Unknown Source) ~[?:1.8.0_201]

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:117) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$7(JarRunHandler.java:151) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]


at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) [?:1.8.0_201]


at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
[?:1.8.0_201]


at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
[?:1.8.0_201]


at java.lang.Thread.run(Unknown Source) [?:1.8.0_201]

From: Chesnay Schepler 
Sent: Friday, 15 March 2019 10:56 AM
To: Papadopoulos, Konstantinos ; user@flink.apache.org
Subject: Re: 

Re: [DISCUSS] Create a Flink ecosystem website

2019-03-15 Thread Robert Metzger
Thank you for reaching out to Infra and the ember client.
When I first saw the Ember repository, I thought it was the whole thing
(frontend and backend), but while testing it, I realized it is "only" the
frontend. I'm not sure if it makes sense to adjust the Ember observer
client, or just write a simple UI from scratch.
I have a very capable and motivated frontend developer who would be willing
to implement what I've mocked in my proposal.
In addition, I found somebody (Congxian Qiu) who seems to be eager to help
with this project for the backend:
https://github.com/rmetzger/flink-community-tools/issues/4

For Infra: I had the same experience when asking for more GitHub
permissions for "flinkbot": They didn't respond on their mailing list, only
on Jira.



On Thu, Mar 14, 2019 at 2:45 PM Becket Qin  wrote:

> Thanks for writing up the specifications.
>
> Regarding the website source code, Austin found a website[1] whose
> frontend code[2] is available publicly. It lacks some support (e.g login),
> but it is still a good starting point. One thing is that I did not find a
> License statement for that source code. I'll reach out to the author to see
> if they have any concern over our usage.
>
> Apache Infra has not replied to my email regarding some details about the
> VM. I'll open an infra Jira ticket tomorrow if there is still no response.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> [1] https://emberobserver.com/
> [2] https://github.com/emberobserver/client
>
>
>
> On Thu, Mar 14, 2019 at 1:35 AM Robert Metzger 
> wrote:
>
>> @Bowen: I agree. Confluent Hub looks nicer, but it is on their company
>> website. I guess the likelihood that they give out code from their company
>> website is fairly low.
>> @Nils: Beam's page is similar to our Ecosystem page, which we'll
>> reactivate as part of this PR:
>> https://github.com/apache/flink-web/pull/187
>>
>> Spark-packages.org did not respond to my request.
>> I will propose a short specification in Becket's initial document.
>>
>>
>> On Mon, Mar 11, 2019 at 11:38 AM Niels Basjes  wrote:
>>
>>> Hi,
>>>
>>> The Beam project has something in this area that is simply a page
>>> within their documentation website:
>>> https://beam.apache.org/documentation/sdks/java-thirdparty/
>>>
>>> Niels Basjes
>>>
>>> On Fri, Mar 8, 2019 at 11:39 PM Bowen Li  wrote:
>>> >
>>> > Confluent hub for Kafka is another good example of this kind. I
>>> personally like it over the spark site. May worth checking it out with
>>> Kafka folks
>>> >
>>> > On Thu, Mar 7, 2019 at 6:06 AM Becket Qin 
>>> wrote:
>>> >>
>>> >> Absolutely! Thanks for the pointer. I'll submit a PR to update the
>>> >> ecosystem page and the navigation.
>>> >>
>>> >> Thanks,
>>> >>
>>> >> Jiangjie (Becket) Qin
>>> >>
>>> >> On Thu, Mar 7, 2019 at 8:47 PM Robert Metzger 
>>> wrote:
>>> >>
>>> >> > Okay. I will reach out to spark-packages.org and see if they are
>>> willing
>>> >> > to share.
>>> >> >
>>> >> > Do you want to raise a PR to update the ecosystem page (maybe sync
>>> with
>>> >> > the "Software Projects" listed here:
>>> >> > https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink)
>>> and
>>> >> > link it in the navigation?
>>> >> >
>>> >> > Best,
>>> >> > Robert
>>> >> >
>>> >> >
>>> >> > On Thu, Mar 7, 2019 at 10:13 AM Becket Qin 
>>> wrote:
>>> >> >
>>> >> >> Hi Robert,
>>> >> >>
>>> >> >> I think it at least worths checking if spark-packages.org owners
>>> are
>>> >> >> willing to share. Thanks for volunteering to write the requirement
>>> >> >> descriptions! In any case, that will be very helpful.
>>> >> >>
>>> >> >> Since a static page has almost no cost, and we will need it to
>>> redirect
>>> >> >> to the dynamic site anyways, how about we first do that while
>>> working on
>>> >> >> the dynamic website?
>>> >> >>
>>> >> >> Thanks,
>>> >> >>
>>> >> >> Jiangjie (Becket) Qin
>>> >> >>
>>> >> >> On Thu, Mar 7, 2019 at 4:59 AM Ufuk Celebi  wrote:
>>> >> >>
>>> >> >>> I like Shaoxuan's idea to keep this a static site first. We could
>>> then
>>> >> >>> iterate on this and make it a dynamic thing. Of course, if we
>>> have the
>>> >> >>> resources in the community to quickly start with a dynamic site,
>>> I'm
>>> >> >>> not apposed.
>>> >> >>>
>>> >> >>> – Ufuk
>>> >> >>>
>>> >> >>> On Wed, Mar 6, 2019 at 2:31 PM Robert Metzger <
>>> rmetz...@apache.org>
>>> >> >>> wrote:
>>> >> >>> >
>>> >> >>> > Awesome! Thanks a lot for looking into this Becket! The VMs
>>> hosted by
>>> >> >>> Infra
>>> >> >>> > look suitable.
>>> >> >>> >
>>> >> >>> > @Shaoxuan: There is actually already a static page. It used to
>>> be
>>> >> >>> linked,
>>> >> >>> > but has been removed from the navigation bar for some reason.
>>> This is
>>> >> >>> the
>>> >> >>> > page: https://flink.apache.org/ecosystem.html
>>> >> >>> > We could update the page and add it back to the navigation bar
>>> for the
>>> >> >>> > coming weeks. What do you think?
>>> >> >>> >
>>> >> >>> > I would actually like to push for a dynamic page 

RE: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-15 Thread Papadopoulos, Konstantinos
We had some progress since the job seems to be submitted and its execution has 
been started, but, now, I am getting a ProgramAbortException as follows:

05:01:01.788 [ERROR] SpringApplication - Application run failed
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException: 
null
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:54)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413) 
~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
com.iri.aa.etl.lmx.entity.service.AbstractLmxService.getCampaigns(AbstractLmxService.java:105)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.entity.service.MediaSpentServiceImpl.transform(MediaSpentServiceImpl.java:25)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.MediaSpentJob.execute(MediaSpentJob.java:58) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.AbstractLmxJob.abstractExecute(AbstractLmxJob.java:27) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.MediaSpentJob.execute(MediaSpentJob.java:38) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.lmx.MediaSpentJob.executeDry(MediaSpentJob.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
com.iri.aa.etl.job.JobExecutor.lambda$executeDryRunners$6(JobExecutor.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at java.util.ArrayList.forEach(Unknown Source) ~[?:1.8.0_201]
at 
com.iri.aa.etl.job.JobExecutor.executeDryRunners(JobExecutor.java:44) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at com.iri.aa.etl.job.JobExecutor.run(JobExecutor.java:35) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
 ~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:324) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at com.iri.aa.etl.EtlApplication.main(EtlApplication.java:21) 
~[d7378efa-919f-4bd3-a320-d74a5a76f80e_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_201]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
~[?:1.8.0_201]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) ~[?:1.8.0_201]
at java.lang.reflect.Method.invoke(Unknown Source) 
~[?:1.8.0_201]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:117)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$7(JarRunHandler.java:151)
 ~[flink-dist_2.12-1.7.1.jar:1.7.1]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
[?:1.8.0_201]
at 

Re: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-15 Thread Chesnay Schepler
Please separate your program arguments by a space instead of a comma and 
try again.
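
For reference, the request body quoted further down in this message would then look roughly
like this (a sketch of the corrected call, not a verified request):

{"programArgs" : "--job=mediaSpent --initialScopeId=b494c35d-4c37-4338-8d23-0fc947bef690 --integratedScopeId=91769bd8-df4d-436c-b8d0-2e23ce862859 --projectId=333 --log.path=../log"}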


On 15.03.2019 09:34, Papadopoulos, Konstantinos wrote:


Hi Chesnay,

Sorry for the misunderstanding. I get the following exception:

2019-03-15 04:31:26,826 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception 
occurred in REST handler.


org.apache.flink.runtime.rest.handler.RestHandlerException: 
org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error.


at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)


at java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)

at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown 
Source)


at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)

at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error.


at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:119)


at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$7(JarRunHandler.java:151)


... 4 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: 
The main method caused an error.


at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)


at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)


at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)


at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)


at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)


at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:117)


... 5 more

Caused by: java.lang.IllegalStateException: Failed to execute 
ApplicationRunner


at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:807)


at 
org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)


at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:324)


at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)


at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)


at com.iri.aa.etl.EtlApplication.main(EtlApplication.java:21)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)

at java.lang.reflect.Method.invoke(Unknown Source)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)


... 10 more

Caused by: java.lang.IllegalArgumentException: Argument 
'initialScopeId' is missing.


at 
com.iri.aa.etl.job.JobArgumentsUtility.getArgument(JobArgumentsUtility.java:58)


at 
com.iri.aa.etl.lmx.job.JobArgumentsServiceImpl.getLmxArguments(JobArgumentsServiceImpl.java:86)


at 
com.iri.aa.etl.lmx.job.JobArgumentsServiceImpl.getJobArguments(JobArgumentsServiceImpl.java:39)


at com.iri.aa.etl.job.JobExecutor.run(JobExecutor.java:30)

at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)


... 20 more

From: Chesnay Schepler 
Sent: Friday, 15 March 2019 10:20 AM
To: Papadopoulos, Konstantinos ; user@flink.apache.org
Subject: Re: ProgramInvocationException when trying to submit a job
by running a jar using Monitoring REST API


Please provide the logged exception, I cannot help you otherwise.

On 14.03.2019 14:20, Papadopoulos, Konstantinos wrote:

It seems that Flink cluster does not retrieve program arguments
correctly. For reference, I sent the following request:

Method Type: POST

URL:

http://dbtpa05p.ch3.dev.i.com:9171/v1/jars/321febd8-a5e8-4255-858b-c221b49aef18_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar/run



Body: {"programArgs" :

"--job=mediaSpent,--initialScopeId=b494c35d-4c37-4338-8d23-0fc947bef690,--integratedScopeId=91769bd8-df4d-436c-b8d0-2e23ce862859,--projectId=333,--log.path=../log"}

Content-Type: application/json


RE: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-15 Thread Papadopoulos, Konstantinos
Hi Chesnay,

Sorry for the misunderstanding. I get the following exception:

2019-03-15 04:31:26,826 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception 
occurred in REST handler.
org.apache.flink.runtime.rest.handler.RestHandlerException: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
at java.util.concurrent.CompletableFuture.postComplete(Unknown 
Source)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:119)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$7(JarRunHandler.java:151)
... 4 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:117)
... 5 more
Caused by: java.lang.IllegalStateException: Failed to execute ApplicationRunner
at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:807)
at 
org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.iri.aa.etl.EtlApplication.main(EtlApplication.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 10 more
Caused by: java.lang.IllegalArgumentException: Argument 'initialScopeId' is 
missing.
at 
com.iri.aa.etl.job.JobArgumentsUtility.getArgument(JobArgumentsUtility.java:58)
at 
com.iri.aa.etl.lmx.job.JobArgumentsServiceImpl.getLmxArguments(JobArgumentsServiceImpl.java:86)
at 
com.iri.aa.etl.lmx.job.JobArgumentsServiceImpl.getJobArguments(JobArgumentsServiceImpl.java:39)
at com.iri.aa.etl.job.JobExecutor.run(JobExecutor.java:30)
at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
... 20 more

From: Chesnay Schepler 
Sent: Friday, 15 March 2019 10:20 AM
To: Papadopoulos, Konstantinos ; 
user@flink.apache.org
Subject: Re: ProgramInvocationException when trying to submit a job by running 
a jar using Monitoring REST API

Please provide the logged exception, I cannot help you otherwise.

On 14.03.2019 14:20, Papadopoulos, Konstantinos wrote:
It seems that Flink cluster does not retrieve program arguments correctly. For 
reference, I sent the following request:
Method Type: POST
URL: 

Re: Set partition number of Flink DataSet

2019-03-15 Thread Fabian Hueske
Hi,

Flink works a bit differently than Spark.
By default, Flink uses pipelined shuffles which push results of the sender
immediately to the receivers (btw. this is one of the building blocks for
stream processing).
However, pipelined shuffles require that all receivers are online. Hence,
the number of partitions determines the number of running tasks.
There is also a batch shuffle mode, but it needs to be explicitly enabled
and AFAIK does not resolve the dependency of number of partitions and task
parallelism.
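
(A hedged illustration for the archive, not part of Fabian's message: the batch exchange mode he
mentions can be switched on via the ExecutionConfig; whether it removes the coupling between
partition count and task parallelism is exactly the caveat above.)

import org.apache.flink.api.common.ExecutionMode
import org.apache.flink.api.scala.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
// Use blocking (batch) data exchanges instead of the default pipelined shuffles.
env.getConfig.setExecutionMode(ExecutionMode.BATCH)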

However, the community is currently working on many improvements for batch
processing, including scheduling and fault-tolerance.
Batched shuffles are an important building block for this and there might
be better support for your use case in the future.

Best, Fabian

On Fri, Mar 15, 2019 at 03:56, qi luo  wrote:

> Hi Ken,
>
> That looks awesome! I’ve implemented something similar to your bucketing
> sink, but using multiple internal writers rather than multiple internal
> output.
>
> Besides this, I’m also curious whether Flink can achieve this like Spark:
> allow user to specify partition number in partitionBy() method (so no
> multiple output formats are needed). But this seems to need non-trivial
> changes in Flink core.
>
> Thanks,
> Qi
>
> On Mar 15, 2019, at 2:36 AM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> See https://github.com/ScaleUnlimited/flink-utils/, for a rough but
> working version of a bucketing sink.
>
> — Ken
>
>
> On Mar 13, 2019, at 7:46 PM, qi luo  wrote:
>
> Hi Ken,
>
> Agree. I will try partitonBy() to reducer the number of parallel sinks,
> and may also try sortPartition() so each sink could write files one by one.
> Looking forward to your solution. :)
>
> Thanks,
> Qi
>
> On Mar 14, 2019, at 2:54 AM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> On Mar 13, 2019, at 1:26 AM, qi luo  wrote:
>
> Hi Ken,
>
> Do you mean that I can create a batch sink which writes to N files?
>
>
> Correct.
>
> That sounds viable, but since our data size is huge (billions of records &
> thousands of files), the performance may be unacceptable.
>
>
> The main issue with performance (actually memory usage) is how many
> OutputFormats do you need to have open at the same time.
>
> If you partition by the same key that’s used to define buckets, then the
> max number is less, as each parallel instance of the sink only gets a
> unique subset of all possible bucket values.
>
> I’m actually dealing with something similar now, so I might have a
> solution to share soon.
>
> — Ken
>
>
> I will check Blink and give it a try anyway.
>
> Thank you,
> Qi
>
> On Mar 12, 2019, at 11:58 PM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> If I understand what you’re trying to do, then this sounds like a
> variation of a bucketing sink.
>
> That typically uses a field value to create a directory path or a file
> name (though the filename case is only viable when the field is also what’s
> used to partition the data)
>
> But I don’t believe Flink has built-in support for that, in batch mode
> (see BucketingSink
> 
>  for
> streaming).
>
> Maybe Blink has added that? Hoping someone who knows that codebase can
> chime in here.
>
> Otherwise you’ll need to create a custom sink to implement the desired
> behavior - though abusing a MapPartitionFunction
> 
>  would
> be easiest, I think.
>
> — Ken
>
>
>
> On Mar 12, 2019, at 2:28 AM, qi luo  wrote:
>
> Hi Ken,
>
> Thanks for your reply. I may not make myself clear: our problem is not
> about reading but rather writing.
>
> We need to write to N files based on key partitioning. We have to use
> *setParallelism() *to set the output partition/file number, but when the
> partition number is too large (~100K), the parallelism would be too high.
> Is there any other way to achieve this?
>
> Thanks,
> Qi
>
> On Mar 11, 2019, at 11:22 PM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> I’m guessing you’re calling createInput() for each input file.
>
> If so, then instead you want to do something like:
>
>  Job job = Job.getInstance();
>
> for each file…
> FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file
> path));
>
> env.createInput(HadoopInputs.createHadoopInput(…, job)
>
> Flink/Hadoop will take care of parallelizing the reads from the files,
> given the parallelism that you’re specifying.
>
> — Ken
>
>
> On Mar 11, 2019, at 5:42 AM, qi luo  wrote:
>
> Hi,
>
> We’re trying to distribute batch input data to (N) HDFS files partitioning
> by hash using DataSet API. What I’m doing is like:
>
> env.createInput(…)
>   .partitionByHash(0)
>   .setParallelism(N)
>   .output(…)
>
> This works well for small number of files. But when we need to distribute
> to* large number of files (say 100K)*, the 

Re: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-15 Thread Chesnay Schepler

Please provide the logged exception, I cannot help you otherwise.

On 14.03.2019 14:20, Papadopoulos, Konstantinos wrote:


It seems that Flink cluster does not retrieve program arguments 
correctly. For reference, I sent the following request:


Method Type: POST

URL: 
http://dbtpa05p.ch3.dev.i.com:9171/v1/jars/321febd8-a5e8-4255-858b-c221b49aef18_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar/run


Body: {"programArgs" : 
"--job=mediaSpent,--initialScopeId=b494c35d-4c37-4338-8d23-0fc947bef690,--integratedScopeId=91769bd8-df4d-436c-b8d0-2e23ce862859,--projectId=333,--log.path=../log"}


Content-Type: application/json

From: Chesnay Schepler 
Sent: Thursday, 14 March 2019 2:24 PM
To: Papadopoulos, Konstantinos ; user@flink.apache.org
Subject: Re: ProgramInvocationException when trying to submit a job
by running a jar using Monitoring REST API


Please enable debug logging, re-submit the job, check the JobManager 
logs and report back the logged exception.


On 14.03.2019 13:16, Papadopoulos, Konstantinos wrote:

Hi all,

As part of our projects, I experiment with Flink Monitoring REST
API and, especially, its capabilities of uploading and running jar
files.

When I am trying to submit one of our jobs by running a jar
previously uploaded via '/jars/upload', I am getting an 500
Internal Server Error response with the following body:

{ "errors": [
"org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error." ] }

On the other hand, when I am trying the same with the ‘Word Count’
batch example, execution succeeds.

It must be pointed out that I tried to execute the respective POST
request both via ‘curl’ and Postman, but both failed with the same
error.

Do anyone have idea why this may happen?

Thanks in advance,

Konstantinos





Re: Where does the logs in Flink GUI's Exception tab come from?

2019-03-15 Thread Gary Yao
Hi Averell,

I think I have answered your question previously [1]. The bottom line is
that
the error is logged on INFO level in the ExecutionGraph [2]. However, your
effective log level (of the root logger) is WARN. The log levels are ordered
as follows [3]:

TRACE < DEBUG < INFO <  WARN < ERROR

It follows that, in your case, only log requests of level WARN or ERROR will
appear in the log file – all other levels will be discarded. It should be enough
to set the root log level to INFO, or to set the log level of the
org.apache.flink.runtime.executiongraph.ExecutionGraph logger explicitly to INFO
(or lower).
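As a sketch, the second option amounts to adding a logger entry along these lines
to logback.xml (the appender name "file" is an assumption here – use whatever
appender your configuration actually defines):

<!-- Raise only the ExecutionGraph logger to INFO while leaving the root logger at WARN. -->
<logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph"
        level="INFO" additivity="false">
    <appender-ref ref="file"/>
</logger>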

Best,
Gary

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-resource-available-error-while-testing-HA-tp25681p25835.html
[2]
https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1363
[3] https://logback.qos.ch/manual/architecture.html

On Fri, Mar 15, 2019 at 4:21 AM Averell  wrote:

> Hi everyone,
>
> I am running Flink in EMR YARN cluster, and when the job failed and
> restarted, I could see some logs in the Exception tab of Flink GUI.
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-03-15_at_12.png>
>
>
> I could not find this piece of logs on my cluster's hard-disk - not in TM
> or
> JM logs.
>
> Where can I find this?
>
> Thanks.
>
> Here below is my logback.xml. I'm not sure it has anything to do with my
> question.
>
> <configuration>
> <appender name="..." class="...">
> <file>${log.file}</file>
> <append>false</append>
> <encoder>
> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
> %logger{60} %X{sourceThread} - %msg%n</pattern>
> </encoder>
> </appender>
>
> <root level="WARN">
> <appender-ref ref="..."/>
> </root>
> <logger name="..." level="INFO"
> additivity="false">
> <appender-ref ref="..."/>
> </logger>
> <logger name="..." level="INFO" additivity="false">
> <appender-ref ref="..."/>
> </logger>
> <logger
> name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline"
> level="ERROR">
> <appender-ref ref="..."/>
> </logger>
> </configuration>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Re: Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-15 Thread Kumar Bolar, Harshith
Hi Gary,

The job manager was indeed being invoked with a second parameter.

${Flink_HOME}/bin/jobmanager.sh start cluster
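As Gary explains below, since 1.5 the second positional argument is treated as the
host (the script passes it on as --host), so with this invocation the JobManager
ends up advertising "cluster" as its hostname. The corrected call is simply:

${Flink_HOME}/bin/jobmanager.sh start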

I removed the second argument and everything works fine now. I really 
appreciate your help. Thanks a lot :-)

Regards,
Harshith

From: Gary Yao 
Date: Friday, 15 March 2019 at 12:41 PM
To: Harshith Kumar Bolar 
Cc: user 
Subject: [External] Re: Re: Re: Flink 1.7.2: Task Manager not able to connect 
to Job Manager

I forgot to add line numbers to the first link in my previous email:


https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh#L21-L25

On Fri, Mar 15, 2019 at 8:08 AM Gary Yao 
mailto:g...@ververica.com>> wrote:
Hi Harshith,

In the jobmanager.sh script, the 2nd argument is assigned to the HOST variable
[1]. How are you invoking jobmanager.sh? Prior to 1.5, the script expected an
execution mode (local or cluster) but this is no longer the case [2].

Best,
Gary

[1] 
https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh
[2] 
https://github.com/apache/flink/commit/d61664ca64bcb82c4e8ddf03a2ed38fe8edafa98

On Fri, Mar 15, 2019 at 3:36 AM Kumar Bolar, Harshith 
mailto:hk...@arity.com>> wrote:
Hi Gary,

An update. I noticed the line “--host cluster” in the program arguments section
of the job manager logs. So I commented out the following section in
jobmanager.sh, and the task manager is now able to connect to the job manager
without issues.

    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi


Task manager logs after commenting those lines:


2019-03-14 22:31:02,863 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint for 
org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/taskmanager_0 .
2019-03-14 22:31:02,875 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2019-03-14 22:31:02,876 INFO  
org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job 
leader service.
2019-03-14 22:31:02,877 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses directory 
/tmp/flink-dist-cache-12d5905f-d694-46f6-9359-3a636188b008
2019-03-14 22:31:02,884 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
ResourceManager 
akka.tcp://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213).
2019-03-14 22:31:03,109 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved 
ResourceManager address, beginning registration
2019-03-14 22:31:03,110 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 1 (timeout=100ms)
2019-03-14 22:31:03,228 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 2 (timeout=200ms)
2019-03-14 22:31:03,266 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful 
registration at resource manager 

Re: Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-15 Thread Gary Yao
I forgot to add line numbers to the first link in my previous email:


https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh#L21-L25

On Fri, Mar 15, 2019 at 8:08 AM Gary Yao  wrote:

> Hi Harshith,
>
> In the jobmanager.sh script, the 2nd argument is assigned to the HOST
> variable
> [1]. How are you invoking jobmanager.sh? Prior to 1.5, the script expected
> an
> execution mode (local or cluster) but this is no longer the case [2].
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh
> [2]
> https://github.com/apache/flink/commit/d61664ca64bcb82c4e8ddf03a2ed38fe8edafa98
>
> On Fri, Mar 15, 2019 at 3:36 AM Kumar Bolar, Harshith 
> wrote:
>
>> Hi Gary,
>>
>>
>>
>> An update. I noticed the line “–host cluster” in the program arguments
>> section of the job manager logs. So, I commented the following section in
>> jobmanager.sh, the task manager is now able to connect to job manager
>> without issues.
>>
>>
>>
>>   *if [ ! -z $HOST ]; then*
>>
>> *args+=("--host")*
>>
>> *args+=("${HOST}")*
>>
>> *fi*
>>
>>
>>
>>
>>
>> Task manager logs after commenting those lines:
>>
>>
>>
>>
>> * 2019-03-14 22:31:02,863 INFO
>> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
>> RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at
>> akka://flink/user/taskmanager_0 .*
>>
>> *2019-03-14 22:31:02,875 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.*
>>
>> *2019-03-14 22:31:02,876 INFO
>> org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job
>> leader service.*
>>
>> *2019-03-14 22:31:02,877 INFO
>> org.apache.flink.runtime.filecache.FileCache  - User file
>> cache uses directory
>> /tmp/flink-dist-cache-12d5905f-d694-46f6-9359-3a636188b008*
>>
>> *2019-03-14 22:31:02,884 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
>> to ResourceManager
>> akka.tcp://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213)
>> .*
>>
>> *2019-03-14 22:31:03,109 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved
>> ResourceManager address, beginning registration*
>>
>> *2019-03-14 22:31:03,110 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>> Registration at ResourceManager attempt 1 (timeout=100ms)*
>>
>> *2019-03-14 22:31:03,228 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>> Registration at ResourceManager attempt 2 (timeout=200ms)*
>>
>> *2019-03-14 22:31:03,266 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful
>> registration at resource manager
>> akka.tcp://fl...@flink0-1.flink1.us-east-1.abc.com:28945/user/resourcemanager
>> 
>> under registration id 170ee6a00f80ee02ead0e88710093d77.*
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Harshith
>>
>>
>>
>> *From: *Harshith Kumar Bolar 
>> *Date: *Friday, 15 March 2019 at 7:38 AM
>> *To: *Gary Yao 
>> *Cc: *user 
>> *Subject: *Re: [External] Re: Re: Flink 1.7.2: Task Manager not able to
>> connect to Job Manager
>>
>>
>>
>> Hi Gary,
>>
>>
>>
>> Here are the full job manager and task manager logs. In the job manager
>> logs, I see it says “*starting StandaloneSessionClusterEntrypoint”,* whereas
>> in Flink 1.4.2, it used to say “*starting JobManager”*. Is this correct?
>>
>>
>>
>> Job manager logs: https://paste.ubuntu.com/p/DCVzsQdpHq/
>>
>> Task Manager logs: https://paste.ubuntu.com/p/wbvYFZxdT8/
>>
>>
>>
>> Thanks,
>>
>> Harshith
>>
>>
>>
>> *From: *Gary Yao 
>> *Date: *Thursday, 14 March 2019 at 10:11 PM
>> *To: *Harshith Kumar Bolar 
>> *Cc: *user 
>> *Subject: *[External] Re: Re: Flink 1.7.2: Task Manager not able to
>> connect to Job Manager
>>
>>
>>
>> Hi Harshith,
>>
>> The truncated log is not enough. Can you share the complete logs? If
>> that's
>> not possible, I'd like to see the beginning of the log files where the
>> cluster
>> configuration is logged.
>>
>> The TaskManager tries to connect to the leader that is advertised in
>> ZooKeeper. In your case the "cluster" hostname is advertised which hints a
>> problem in your Flink configuration.
>>
>> Best,
>> Gary
>>
>>
>>
>> On Thu, Mar 14, 2019 at 4:54 PM Kumar Bolar, Harshith 
>> wrote:
>>
>> Hi Gary,
>>
>>
>>
>> I’ve attached the relevant portions of the JM and TM logs.
>>
>>
>>
>> *Job Manager Logs:*
>>
>> 2019-03-14 11:38:28,257 INFO
>> 

Re: Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-15 Thread Gary Yao
Hi Harshith,

In the jobmanager.sh script, the 2nd argument is assigned to the HOST variable
[1]. How are you invoking jobmanager.sh? Prior to 1.5, the script expected an
execution mode (local or cluster) but this is no longer the case [2].
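For context, the referenced lines in [1] boil down to roughly the following
(paraphrased – see the link for the exact code):

STARTSTOP=$1
HOST=$2       # optional second positional argument, later passed on to the entrypoint as --host
WEBUIPORT=$3

so a leftover "cluster" argument from a pre-1.5 start command ends up being used
as the JobManager host.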

Best,
Gary

[1]
https://github.com/apache/flink/blob/c6878aca6c5aeee46581b4d6744b31049db9de95/flink-dist/src/main/flink-bin/bin/jobmanager.sh
[2]
https://github.com/apache/flink/commit/d61664ca64bcb82c4e8ddf03a2ed38fe8edafa98

On Fri, Mar 15, 2019 at 3:36 AM Kumar Bolar, Harshith 
wrote:

> Hi Gary,
>
>
>
> An update. I noticed the line “–host cluster” in the program arguments
> section of the job manager logs. So, I commented the following section in
> jobmanager.sh, the task manager is now able to connect to job manager
> without issues.
>
>
>
>   *if [ ! -z $HOST ]; then*
>
> *args+=("--host")*
>
> *args+=("${HOST}")*
>
> *fi*
>
>
>
>
>
> Task manager logs after commenting those lines:
>
>
>
>
> * 2019-03-14 22:31:02,863 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at
> akka://flink/user/taskmanager_0 .*
>
> *2019-03-14 22:31:02,875 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.*
>
> *2019-03-14 22:31:02,876 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job
> leader service.*
>
> *2019-03-14 22:31:02,877 INFO
> org.apache.flink.runtime.filecache.FileCache  - User file
> cache uses directory
> /tmp/flink-dist-cache-12d5905f-d694-46f6-9359-3a636188b008*
>
> *2019-03-14 22:31:02,884 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
> to ResourceManager
> akka.tcp://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213)
> .*
>
> *2019-03-14 22:31:03,109 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved
> ResourceManager address, beginning registration*
>
> *2019-03-14 22:31:03,110 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Registration at ResourceManager attempt 1 (timeout=100ms)*
>
> *2019-03-14 22:31:03,228 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Registration at ResourceManager attempt 2 (timeout=200ms)*
>
> *2019-03-14 22:31:03,266 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful
> registration at resource manager
> akka.tcp://fl...@flink0-1.flink1.us-east-1.abc.com:28945/user/resourcemanager
> 
> under registration id 170ee6a00f80ee02ead0e88710093d77.*
>
>
>
>
>
> Thanks,
>
> Harshith
>
>
>
> *From: *Harshith Kumar Bolar 
> *Date: *Friday, 15 March 2019 at 7:38 AM
> *To: *Gary Yao 
> *Cc: *user 
> *Subject: *Re: [External] Re: Re: Flink 1.7.2: Task Manager not able to
> connect to Job Manager
>
>
>
> Hi Gary,
>
>
>
> Here are the full job manager and task manager logs. In the job manager
> logs, I see it says “*starting StandaloneSessionClusterEntrypoint”,* whereas
> in Flink 1.4.2, it used to say “*starting JobManager”*. Is this correct?
>
>
>
> Job manager logs: https://paste.ubuntu.com/p/DCVzsQdpHq/
>
> Task Manager logs: https://paste.ubuntu.com/p/wbvYFZxdT8/
>
>
>
> Thanks,
>
> Harshith
>
>
>
> *From: *Gary Yao 
> *Date: *Thursday, 14 March 2019 at 10:11 PM
> *To: *Harshith Kumar Bolar 
> *Cc: *user 
> *Subject: *[External] Re: Re: Flink 1.7.2: Task Manager not able to
> connect to Job Manager
>
>
>
> Hi Harshith,
>
> The truncated log is not enough. Can you share the complete logs? If that's
> not possible, I'd like to see the beginning of the log files where the
> cluster
> configuration is logged.
>
> The TaskManager tries to connect to the leader that is advertised in
> ZooKeeper. In your case the "cluster" hostname is advertised which hints a
> problem in your Flink configuration.
>
> Best,
> Gary
>
>
>
> On Thu, Mar 14, 2019 at 4:54 PM Kumar Bolar, Harshith 
> wrote:
>
> Hi Gary,
>
>
>
> I’ve attached the relevant portions of the JM and TM logs.
>
>
>
> *Job Manager Logs:*
>
> 2019-03-14 11:38:28,257 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
> - State change: CONNECTED
> 2019-03-14 11:38:28,309 INFO
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined
> location of main cluster component log file:
> /opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.log
> 2019-03-14 11:38:28,309 INFO
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - 

Re: Rocksdb to filesystem state migration errors

2019-03-15 Thread Lakshmi Gururaja Rao
Thanks for pointing me to the JIRA, Congxian.

On Thu, Mar 14, 2019 at 6:14 PM Congxian Qiu  wrote:

> Hi Lakshmi
>
> Currently, we can’t switch between the RocksDB and filesystem backends using a
> savepoint; there is an issue to fix this [1].
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-11254
>
>
> Best,
> Congxian
>
>
> Lakshmi Gururaja Rao  于2019年3月15日周五 上午8:07写道:
>
>> Hey all,
>>
>> I'm trying to do a state migration from rocksdb --> filesystem backend.
>> The approach I'm taking here is:
>> 1) Cancel job with savepoint while its running on rocksdb
>> 2) Update the job/cluster with filesystem as the state backend
>> 3) Submit a job with the previous rocksdb savepoint
>>
>> From what I understand about savepoints, this should work out of the box?
>> However, it works in some cases but fails in others. Specifically, whenever
>> there's a job with user managed state, for e.g., a Process Function with a
>> ValueState, it throws the following error:
>>
>> Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
>>  at 
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>  at 
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:418)
>>  at 
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
>>  at 
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>
>>
>>
>> The error specifically comes from a precondition check in
>> HeapKeyedStateBackend. On doing some debugging, I find that the value of
>> writtenKeyGroupIndex always evaluates to 0, thus failing the check.
>>
>> Has anyone run into this issue before?
>>
>> Thanks
>> Lakshmi
>>
>

-- 
*Lakshmi Gururaja Rao*
SWE
217.778.7218