Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hi Chesnay

If the REST API (i.e. the web server) is mandatory for submitting jobs, then why
is there an option to set rest.port to -1? I think it should be mandatory
to set a valid port for rest.port, and the Flink job manager should not
come up if no valid port is set for rest.port. Otherwise there must be
some way to submit jobs even if the REST API (i.e. the web server) is not
instantiated.

If jobmanager.rpc.address is not required by the Flink client, then why does it
still look for that property in flink-conf.yaml? Isn't that a bug?
If we comment out jobmanager.rpc.address and jobmanager.rpc.port,
the Flink client will not be able to submit the job.
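
For reference, these are the flink-conf.yaml keys under discussion (the values
below are only placeholders to illustrate, not recommendations):

    rest.address: localhost           # address the client uses to reach the REST endpoint
    rest.port: 8081                   # -1 reportedly prevents the web server from being started
    jobmanager.rpc.address: localhost
    jobmanager.rpc.port: 6123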


On Tue, Jun 19, 2018 at 5:49 PM, Chesnay Schepler wrote:

> In 1.5 we reworked the job-submission to go through the REST API instead
> of akka.
>
> I believe the jobmanager rpc port shouldn't be necessary anymore, the rpc
> address is still *required* due to some technical implementations; it may
> be that you can set this to some arbitrary value however.
>
> As a result the REST API (i.e. the web server) must be running in order to
> submit jobs.
>
>
> On 19.06.2018 14:12, Sampath Bhat wrote:
>
> Hello
>
> I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink cluster.
>
> In flink 1.4.2 only job manager rpc address and job manager rpc port were
> sufficient for flink client to connect to job manager and submit the job.
>
> But in flink 1.5.0 the flink client additionally requires the rest.address
> and rest.port for submitting the job to job manager. What is the advantage
> of this new method over the 1.4.2 method of submitting job?
>
> Moreover if we make rest.port = -1 the web server will not be instantiated
> then how should we submit the job?
>
> Regards
> Sampath
>
>
>


[jira] [Created] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9619:
-

 Summary: Always close the task manager connection when the 
container is completed in YarnResourceManager
 Key: FLINK-9619
 URL: https://issues.apache.org/jira/browse/FLINK-9619
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.0, 1.5.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


We should always eagerly close the connection with task manager when the 
container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread zhangminglei
Thanks to Fabian and Timo. I looked at scalar UDFs and found it is quite quick
to implement a CASE WHEN UDF for the specific logic I need.
 
Cheers
Minglei

> On 19 Jun 2018, at 22:52, Fabian Hueske wrote:
> 
> I see, then this case wasn't covered by the fix that we added for Flink
> 1.5.0.
> I guess the problem is that the code is needed to evaluate a single field.
> 
> Implementing a scalar user-function is not very difficult [1].
> However, you need to register it in the TableEnvironment before you can use
> it in a SQL query.
> 
> Best, Fabian
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/udfs.html#scalar-functions
> 
> 2018-06-19 16:46 GMT+02:00 zhangminglei <18717838...@163.com>:
> 
>> Hi Fabian, absolutely, I am using Flink 1.5.0 for this. It is a big CASE WHEN
>> statement. Is it hard to implement? I am new to the Flink Table API & SQL.
>> 
>> Best Minglei.
>> 
>> On 19 Jun 2018, at 22:36, Fabian Hueske wrote:
>> 
>> Hi,
>> 
>> Which version are you using? We fixed a similar issue for Flink 1.5.0.
>> If you can't upgrade yet, you can also implement a user-defined function
>> that evaluates the big CASE WHEN statement.
>> 
>> Best, Fabian
>> 
>> 2018-06-19 16:27 GMT+02:00 zhangminglei <18717838...@163.com>:
>> 
>>> Hi, friends.
>>> 
>>> When I execute a long SQL statement and get the following error, how can I
>>> fix it quickly?
>>> 
>>> org.apache.flink.api.common.InvalidProgramException: Table program
>>> cannot be compiled. This is a bug. Please file an issue.
>>>at org.apache.flink.table.codegen.Compiler$class.compile(
>>> Compiler.scala:36)
>>>at org.apache.flink.table.runtime.CRowProcessRunner.compile(
>>> CRowProcessRunner.scala:35)
>>>at org.apache.flink.table.runtime.CRowProcessRunner.open(
>>> CRowProcessRunner.scala:49)
>>>at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>>> nFunction(FunctionUtils.java:36)
>>>at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>>> erator.open(AbstractUdfStreamOperator.java:102)
>>>at org.apache.flink.streaming.api.operators.ProcessOperator.ope
>>> n(ProcessOperator.java:56)
>>>at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>>> perators(StreamTask.java:393)
>>>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:254)
>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Compiling
>>> "DataStreamCalcRule$1802": Code of method "processElement(Ljava/lang/Obj
>>> ect;Lorg/apache/flink/streaming/api/functions/ProcessFunctio
>>> n$Context;Lorg/apache/flink/util/Collector;)V" of class
>>> "DataStreamCalcRule$1802" grows beyond 64 KB
>>>at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.
>>> java:361)
>>> 
>>> select case when cast(status as bigint) in (200) then 10 else 1 end as
>>> pv,\
>>> case when cast(status as bigint) between 303 and 1000 then 1 when
>>> cast(status as bigint) between 100 and 199 then 1 else 0 end as
>>> service_fail,\
>>> case when cast(status as bigint) in (200) then 10 when cast(status as
>>> bigint) in (301,302) then 1 when cast(status as bigint) between 201 and 299
>>> then 1 else 0 end as service_success,\
>>> case when cast(status as bigint) not between 100 and 1000 then 1 else 0
>>> end as network_fail,\
>>> qqiplib(ip, 'isp') as isp,\
>>> case when response_time - request_time <= 6 then response_time -
>>> request_time else 0 end as response_t,\
>>> case when response_time - request_time <= 6 then 1 else 0 end as
>>> count_in,\
>>> case when host in ('116.31.114.22','116.31.114.2
>>> 3','183.60.219.231','183.60.219.232','183.60.219.235','183.
>>> 60.219.236','183.60.220.231','183.60.220.232',\
>>> '183.60.219.247','183.60.219.248','183.60.219.243','183.60.2
>>> 19.244','183.60.219.251','183.60.219.252','116.31.114.202','
>>> 116.31.114.204','116.31.114.206'\
>>> ,'116.31.114.208') then '佛山力通电信_SR' \
>>> when host in ('183.232.169.11','183.232.169.12','183.232.169.13','183.232
>>> .169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
>>> \
>>> then '佛山力通移动_SR' \
>>> when host in ('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112
>>> .14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18') \
>>> then '佛山力通联通_SR' \
>>> when host in ('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84'
>>> ,'114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.11
>>> 3','114.67.56.116',\
>>> '114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.
>>> 111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.
>>> 54.12','114.67.54.13',\
>>> '114.67.56.93') \
>>> then '佛山力通BGP_SR' \
>>> when host in ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.
>>> 106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
>>> 

[jira] [Created] (FLINK-9618) NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set

2018-06-19 Thread Aaron Langford (JIRA)
Aaron Langford created FLINK-9618:
-

 Summary: NullPointerException in FlinkKinesisProducer when 
aws.region is not set and aws.endpoint is set
 Key: FLINK-9618
 URL: https://issues.apache.org/jira/browse/FLINK-9618
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.5.0
 Environment: N/A
Reporter: Aaron Langford


This problem arose while trying to write to a local kinesalite instance.
Specifying both the aws.region and the aws.endpoint is not allowed. However, when
aws.region is not present, a NullPointerException is thrown.

Here is some example Scala code:
{code:java}
/**
  *
  * @param region the AWS region the stream lives in
  * @param streamName the stream to write records to
  * @param endpoint if in local dev, this points to a kinesalite instance
  * @return
  */
def getSink(region: String,
            streamName: String,
            endpoint: Option[String]): FlinkKinesisProducer[ProcessedMobilePageView] = {
  val props = new Properties()
  props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")

  endpoint match {
    case Some(uri) => props.put(AWSConfigConstants.AWS_ENDPOINT, uri)
    case None => props.put(AWSConfigConstants.AWS_REGION, region)
  }

  val producer = new FlinkKinesisProducer[ProcessedMobilePageView](
new JsonSerializer[ProcessedMobilePageView](DefaultSerializationBuilder),
props
  )
  producer.setDefaultStream(streamName)

  producer
}
{code}
To produce the NullPointerException, pass in `Some("localhost:4567")` for 
endpoint.

The source of the error is found in
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.java, on
line 194. This line should check whether aws.endpoint is present before grabbing
it from the Properties object.
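
A minimal sketch of the kind of guard suggested above (the helper below is purely
illustrative and uses assumed names; it is not the actual KinesisConfigUtil code):
{code:java}
import java.util.Optional;
import java.util.Properties;

// Illustrative only: expose the optional aws.endpoint as an Optional so that callers
// cannot accidentally dereference a missing value.
public final class EndpointLookup {
    private EndpointLookup() {}

    public static Optional<String> awsEndpoint(Properties config) {
        // Properties#getProperty returns null when the key is absent,
        // which is the case that currently leads to the NullPointerException.
        return Optional.ofNullable(config.getProperty("aws.endpoint"));
    }
}
{code}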

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Ordering of stream from different kafka partitions

2018-06-19 Thread Andrey Zagrebin
Hi Amol,

I think you could try (based on your stack overflow code)
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
like this:

DataStream<Event> streamSource = env
    .addSource(kafkaConsumer)
    .setParallelism(4)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(3500)) {
            @Override
            public long extractTimestamp(Event element) {
                Map timeStamp = (Map) element.get("ts");
                return (long) timeStamp.get("value");
            }
        });

In general, if records are sorted by anything within a Kafka partition, the parallel
subtasks of the Flink Kafka source will consume these records and push them to user
operators in the same order. There is at most one consuming subtask per Kafka
partition, but several partitions might be served by one subtask. This means the
guarantees are the same as in Kafka: ordering per partition but not
across them, and no global ordering.

The case of global and per-window ordering has already been described by Sihua.
Global ordering might be impractical in a distributed system.

If a subtask of your Flink operator consumes from several partitions, or there
is no ordering at all, you can try the above approach with
BoundedOutOfOrdernessTimestampExtractor to get approximate ordering across
these partitions, either per key or for all records. It is similar to ordering within a
window. There could still be late records arriving after the out-of-orderness
period, which can break the ordering. This operator buffers records in state to
maintain the order, but only for the out-of-orderness period, which also increases
latency.

Cheers,
Andrey

> On 19 Jun 2018, at 14:12, sihua zhou  wrote:
> 
> 
> 
> Hi Amol,
> 
> 
> I'm not sure this is possible, especially when you need to process
> the records with multiple parallelism.
> 
> 
> IMO, in theory, we can only get an ordered stream when there is a single
> Kafka partition and it is processed with a single parallelism in Flink. Even in
> this case, if you only want to order the records within a window, then you need
> to store the records in state and order them when the window is
> triggered. But if you want to order the records after a single
> `keyBy()` (non-windowed), I think that's practically impossible, because
> you would need to store all the incoming records and re-order all the data for
> every incoming record; you would also need to send a retraction message for the
> previous result (because every incoming record might change the global order
> of the records).
> 
> 
> Best, Sihua
> On 06/19/2018 19:19,Amol S - iProgrammer wrote:
> Hi,
> 
> I have used the Flink streaming API in my application where the source of
> the stream is Kafka. My Kafka producer publishes data in ascending order
> of time to different Kafka partitions, and the consumer reads data from
> these partitions. However, some Kafka partitions may be slow due to some
> operation and produce late results. Is there any way to maintain order in
> this stream even though the data arrives out of order? I have tried
> BoundedOutOfOrdernessTimestampExtractor but it didn't serve the purpose.
> While digging into this problem I came across your documentation (URL:
> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
> and tried to implement it, but it didn't work. I also tried the Table
> API's ORDER BY, but it seems orderBy is not supported in Flink 1.5.
> Please suggest a workaround for this.
> 
> I have raised same concern on stack overflow
> 
> https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions
> 
> Thanks,
> 
> ---
> *Amol Suryawanshi*
> Java Developer
> am...@iprogrammer.com
> 
> 
> *iProgrammer Solutions Pvt. Ltd.*
> 
> 
> 
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com 
> 



[jira] [Created] (FLINK-9617) Provide alias for whole records in Table API

2018-06-19 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9617:
-

 Summary: Provide alias for whole records in Table API
 Key: FLINK-9617
 URL: https://issues.apache.org/jira/browse/FLINK-9617
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: Piotr Nowojski


In SQL we can provide an alias for a whole table to avoid column name collisions
between two tables. For example:
{code:java}
SELECT
  SUM(o.amount * r.rate) 
FROM 
  Orders AS o, 
  Rates AS r
WHERE r.currency = o.currency{code}
However, that's not possible in the Table API. In the Table API the user has to provide
aliases for all of the columns, which can be annoying, especially if the table
consists of tens or even hundreds of columns.

For example I would expect some feature like this:
{code:java}
val result = orders.as('o)
  .join(rates('o.rowtime).as('r), "o.currency = r.currency")
  .select("SUM(o.amount * r.rate) AS amount")
{code}
where {{rates}} is a TableFunction.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread Fabian Hueske
I see, then this case wasn't covered by the fix that we added for Flink
1.5.0.
I guess the problem is that the code is needed to evaluate a single field.

Implementing a scalar user-function is not very difficult [1].
However, you need to register it in the TableEnvironment before you can use
it in a SQL query.
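
As a rough illustration (a sketch only, not tested against your exact schema), a
scalar function covering one branch of the big CASE WHEN could look like this:

import org.apache.flink.table.functions.ScalarFunction;

// Sketch of a scalar UDF for: case when cast(status as bigint) in (200) then 10 else 1 end
public class StatusToPv extends ScalarFunction {
    public int eval(long status) {
        return status == 200L ? 10 : 1;
    }
}

You would register it first, e.g. tableEnv.registerFunction("statusToPv", new StatusToPv()),
and then call statusToPv(cast(status as bigint)) in the SQL query.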

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/udfs.html#scalar-functions

2018-06-19 16:46 GMT+02:00 zhangminglei <18717838...@163.com>:

> Hi Fabian, absolutely, I am using Flink 1.5.0 for this. It is a big CASE WHEN
> statement. Is it hard to implement? I am new to the Flink Table API & SQL.
>
> Best Minglei.
>
> On 19 Jun 2018, at 22:36, Fabian Hueske wrote:
>
> Hi,
>
> Which version are you using? We fixed a similar issue for Flink 1.5.0.
> If you can't upgrade yet, you can also implement a user-defined function
> that evaluates the big CASE WHEN statement.
>
> Best, Fabian
>
> 2018-06-19 16:27 GMT+02:00 zhangminglei <18717838...@163.com>:
>
>> Hi, friends.
>>
>> When I execute a long SQL statement and get the following error, how can I
>> fix it quickly?
>>
>> org.apache.flink.api.common.InvalidProgramException: Table program
>> cannot be compiled. This is a bug. Please file an issue.
>> at org.apache.flink.table.codegen.Compiler$class.compile(
>> Compiler.scala:36)
>> at org.apache.flink.table.runtime.CRowProcessRunner.compile(
>> CRowProcessRunner.scala:35)
>> at org.apache.flink.table.runtime.CRowProcessRunner.open(
>> CRowProcessRunner.scala:49)
>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:36)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:102)
>> at org.apache.flink.streaming.api.operators.ProcessOperator.ope
>> n(ProcessOperator.java:56)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:393)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:254)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Compiling
>> "DataStreamCalcRule$1802": Code of method "processElement(Ljava/lang/Obj
>> ect;Lorg/apache/flink/streaming/api/functions/ProcessFunctio
>> n$Context;Lorg/apache/flink/util/Collector;)V" of class
>> "DataStreamCalcRule$1802" grows beyond 64 KB
>> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.
>> java:361)
>>
>> select case when cast(status as bigint) in (200) then 10 else 1 end as
>> pv,\
>> case when cast(status as bigint) between 303 and 1000 then 1 when
>> cast(status as bigint) between 100 and 199 then 1 else 0 end as
>> service_fail,\
>> case when cast(status as bigint) in (200) then 10 when cast(status as
>> bigint) in (301,302) then 1 when cast(status as bigint) between 201 and 299
>> then 1 else 0 end as service_success,\
>> case when cast(status as bigint) not between 100 and 1000 then 1 else 0
>> end as network_fail,\
>> qqiplib(ip, 'isp') as isp,\
>> case when response_time - request_time <= 6 then response_time -
>> request_time else 0 end as response_t,\
>> case when response_time - request_time <= 6 then 1 else 0 end as
>> count_in,\
>> case when host in ('116.31.114.22','116.31.114.2
>> 3','183.60.219.231','183.60.219.232','183.60.219.235','183.
>> 60.219.236','183.60.220.231','183.60.220.232',\
>> '183.60.219.247','183.60.219.248','183.60.219.243','183.60.2
>> 19.244','183.60.219.251','183.60.219.252','116.31.114.202','
>> 116.31.114.204','116.31.114.206'\
>> ,'116.31.114.208') then '佛山力通电信_SR' \
>> when host in ('183.232.169.11','183.232.169.12','183.232.169.13','183.232
>> .169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
>> \
>> then '佛山力通移动_SR' \
>> when host in ('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112
>> .14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18') \
>> then '佛山力通联通_SR' \
>> when host in ('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84'
>> ,'114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.11
>> 3','114.67.56.116',\
>> '114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.
>> 111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.
>> 54.12','114.67.54.13',\
>> '114.67.56.93') \
>> then '佛山力通BGP_SR' \
>> when host in ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.
>> 106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
>> '183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
>> \
>> then '佛山力通BGP_SR' \
>> when host in ('183.240.167.24','183.240.167.25','183.240.167.26','183.240
>> .167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
>> \
>> then '佛山互联移动_SR' when host in ('27.45.160.24','27.45.160.25'
>> 

[jira] [Created] (FLINK-9616) DatadogHttpReporter fails to be created due to missing shaded dependency

2018-06-19 Thread Addison Higham (JIRA)
Addison Higham created FLINK-9616:
-

 Summary: DatadogHttpReporter fails to be created due to missing 
shaded dependency
 Key: FLINK-9616
 URL: https://issues.apache.org/jira/browse/FLINK-9616
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.0
Reporter: Addison Higham


When using the DatadogHttpReporter, it fails to instantiate with the following 
exception:


{code:java}
2018-06-19 06:01:19,640 INFO 
org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring dghttp with 
{apikey=, tags=, 
class=org.apache.flink.metrics.datadog.DatadogHttpReporter}.
2018-06-19 06:01:19,642 ERROR 
org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate 
metrics reporter dghttp. Metrics might not be exposed/reported.
java.lang.NoClassDefFoundError: org/apache/flink/shaded/okhttp3/MediaType
at 
org.apache.flink.metrics.datadog.DatadogHttpClient.(DatadogHttpClient.java:45)
at 
org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:105)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl.(MetricRegistryImpl.java:150)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:413)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:274)
at 
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:92)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:225)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:189)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1889)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:188)
at 
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:181)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.shaded.okhttp3.MediaType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more

{code}
Looking at the pom.xml for `flink-metrics-datadog`, it looks like that
dependency is intended to be shaded and included in the jar. However, when we
build the jar we see the following lines:

 
{noformat}
$ mvn package
[INFO] Scanning for projects...
[INFO] 
[INFO] 
[INFO] Building flink-metrics-datadog 1.5.0
[INFO] 


[INFO] --- maven-shade-plugin:3.0.0:shade (shade-flink) @ flink-metrics-datadog 
---
[INFO] Excluding com.squareup.okhttp3:okhttp:jar:3.7.0 from the shaded jar.
[INFO] Excluding com.squareup.okio:okio:jar:1.12.0 from the shaded jar.
[INFO] Including org.apache.flink:force-shading:jar:1.5.0 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
{noformat}
And inspecting the built jar:
{noformat}
$ jar tf flink-metrics-datadog-1.5.0.jar
META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/flink/
org/apache/flink/metrics/
org/apache/flink/metrics/datadog/
org/apache/flink/metrics/datadog/DatadogHttpClient$EmptyCallback.class
org/apache/flink/metrics/datadog/DMetric.class
org/apache/flink/metrics/datadog/DSeries.class
org/apache/flink/metrics/datadog/DGauge.class
org/apache/flink/metrics/datadog/DatadogHttpReporter.class
org/apache/flink/metrics/datadog/DatadogHttpClient.class
org/apache/flink/metrics/datadog/MetricType.class
org/apache/flink/metrics/datadog/DatadogHttpReporter$DatadogHttpRequest.class
org/apache/flink/metrics/datadog/DMeter.class
org/apache/flink/metrics/datadog/DCounter.class
META-INF/DEPENDENCIES
META-INF/maven/
META-INF/maven/org.apache.flink/
META-INF/maven/org.apache.flink/flink-metrics-datadog/
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.xml
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.properties
META-INF/NOTICE
{noformat}
We don't see the shaded okhttp/okio dependencies included in the jar.
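
For comparison, a shade configuration that includes and relocates okhttp/okio would look
roughly like the sketch below (an illustration only, not the actual flink-metrics-datadog pom):
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <id>shade-flink</id>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <includes>
            <!-- these two are currently reported as "Excluding ... from the shaded jar" -->
            <include>com.squareup.okhttp3:okhttp</include>
            <include>com.squareup.okio:okio</include>
          </includes>
        </artifactSet>
        <relocations>
          <!-- relocate to the package the reporter classes expect -->
          <relocation>
            <pattern>okhttp3</pattern>
            <shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern>
          </relocation>
          <relocation>
            <pattern>okio</pattern>
            <shadedPattern>org.apache.flink.shaded.okio</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}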

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread zhangminglei
Hi Fabian, absolutely, I am using Flink 1.5.0 for this. It is a big CASE WHEN
statement. Is it hard to implement? I am new to the Flink Table API & SQL.

Best Minglei.

> On 19 Jun 2018, at 22:36, Fabian Hueske wrote:
> 
> Hi,
> 
> Which version are you using? We fixed a similar issue for Flink 1.5.0.
> If you can't upgrade yet, you can also implement a user-defined function that 
> evaluates the big CASE WHEN statement.
> 
> Best, Fabian
> 
> 2018-06-19 16:27 GMT+02:00 zhangminglei <18717838...@163.com>:
> Hi, friends.
> 
> When I execute a long SQL statement and get the following error, how can I fix
> it quickly?
> 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
> at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
> at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Compiling "DataStreamCalcRule$1802": 
> Code of method 
> "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V"
>  of class "DataStreamCalcRule$1802" grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
> 
> select case when cast(status as bigint) in (200) then 10 else 1 end as pv,\
> case when cast(status as bigint) between 303 and 1000 then 1 when cast(status 
> as bigint) between 100 and 199 then 1 else 0 end as service_fail,\
> case when cast(status as bigint) in (200) then 10 when cast(status as bigint) 
> in (301,302) then 1 when cast(status as bigint) between 201 and 299 then 1 
> else 0 end as service_success,\
> case when cast(status as bigint) not between 100 and 1000 then 1 else 0 end 
> as network_fail,\
> qqiplib(ip, 'isp') as isp,\
> case when response_time - request_time <= 6 then response_time - 
> request_time else 0 end as response_t,\
> case when response_time - request_time <= 6 then 1 else 0 end as 
> count_in,\
> case when host in 
> ('116.31.114.22','116.31.114.23','183.60.219.231','183.60.219.232','183.60.219.235','183.60.219.236','183.60.220.231','183.60.220.232',\
> '183.60.219.247','183.60.219.248','183.60.219.243','183.60.219.244','183.60.219.251','183.60.219.252','116.31.114.202','116.31.114.204','116.31.114.206'\
> ,'116.31.114.208') then '佛山力通电信_SR' \
> when host in 
> ('183.232.169.11','183.232.169.12','183.232.169.13','183.232.169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
>  \
> then '佛山力通移动_SR' \
> when host in 
> ('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112.14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18')
>  \
> then '佛山力通联通_SR' \
> when host in 
> ('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84','114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.113','114.67.56.116',\
> '114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.54.12','114.67.54.13',\
> '114.67.56.93') \
> then '佛山力通BGP_SR' \
> when host in 
> ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
> '183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
>  \
> then '佛山力通BGP_SR' \
> when host in 
> ('183.240.167.24','183.240.167.25','183.240.167.26','183.240.167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
>  \
> then '佛山互联移动_SR' when host in 
> ('27.45.160.24','27.45.160.25','27.45.160.26','27.45.160.27','27.45.160.28','27.45.160.29','27.45.160.30','27.45.160.31')
>  \
> then '佛山互联联通_SR' \
> when host in 
> ('43.255.228.11','43.255.228.12','43.255.228.13','43.255.228.14','43.255.228.15','43.255.228.16','43.255.228.17','43.255.228.18','43.255.228.19',\
> '43.255.228.20','43.255.228.21','43.255.228.22','43.255.228.23') \
> then '佛山互联BGP_SR' \
> when host in 
> ('43.255.228.24','43.255.228.25','43.255.228.26','43.255.228.27','43.255.228.28','43.255.228.29','43.255.228.30','43.255.228.31','43.255.228.32',\
> '43.255.228.33','43.255.228.34') \
> then '佛山互联BGP_SR' \
> when host 

Re: DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread Fabian Hueske
Hi,

Which version are you using? We fixed a similar issue for Flink 1.5.0.
If you can't upgrade yet, you can also implement a user-defined function
that evaluates the big CASE WHEN statement.

Best, Fabian

2018-06-19 16:27 GMT+02:00 zhangminglei <18717838...@163.com>:

> Hi, friends.
>
> When I execute a long SQL statement and get the following error, how can I fix
> it quickly?
>
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> at org.apache.flink.table.codegen.Compiler$class.
> compile(Compiler.scala:36)
> at org.apache.flink.table.runtime.CRowProcessRunner.
> compile(CRowProcessRunner.scala:35)
> at org.apache.flink.table.runtime.CRowProcessRunner.
> open(CRowProcessRunner.scala:49)
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.
> AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.api.operators.ProcessOperator.
> open(ProcessOperator.java:56)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Compiling
> "DataStreamCalcRule$1802": Code of method "processElement(Ljava/lang/
> Object;Lorg/apache/flink/streaming/api/functions/
> ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" of class
> "DataStreamCalcRule$1802" grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(
> UnitCompiler.java:361)
>
> select case when cast(status as bigint) in (200) then 10 else 1 end as pv,\
> case when cast(status as bigint) between 303 and 1000 then 1 when
> cast(status as bigint) between 100 and 199 then 1 else 0 end as
> service_fail,\
> case when cast(status as bigint) in (200) then 10 when cast(status as
> bigint) in (301,302) then 1 when cast(status as bigint) between 201 and 299
> then 1 else 0 end as service_success,\
> case when cast(status as bigint) not between 100 and 1000 then 1 else 0
> end as network_fail,\
> qqiplib(ip, 'isp') as isp,\
> case when response_time - request_time <= 6 then response_time -
> request_time else 0 end as response_t,\
> case when response_time - request_time <= 6 then 1 else 0 end as
> count_in,\
> case when host in ('116.31.114.22','116.31.114.
> 23','183.60.219.231','183.60.219.232','183.60.219.235','
> 183.60.219.236','183.60.220.231','183.60.220.232',\
> '183.60.219.247','183.60.219.248','183.60.219.243','183.60.
> 219.244','183.60.219.251','183.60.219.252','116.31.114.
> 202','116.31.114.204','116.31.114.206'\
> ,'116.31.114.208') then '佛山力通电信_SR' \
> when host in ('183.232.169.11','183.232.169.12','183.232.169.13','183.
> 232.169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
> \
> then '佛山力通移动_SR' \
> when host in ('112.93.112.11','112.93.112.12','112.93.112.13','112.93.
> 112.14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18') \
> then '佛山力通联通_SR' \
> when host in ('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84'
> ,'114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.
> 113','114.67.56.116',\
> '114.67.56.117','114.67.60.214','114.67.60.215','114.67.
> 54.111','114.67.54.112','114.67.56.95','114.67.56.96','114.
> 67.54.12','114.67.54.13',\
> '114.67.56.93') \
> then '佛山力通BGP_SR' \
> when host in ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.
> 56.106','114.67.56.107','183.60.220.231','183.60.220.232','
> 183.60.219.247',\
> '183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
> \
> then '佛山力通BGP_SR' \
> when host in ('183.240.167.24','183.240.167.25','183.240.167.26','183.
> 240.167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
> \
> then '佛山互联移动_SR' when host in ('27.45.160.24','27.45.160.25'
> ,'27.45.160.26','27.45.160.27','27.45.160.28','27.45.160.29'
> ,'27.45.160.30','27.45.160.31') \
> then '佛山互联联通_SR' \
> when host in ('43.255.228.11','43.255.228.12','43.255.228.13','43.255.
> 228.14','43.255.228.15','43.255.228.16','43.255.228.17','
> 43.255.228.18','43.255.228.19',\
> '43.255.228.20','43.255.228.21','43.255.228.22','43.255.228.23') \
> then '佛山互联BGP_SR' \
> when host in ('43.255.228.24','43.255.228.25','43.255.228.26','43.255.
> 228.27','43.255.228.28','43.255.228.29','43.255.228.30','
> 43.255.228.31','43.255.228.32',\
> '43.255.228.33','43.255.228.34') \
> then '佛山互联BGP_SR' \
> when host in ('14.17.91.113','14.17.91.114','14.17.72.11','14.17.72.12','
> 14.17.72.15','14.17.72.16','14.17.72.19','14.17.72.20','
> 183.61.170.121','183.61.170.122',\
> 

DataStreamCalcRule$1802" grows beyond 64 KB when execute long sql.

2018-06-19 Thread zhangminglei
Hi, friends.

When I execute a long SQL statement and get the following error, how can I fix it quickly?

org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
at 
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Compiling "DataStreamCalcRule$1802": 
Code of method 
"processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V"
 of class "DataStreamCalcRule$1802" grows beyond 64 KB
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)

select case when cast(status as bigint) in (200) then 10 else 1 end as pv,\
case when cast(status as bigint) between 303 and 1000 then 1 when cast(status 
as bigint) between 100 and 199 then 1 else 0 end as service_fail,\
case when cast(status as bigint) in (200) then 10 when cast(status as bigint) 
in (301,302) then 1 when cast(status as bigint) between 201 and 299 then 1 else 
0 end as service_success,\
case when cast(status as bigint) not between 100 and 1000 then 1 else 0 end as 
network_fail,\
qqiplib(ip, 'isp') as isp,\
case when response_time - request_time <= 6 then response_time - 
request_time else 0 end as response_t,\
case when response_time - request_time <= 6 then 1 else 0 end as count_in,\
case when host in 
('116.31.114.22','116.31.114.23','183.60.219.231','183.60.219.232','183.60.219.235','183.60.219.236','183.60.220.231','183.60.220.232',\
'183.60.219.247','183.60.219.248','183.60.219.243','183.60.219.244','183.60.219.251','183.60.219.252','116.31.114.202','116.31.114.204','116.31.114.206'\
,'116.31.114.208') then '佛山力通电信_SR' \
when host in 
('183.232.169.11','183.232.169.12','183.232.169.13','183.232.169.14','183.232.169.15','183.232.169.16','183.232.169.17','183.232.169.18')
 \
then '佛山力通移动_SR' \
when host in 
('112.93.112.11','112.93.112.12','112.93.112.13','112.93.112.14','112.93.112.15','112.93.112.16','112.93.112.17','112.93.112.18')
 \
then '佛山力通联通_SR' \
when host in 
('114.67.56.79','114.67.56.80','114.67.56.83','114.67.56.84','114.67.56.87','114.67.56.88','114.67.56.112','114.67.56.113','114.67.56.116',\
'114.67.56.117','114.67.60.214','114.67.60.215','114.67.54.111','114.67.54.112','114.67.56.95','114.67.56.96','114.67.54.12','114.67.54.13',\
'114.67.56.93') \
then '佛山力通BGP_SR' \
when host in 
('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247',\
'183.60.219.248','114.67.60.201','114.67.60.203','114.67.60.205','114.67.60.207')
 \
then '佛山力通BGP_SR' \
when host in 
('183.240.167.24','183.240.167.25','183.240.167.26','183.240.167.27','183.240.167.28','183.240.167.29','183.240.167.30','183.240.167.31')
 \
then '佛山互联移动_SR' when host in 
('27.45.160.24','27.45.160.25','27.45.160.26','27.45.160.27','27.45.160.28','27.45.160.29','27.45.160.30','27.45.160.31')
 \
then '佛山互联联通_SR' \
when host in 
('43.255.228.11','43.255.228.12','43.255.228.13','43.255.228.14','43.255.228.15','43.255.228.16','43.255.228.17','43.255.228.18','43.255.228.19',\
'43.255.228.20','43.255.228.21','43.255.228.22','43.255.228.23') \
then '佛山互联BGP_SR' \
when host in 
('43.255.228.24','43.255.228.25','43.255.228.26','43.255.228.27','43.255.228.28','43.255.228.29','43.255.228.30','43.255.228.31','43.255.228.32',\
'43.255.228.33','43.255.228.34') \
then '佛山互联BGP_SR' \
when host in 
('14.17.91.113','14.17.91.114','14.17.72.11','14.17.72.12','14.17.72.15','14.17.72.16','14.17.72.19','14.17.72.20','183.61.170.121','183.61.170.122',\
'14.17.91.120','14.17.91.121','14.17.72.79','14.17.72.80','14.17.72.81','14.17.72.82')
 \
then '东莞电信_SR' \
when host in 
('221.228.213.94','221.228.213.95','221.228.213.68','221.228.213.69','221.228.213.76','221.228.213.77','221.228.213.98','221.228.213.99',\
'221.228.213.122','221.228.213.124') \
then '无锡电信_SR' \
when host in 
('43.247.88.49','43.247.88.50','43.247.88.135','43.247.88.136','43.247.88.16','43.247.88.17','43.247.88.143','43.247.88.144','43.247.88.183','43.247.88.185',\
'43.247.88.187','43.247.88.189') \
then '无锡BGP_SR' \

Re: Please review FLINK-9610 Add Kafka partitioner that uses the key to partition by

2018-06-19 Thread Niels Basjes
I fixed the problem indicated in your comment and added an extra test for
that.
CI is currently running the tests.

Niels

On Tue, Jun 19, 2018 at 12:19 PM, Ted Yu  wrote:

> Interesting enhancement.
>
> I left a minor comment on the PR.
>
> Cheers
>
> On Tue, Jun 19, 2018 at 12:26 AM, Niels Basjes  wrote:
>
> > Hi,
> >
> > Yesterday we ran into problems regarding the distribution of records
> across
> > Kafka where Flink was used as the producer. So we fixed this and realized
> > that the code to do this would be useful to others.
> >
> > I put up a Jira ticket and pull request yesterday and it passes all
> > automated tests.
> > Please review.
> >
> > https://issues.apache.org/jira/browse/FLINK-9610
> > https://github.com/apache/flink/pull/6181
> >
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
> >
>



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


[jira] [Created] (FLINK-9615) Add

2018-06-19 Thread JIRA
Dominik Wosiński created FLINK-9615:
---

 Summary: Add
 Key: FLINK-9615
 URL: https://issues.apache.org/jira/browse/FLINK-9615
 Project: Flink
  Issue Type: Improvement
Reporter: Dominik Wosiński


AFAIK, there is currently no way to use Kafka or other connectors as a sink in the
SQL Client. Such a feature would be good for prototyping or quick stream
manipulation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9614) Improve the error message for Compiler#compile

2018-06-19 Thread mingleizhang (JIRA)
mingleizhang created FLINK-9614:
---

 Summary: Improve the error message for Compiler#compile
 Key: FLINK-9614
 URL: https://issues.apache.org/jira/browse/FLINK-9614
 Project: Flink
  Issue Type: Improvement
Reporter: mingleizhang
Assignee: mingleizhang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Chesnay Schepler
In 1.5 we reworked the job-submission to go through the REST API instead 
of akka.


I believe the jobmanager rpc port shouldn't be necessary anymore, the 
rpc address is still *required* due to some technical implementations; 
it may be that you can set this to some arbitrary value however.


As a result the REST API (i.e. the web server) must be running in order 
to submit jobs.


On 19.06.2018 14:12, Sampath Bhat wrote:

Hello

I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink 
cluster.


In flink 1.4.2 only job manager rpc address and job manager rpc port 
were sufficient for flink client to connect to job manager and submit 
the job.


But in flink 1.5.0 the flink client additionally requires the 
rest.address and rest.port for submitting the job to job manager. What 
is the advantage of this new method over the 1.4.2 method of 
submitting job?


Moreover if we make rest.port = -1 the web server will not be 
instantiated then how should we submit the job?


Regards
Sampath





Re:Ordering of stream from different kafka partitions

2018-06-19 Thread sihua zhou


Hi Amol,


I'm not sure this is possible, especially when you need to process the records
with multiple parallelism.


IMO, in theory, we can only get an ordered stream when there is a single
Kafka partition and it is processed with a single parallelism in Flink. Even in
this case, if you only want to order the records within a window, then you need to
store the records in state and order them when the window is triggered.
But if you want to order the records after a single `keyBy()` (non-windowed), I
think that's practically impossible, because you would need to store all
the incoming records and re-order all the data for every incoming record; you would
also need to send a retraction message for the previous result (because every
incoming record might change the global order of the records).
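
To illustrate the per-window case (a sketch only; it assumes an Event POJO with
getKey()/getTimestamp() accessors, getKey() returning String, and an existing
DataStream<Event> named "events" with timestamps and watermarks already assigned):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Event> sortedPerWindow = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new ProcessWindowFunction<Event, Event, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> elements, Collector<Event> out) {
            List<Event> buffered = new ArrayList<>();
            elements.forEach(buffered::add);
            // emit the buffered records in timestamp order once the window fires
            buffered.sort(Comparator.comparingLong(Event::getTimestamp));
            buffered.forEach(out::collect);
        }
    });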


Best, Sihua
On 06/19/2018 19:19,Amol S - iProgrammer wrote:
Hi,

I have used the Flink streaming API in my application where the source of
the stream is Kafka. My Kafka producer publishes data in ascending order
of time to different Kafka partitions, and the consumer reads data from
these partitions. However, some Kafka partitions may be slow due to some
operation and produce late results. Is there any way to maintain order in
this stream even though the data arrives out of order? I have tried
BoundedOutOfOrdernessTimestampExtractor but it didn't serve the purpose.
While digging into this problem I came across your documentation (URL:
https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
and tried to implement it, but it didn't work. I also tried the Table
API's ORDER BY, but it seems orderBy is not supported in Flink 1.5.
Please suggest a workaround for this.

I have raised same concern on stack overflow

https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions

Thanks,

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 



Breakage in Flink CLI in 1.5.0

2018-06-19 Thread Sampath Bhat
Hello

I'm using Flink 1.5.0 version and Flink CLI to submit jar to flink cluster.

In flink 1.4.2 only job manager rpc address and job manager rpc port were
sufficient for flink client to connect to job manager and submit the job.

But in flink 1.5.0 the flink client additionally requires the rest.address
and rest.port for submitting the job to job manager. What is the advantage
of this new method over the 1.4.2 method of submitting job?

Moreover, if we set rest.port = -1 the web server will not be instantiated;
how should we submit the job then?

Regards
Sampath


Ordering of stream from different kafka partitions

2018-06-19 Thread Amol S - iProgrammer
Hi,

I have used the Flink streaming API in my application where the source of
the stream is Kafka. My Kafka producer publishes data in ascending order
of time to different Kafka partitions, and the consumer reads data from
these partitions. However, some Kafka partitions may be slow due to some
operation and produce late results. Is there any way to maintain order in
this stream even though the data arrives out of order? I have tried
BoundedOutOfOrdernessTimestampExtractor but it didn't serve the purpose.
While digging into this problem I came across your documentation (URL:
https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
and tried to implement it, but it didn't work. I also tried the Table
API's ORDER BY, but it seems orderBy is not supported in Flink 1.5.
Please suggest a workaround for this.

I have raised same concern on stack overflow

https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions

Thanks,

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 



Re: Please review FLINK-9610 Add Kafka partitioner that uses the key to partition by

2018-06-19 Thread Ted Yu
Interesting enhancement.

I left a minor comment on the PR.

Cheers

On Tue, Jun 19, 2018 at 12:26 AM, Niels Basjes  wrote:

> Hi,
>
> Yesterday we ran into problems regarding the distribution of records across
> Kafka where Flink was used as the producer. So we fixed this and realized
> that the code to do this would be useful to others.
>
> I put up a Jira ticket and pull request yesterday and it passes all
> automated tests.
> Please review.
>
> https://issues.apache.org/jira/browse/FLINK-9610
> https://github.com/apache/flink/pull/6181
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


[jira] [Created] (FLINK-9613) YARNSessionCapacitySchedulerITCase failed because YarnTestBase.checkClusterEmpty()

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9613:
-

 Summary: YARNSessionCapacitySchedulerITCase failed because 
YarnTestBase.checkClusterEmpty()
 Key: FLINK-9613
 URL: https://issues.apache.org/jira/browse/FLINK-9613
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Sihua Zhou


The test YARNSessionCapacitySchedulerITCase failed on travis because of 
.YarnTestBase.checkClusterEmpty().

https://api.travis-ci.org/v3/job/394017104/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Improvements to Mesos Deployments Using Docker

2018-06-19 Thread Till Rohrmann
Hi Addison,

thanks for starting the discussion. My gut feeling is that we could solve
FLINK-9611 and FLINK-9612 both with allowing the user to specify a custom
AbstractContainerOverlay implementation. Thus, introducing an
AbstractContainerOverlayFactory instead of the specific
mesos.resourcemanager.tasks.uris option sounds favorable to me.

Once a custom overlay can be specified, we should extend the
MesosJobClusterEntrypoint to add the retrieved job graph and the user code
jars to the overlay. That way, we don't have to fetch the user code jars
via the BlobServer. But this would be a follow-up task.

I'm not aware of any other redundant layers for the pre-built docker images.

Cheers,
Till

On Tue, Jun 19, 2018 at 3:57 AM Addison Higham  wrote:

> I am currently in the process of getting flink 1.5 running in a mesos
> cluster using docker.
>
> I have come across a few improvements that I think could be helpful with
> this configuration (and will probably also apply for any future
> containerized deployments, like Kubernetes)
>
> I have already created two issues to track this:
> https://issues.apache.org/jira/browse/FLINK-9611 and
> https://issues.apache.org/jira/browse/FLINK-9612.
>
> A quick summary:
> FLINK-9611 - Allow for a configuration option to add user defined artifacts
> to be downloaded into the container. This is useful for cases where you
> want to add credentials to pull a private docker image (but probably has
> many other use cases). While this could easily be done via config, it
> *might* allow for better extensibility to dynamically classload a user-defined
> overlay class that could tweak the container specification as needed
>
> FLINK-9612 - Add an option for disabling pulling of most of the
> FlinkDistributionOverlay. Currently, if you are trying to deploy many
> TaskManagers with a pre-built docker image with a flink distribution, it is
> very wasteful, as it re-downloads all the dependencies. This can cause
> problems with swarming the MesosArtifactServer and it doesn't take too many
> nodes deploying to see some failed downloads.
>
> I am willing to implement these two features, but would be interested in
> getting some feedback.
>
> Some questions
> - Would a limited (but simple) property like `
> mesos.resourcemanager.tasks.uris` with a comma separated list of URIs be
> preferable to a more powerful (but more complex)
> `mesos.resourcemanager.tasks.user-overlay` property that, when defined,
> would use a classloader to dynamically add another overlay?
> - Are there any files generated by Flink that would always need to
> be downloaded as an artifact into the container? As best as I can
> tell, that isn't the case, at least in the `FlinkDistributionOverlay`
> - Are there any other overlay layers that are redundant in container
> deployment using pre-built docker images?
>
> Thanks for your feedback!
>


Please review FLINK-9610 Add Kafka partitioner that uses the key to partition by

2018-06-19 Thread Niels Basjes
Hi,

Yesterday we ran into problems regarding the distribution of records across
Kafka where Flink was used as the producer. So we fixed this and realized
that the code to do this would be useful to others.

I put up a Jira ticket and pull request yesterday and it passes all
automated tests.
Please review.

https://issues.apache.org/jira/browse/FLINK-9610
https://github.com/apache/flink/pull/6181
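
For readers who just want the general idea: the gist is a FlinkKafkaPartitioner that hashes
the serialized record key so that identical keys always land in the same Kafka partition.
A rough sketch (not the actual code from the PR):

import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Rough sketch only; see the PR above for the real implementation and its tests.
public class KeyHashKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key available: fall back to a fixed partition
        }
        // mask the sign bit so the index is always non-negative
        return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}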


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Why not put "FLIP: Network Stack Improvements" into Confluence page?

2018-06-19 Thread Tony Wei
Hi Piotrek,

Thanks for your information. =)

Best Regards,
Tony Wei

2018-06-19 15:15 GMT+08:00 Piotr Nowojski :

> Hi,
>
> Besides FLIP document describing network improvements there is not much
> more and it is actually pretty up to date.
>
> I will link this document on wiki with FLIP proposals.
>
> Piotrek
>
> > On 19 Jun 2018, at 06:22, Tony Wei  wrote:
> >
> > Hi,
> >
> > I read Flink 1.5.0 release announcements[1] recently and noticed the
> > improvements to Flink’s Network Stack.
> > Because I'm interested in knowing more design details about this
> > improvements, I tried to find more related
> > issues and discussions about it. I found it has a FLIP proposal[2] but
> not
> > mentioned in release announcements[1].
> >
> > I also found some issues and slides.
> > https://issues.apache.org/jira/browse/FLINK-7315
> > https://issues.apache.org/jira/browse/FLINK-8581
> > https://issues.apache.org/jira/browse/FLINK-7282
> > https://www.slideshare.net/FlinkForward/flink-forward-
> berlin-2017-nico-kruber-building-a-network-stack-for-
> optimal-throughput-lowlatency-tradeoffs
> >
> > Since I'm not sure if I missed any discussion in mailing list or public
> > documents, could someone share more to me?
> > And is it possible to organize these documents and issues and put on
> > Confluence page for everyone who is
> > interested in this topic could quickly find them?
> >
> > Best Regards,
> > Tony Wei
> >
> > [1] https://flink.apache.org/news/2018/05/25/release-1.5.0.html
> > [2]
> > https://docs.google.com/document/d/1chTOuOqe0sBsjldA_
> r-wXYeSIhU2zRGpUaTaik7QZ84/edit
>
>


Re: Why not put "FLIP: Network Stack Improvements" into Confluence page?

2018-06-19 Thread Piotr Nowojski
Hi,

Besides FLIP document describing network improvements there is not much more 
and it is actually pretty up to date.

I will link this document on wiki with FLIP proposals.

Piotrek

> On 19 Jun 2018, at 06:22, Tony Wei  wrote:
> 
> Hi,
> 
> I read Flink 1.5.0 release announcements[1] recently and noticed the
> improvements to Flink’s Network Stack.
> Because I'm interested in knowing more design details about this
> improvements, I tried to find more related
> issues and discussions about it. I found it has a FLIP proposal[2] but not
> mentioned in release announcements[1].
> 
> I also found some issues and slides.
> https://issues.apache.org/jira/browse/FLINK-7315
> https://issues.apache.org/jira/browse/FLINK-8581
> https://issues.apache.org/jira/browse/FLINK-7282
> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-nico-kruber-building-a-network-stack-for-optimal-throughput-lowlatency-tradeoffs
> 
> Since I'm not sure if I missed any discussion in mailing list or public
> documents, could someone share more to me?
> And is it possible to organize these documents and issues and put on
> Confluence page for everyone who is
> interested in this topic could quickly find them?
> 
> Best Regards,
> Tony Wei
> 
> [1] https://flink.apache.org/news/2018/05/25/release-1.5.0.html
> [2]
> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit