[jira] [Commented] (FLINK-4500) Cassandra sink can lose messages

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241624#comment-16241624
 ] 

ASF GitHub Bot commented on FLINK-4500:
---

Github user mcfongtw commented on the issue:

https://github.com/apache/flink/pull/4605
  
Hi @zentol, since this branch has some conflicts and is a bit out of date 
with the current master, I will rebase it. However, I need to know whether you 
are planning to review and merge this PR soon; otherwise, I could do the 
rebase later. 


> Cassandra sink can lose messages
> 
>
> Key: FLINK-4500
> URL: https://issues.apache.org/jira/browse/FLINK-4500
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Assignee: Michael Fong
>
> The problem is the same as I pointed out with the Kafka producer sink 
> (FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
> both send data asynchronously to Cassandra and record whether an error occurs 
> via a future callback.  But CassandraSinkBase does not implement 
> Checkpointed, so it can't stop a checkpoint from happening even though there 
> are Cassandra queries in flight from the checkpoint that may fail.  If they 
> do fail, they would subsequently not be replayed when the job recovered, and 
> would thus be lost.
> In addition, 
> CassandraSinkBase's close() should check whether there is a pending exception 
> and throw it, rather than closing silently.  It should also wait for any 
> pending async queries to complete and check their status before closing.
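
A minimal sketch of the proposed close() semantics, assuming a counter of 
in-flight writes and an error holder updated by the future callback 
({{pendingRecords}}, {{asyncError}}, and {{recordCompleted}} are illustrative 
names here, not the actual CassandraSinkBase members):

{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: field and method names are illustrative, not the real members.
public abstract class CheckpointAwareCassandraSink {

    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();
    private final AtomicInteger pendingRecords = new AtomicInteger();

    /** To be called from the ResultSetFuture callback of every async write. */
    protected void recordCompleted(Throwable failureOrNull) {
        if (failureOrNull != null) {
            asyncError.compareAndSet(null, failureOrNull);
        }
        synchronized (pendingRecords) {
            pendingRecords.decrementAndGet();
            pendingRecords.notifyAll();
        }
    }

    public void close() throws Exception {
        // Wait for all in-flight Cassandra queries instead of closing while
        // they may still fail.
        synchronized (pendingRecords) {
            while (pendingRecords.get() > 0) {
                pendingRecords.wait();
            }
        }
        // Surface a pending async error rather than closing silently.
        Throwable error = asyncError.get();
        if (error != null) {
            throw new IOException("Error while sending data to Cassandra", error);
        }
    }
}
{code}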





[GitHub] flink issue #4605: [FLINK-4500] [C* Connector] CassandraSinkBase implements ...

2017-11-06 Thread mcfongtw
Github user mcfongtw commented on the issue:

https://github.com/apache/flink/pull/4605
  
Hi @zentol, since this branch has some conflicts and is a bit out of date 
with the current master, I will rebase it. However, I need to know whether you 
are planning to review and merge this PR soon; otherwise, I could do the 
rebase later. 


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241620#comment-16241620
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann, Yes, the Resource is a little too generic and prone to 
typos. However, resources are varied and closely tied to the platform 
(YARN/Mesos), so a GPUResource and an FPGAResource alone may not satisfy 
users' needs. For example, we have at least two types of FPGA resources in our 
cluster. We could consider users who need to specify extended resources as 
advanced users: general users only need to know about vcores and memory, which 
are already defined in ResourceSpec, while advanced users should be familiar 
not only with Flink but also with the resource platform and the resource types 
YARN/Mesos supports. Moreover, if the Flink resource manager passes all 
extended resources through to YARN/Mesos when starting a container, it does 
not need to change when a new resource type is added, as long as YARN/Mesos 
can recognize it from the extended resources. There has to be a compromise 
between extensibility and ease of use. I suggest we add a general GPUResource 
and FPGAResource for common use while still keeping the Resource for 
extension. Does this make sense?
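
For illustration, the compromise could look like the following sketch (class 
and field names are made up here, not the final Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: well-known resource types for the common cases, plus a generic
// Resource whose name is passed through to YARN/Mesos unchanged.
public class ResourceSpecSketch {

    public static class Resource {
        final String name;   // must be a type the cluster framework recognizes
        final double value;
        public Resource(String name, double value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Common case: users ask for GPUs without knowing platform details. */
    public static class GPUResource extends Resource {
        public GPUResource(double value) { super("GPU", value); }
    }

    /** Common case: generic FPGA; clusters with several FPGA types can still
     *  use the plain Resource with a platform-specific name. */
    public static class FPGAResource extends Resource {
        public FPGAResource(double value) { super("FPGA", value); }
    }

    private final Map<String, Resource> extendedResources = new HashMap<>();

    /** Extended resources are forwarded as-is when a container is started. */
    public ResourceSpecSketch setResource(Resource resource) {
        extendedResources.put(resource.name, resource);
        return this;
    }
}
```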


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an application 
> for image processing may need GPUs, while others may need FPGAs. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make ResourceSpec extensible.





[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

2017-11-06 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann, Yes, the Resource is a little too generic and prone to 
typos. However, resources are varied and closely tied to the platform 
(YARN/Mesos), so a GPUResource and an FPGAResource alone may not satisfy 
users' needs. For example, we have at least two types of FPGA resources in our 
cluster. We could consider users who need to specify extended resources as 
advanced users: general users only need to know about vcores and memory, which 
are already defined in ResourceSpec, while advanced users should be familiar 
not only with Flink but also with the resource platform and the resource types 
YARN/Mesos supports. Moreover, if the Flink resource manager passes all 
extended resources through to YARN/Mesos when starting a container, it does 
not need to change when a new resource type is added, as long as YARN/Mesos 
can recognize it from the extended resources. There has to be a compromise 
between extensibility and ease of use. I suggest we add a general GPUResource 
and FPGAResource for common use while still keeping the Resource for 
extension. Does this make sense?


---


[jira] [Closed] (FLINK-1317) Add a standalone version of the Flink Plan Visualizer to the Website

2017-11-06 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-1317.
-
Resolution: Won't Fix

> Add a standalone version of the Flink Plan Visualizer to the Website
> 
>
> Key: FLINK-1317
> URL: https://issues.apache.org/jira/browse/FLINK-1317
> Project: Flink
>  Issue Type: New Feature
>  Components: Project Website
>Reporter: Stephan Ewen
>Priority: Minor
>






[jira] [Closed] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout

2017-11-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-7590.
---
Resolution: Won't Fix

> Flink failed to flush and close the file system output stream for 
> checkpointing because of s3 read timeout
> --
>
> Key: FLINK-7590
> URL: https://issues.apache.org/jira/browse/FLINK-7590
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> The Flink job failed once over the weekend because of the following issue. It 
> picked itself up afterwards and has been running well. But the issue might be 
> worth taking a look at.
> {code:java}
> 2017-09-03 13:18:38,998 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce 
> (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 163 for operator reduce (14/18).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 163 for 
> operator reduce (14/18).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the 
> stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not flush and close the file system output stream 
> to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain 
> the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 7 more
>   Caused by: java.io.IOException: Could not flush and close the file 
> system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa 
> in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> 

[jira] [Comment Edited] (FLINK-7692) Support user-defined variables in Metrics

2017-11-06 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235098#comment-16235098
 ] 

Wei-Che Wei edited comment on FLINK-7692 at 11/7/17 5:36 AM:
-

Hi [~Zentol]

I proposed a draft for this issue and want to get some feedback from you.

A new {{KeyedGenericMetricGroup}} to support a new method 
{{MetricGroup#addGroup(String name, String value)}}:
{code}
class KeyedGenericMetricGroup extends AbstractMetricGroup {
  /** Appends name and value after parent.getScopeComponents(). */
  public KeyedGenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name, String value)

  /** Appends (name -> value) to the variables map. */
  @Override
  public Map<String, String> getAllVariables()
}
{code}

A comparison between {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}}:
| |group.addGroup(name, value)|group.addGroup(name).addGroup(value)|
|depth from parent| 1 | 2 |
|getMetricIdentifier()|\{parentIdentifier\}.name.value|\{parentIdentifier\}.name.value|
|getLogicalScope()|\{parentScope\}.name|\{parentScope\}.name.value|
|getAllVariables()|\{parentVariables\} ++ (name -> value)|\{parentVariables\}|

This can benefit reporters such as Prometheus, which can use the logical scope 
to aggregate metrics of the same type and distinguish them by variables.

There are some problems I ran into while designing this draft.
- Since {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}} produce metric groups with the same 
identifier, only one of them will be registered. However, because they do not 
live in the same layer, it is not easy to check whether the other already 
exists; you would have to check the parent's or the child's metric group. Is it 
acceptable to refuse to register the metric group from {{group.addGroup(name)}} 
when {{group.addGroup(name, value)}} has already been called, and vice versa?
- If that is acceptable, what should the return value be when we hit the 
conflict? We don't want to throw a RuntimeException from the {{Metrics}} API 
and make the user program fail. I would prefer to return 
{{group.addGroup(name, null)}} when {{group.addGroup(name)}} is called after 
{{group.addGroup(name, some_value)}}; in the opposite order, return 
{{group.addGroup(name).addGroup(value)}} and log a warning message.

What do you think? Are there other, better approaches whose pros and cons we 
could compare? Thank you.
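
For illustration, usage of the proposed API could look like the following 
sketch (inside a rich function; the metric and variable names are made up for 
the example):

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

public class KeyedGroupExample extends RichMapFunction<String, String> {

    private transient Counter recordsIn;

    @Override
    public void open(Configuration parameters) {
        // Proposed MetricGroup#addGroup(String, String); "partition" and "42"
        // are example values. Per the table above, the returned group has
        //   identifier:    <parentIdentifier>.partition.42
        //   logical scope: <parentScope>.partition
        //   variables:     parent variables ++ (partition -> 42)
        MetricGroup keyed = getRuntimeContext().getMetricGroup().addGroup("partition", "42");
        recordsIn = keyed.counter("recordsIn");
    }

    @Override
    public String map(String value) {
        recordsIn.inc();
        return value;
    }
}
{code}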


was (Author: tonywei):
Hi [~Zentol]

I proposed a draft for this issue and want to get some feedback from you.

A new {{KeyedGenericMetricGroup}} to support a new method 
{{MetricGroup#addGroup(String name, String value)}}:
{code}
class KeyedGenericMetricGroup extends AbstractMetricGroup {
  /** Appends name and value after parent.getScopeComponents(). */
  public KeyedGenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name, String value)

  /** Appends (name -> value) to the variables map. */
  @Override
  public Map<String, String> getAllVariables()
}
{code}

A comparison between {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}}:
| |group.addGroup(name, value)|group.addGroup(name).addGroup(value)|
|depth from parent| 1 | 2 |
|getMetricIdentifier()|\{parentIdentifier\}.name.value|\{parentIdentifier\}.name.value|
|getLogicalScope()|\{parentScope\}.name|\{parentScope\}.name.value|
|getAllVariables()|\{parentVariables\} ++ (name -> value)|\{parentVariables\}|

This can benefit reporters such as Prometheus, which can use the logical scope 
to aggregate metrics of the same type and distinguish them by variables.

There are some problems I ran into while designing this draft.
- Since {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}} produce metric groups with the same 
identifier, only one of them will be registered. However, because they do not 
live in the same layer, it is not easy to check whether the other already 
exists; you would have to check the parent's or the child's metric group. Is it 
acceptable to refuse to register the metric group from {{group.addGroup(name)}} 
when {{group.addGroup(name, value)}} has already been called, and vice versa?
- If that is acceptable, what should the return value be when we hit the 
conflict? We don't want to throw a RuntimeException from the {{Metrics}} API 
and make the user program fail. I would prefer to return 
{{group.addGroup(name, null)}} when {{group.addGroup(name)}} is called after 
{{group.addGroup(name, some_value)}}; in the opposite order, return 
{{group.addGroup(name).addGroup(value)}} and log a warning message.

What do you think? Are there other, better approaches whose pros and cons we 
could compare? Thank you.

> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
> 

[jira] [Closed] (FLINK-1166) Add a QA bot to Flink that is testing pull requests

2017-11-06 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-1166.
-
Resolution: Fixed
  Assignee: Robert Metzger

> Add a QA bot to Flink that is testing pull requests
> ---
>
> Key: FLINK-1166
> URL: https://issues.apache.org/jira/browse/FLINK-1166
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> We should have a QA bot (similar to Hadoop) that is checking incoming pull 
> requests for a few things:
> - Changes to user-facing APIs
> - More compiler warnings than before
> - more Javadoc warnings than before
> - change of the number of files in the lib/ directory.
> - unused dependencies
> - {{@author}} tag.
> - guava (and other shaded jars) in the lib/ directory.
> It should be extensible so that new tests can be added.





[jira] [Comment Edited] (FLINK-8001) Mark Kafka Consumer as idle if it doesn't have partitions

2017-11-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241455#comment-16241455
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8001 at 11/7/17 4:36 AM:
-

Seems like we have to check that all the per-partition watermark emitters 
within the Kafka consumer implement the idleness logic correctly, such that we 
emit the {{IDLE}} marker instead of the {{Long.MAX_VALUE}} watermark.
I'll try to see if I can reuse the {{StatusWatermarkValve}} in the consumer.


was (Author: tzulitai):
Seems like we have to check that all the per-partition watermark logic within 
the Kafka consumer implements the idleness logic correctly.
I'll try to see if I can reuse the {{StatusWatermarkValve}} in the consumer.

> Mark Kafka Consumer as idle if it doesn't have partitions
> -
>
> Key: FLINK-8001
> URL: https://issues.apache.org/jira/browse/FLINK-8001
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
> it has zero partitions assigned. If this happens and other parallel instances 
> of the Kafka Consumer are marked as idle (which currently never happens by 
> default but does happen in custom forks of our Kafka code) this means that 
> the watermark jumps to {{Long.MAX_VALUE}} downstream.
> In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
> {{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't 
> have any partitions. This should be changed to mark the source as idle 
> instead, if we don't have any partitions.
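
A minimal sketch of the proposed change, assuming the subtask knows its 
partition assignment ({{subscribedPartitions}} is an illustrative field here; 
the real fix would live in {{AbstractFetcher}}/{{PeriodicWatermarkEmitter}}):

{code:java}
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

// Sketch only: if a consumer subtask owns no partitions, mark it idle instead
// of emitting a Long.MAX_VALUE watermark.
final class IdleAwareFetcher<T> {

    private final List<String> subscribedPartitions;

    IdleAwareFetcher(List<String> subscribedPartitions) {
        this.subscribedPartitions = subscribedPartitions;
    }

    void maybeMarkIdle(SourceContext<T> sourceContext) {
        if (subscribedPartitions.isEmpty()) {
            // Downstream operators then exclude this subtask from watermark
            // calculation until it becomes active again.
            sourceContext.markAsTemporarilyIdle();
        }
    }
}
{code}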





[GitHub] flink issue #4826: [FLINK-7608][metric] Refactor latency statistics metric

2017-11-06 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4826
  
ping @zentol 


---


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241468#comment-16241468
 ] 

ASF GitHub Bot commented on FLINK-7608:
---

Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4826
  
ping @zentol 


> LatencyGauge change to histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.4.0
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
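
For illustration, a histogram-backed latency metric could be registered 
through Flink's Dropwizard wrapper roughly like this (the metric name and the 
reservoir size of 500 are arbitrary choices for the sketch):

{code:java}
import com.codahale.metrics.SlidingWindowReservoir;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

public class LatencyHistogramExample extends RichMapFunction<Long, Long> {

    private transient Histogram latencyHistogram;

    @Override
    public void open(Configuration parameters) {
        // A histogram computes p50/p95/p99 etc. from its reservoir, so the
        // reporter no longer has to render a nested map as one gauge value.
        latencyHistogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("latency", new DropwizardHistogramWrapper(
                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
    }

    @Override
    public Long map(Long latencyMillis) {
        latencyHistogram.update(latencyMillis);
        return latencyMillis;
    }
}
{code}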





[jira] [Commented] (FLINK-8001) Mark Kafka Consumer as idle if it doesn't have partitions

2017-11-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241466#comment-16241466
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8001:


I don't think 1.3 is unaffected by this, though.
In 1.3, if there are no partitions to consume at the very beginning, the 
`Long.MAX_VALUE` watermark is emitted as soon as the consumer starts, in which 
case the per-partition watermark code paths are never relevant, I think.

> Mark Kafka Consumer as idle if it doesn't have partitions
> -
>
> Key: FLINK-8001
> URL: https://issues.apache.org/jira/browse/FLINK-8001
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
> it has zero partitions assigned. If this happens and other parallel instances 
> of the Kafka Consumer are marked as idle (which currently never happens by 
> default but does happen in custom forks of our Kafka code) this means that 
> the watermark jumps to {{Long.MAX_VALUE}} downstream.
> In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
> {{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't 
> have any partitions. This should be changed to mark the source as idle 
> instead, if we don't have any partitions.





[jira] [Comment Edited] (FLINK-8001) Mark Kafka Consumer as idle if it doesn't have partitions

2017-11-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241466#comment-16241466
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8001 at 11/7/17 4:25 AM:
-

I don't think 1.3 is unaffected by this, though.
In 1.3, if there are no partitions to consume at the very beginning, the 
{{Long.MAX_VALUE}} watermark is emitted as soon as the consumer starts, in 
which case the per-partition watermark code paths are never relevant, I think.


was (Author: tzulitai):
I don't think 1.3 is unaffected by this, though.
In 1.3, if there are no partitions to consume at the very beginning, the 
`Long.MAX_VALUE` watermark is emitted as soon as the consumer starts, in which 
case the per-partition watermark code paths are never relevant, I think.

> Mark Kafka Consumer as idle if it doesn't have partitions
> -
>
> Key: FLINK-8001
> URL: https://issues.apache.org/jira/browse/FLINK-8001
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
> it has zero partitions assigned. If this happens and other parallel instances 
> of the Kafka Consumer are marked as idle (which currently never happens by 
> default but does happen in custom forks of our Kafka code) this means that 
> the watermark jumps to {{Long.MAX_VALUE}} downstream.
> In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
> {{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't 
> have any partitions. This should be changed to mark the source as idle 
> instead, if we don't have any partitions.





[jira] [Comment Edited] (FLINK-8001) Mark Kafka Consumer as idle if it doesn't have partitions

2017-11-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241455#comment-16241455
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8001 at 11/7/17 4:19 AM:
-

Seems like we have to check that all the per-partition watermark logic within 
the Kafka consumer implements the idleness logic correctly.
I'll try to see if I can reuse the {{StatusWatermarkValve}} in the consumer.


was (Author: tzulitai):
Seems like we have to check that all the per-partition watermark logic within 
the Kafka consumer implements the idleness logic correctly.
I'll try to see if I can reuse the `StatusWatermarkValve` in the consumer.

> Mark Kafka Consumer as idle if it doesn't have partitions
> -
>
> Key: FLINK-8001
> URL: https://issues.apache.org/jira/browse/FLINK-8001
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
> it has zero partitions assigned. If this happens and other parallel instances 
> of the Kafka Consumer are marked as idle (which currently never happens by 
> default but does happen in custom forks of our Kafka code) this means that 
> the watermark jumps to {{Long.MAX_VALUE}} downstream.
> In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
> {{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't 
> have any partitions. This should be changed to mark the source as idle 
> instead, if we don't have any partitions.





[jira] [Commented] (FLINK-8001) Mark Kafka Consumer as idle if it doesn't have partitions

2017-11-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241455#comment-16241455
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8001:


Seems like we have to check that all the per-partition watermark logic within 
the Kafka consumer implements the idleness logic correctly.
I'll try to see if I can reuse the `StatusWatermarkValve` in the consumer.

> Mark Kafka Consumer as idle if it doesn't have partitions
> -
>
> Key: FLINK-8001
> URL: https://issues.apache.org/jira/browse/FLINK-8001
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
> it has zero partitions assigned. If this happens and other parallel instances 
> of the Kafka Consumer are marked as idle (which currently never happens by 
> default but does happen in custom forks of our Kafka code) this means that 
> the watermark jumps to {{Long.MAX_VALUE}} downstream.
> In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
> {{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't 
> have any partitions. This should be changed to mark the source as idle 
> instead, if we don't have any partitions.





[jira] [Assigned] (FLINK-8001) Mark Kafka Consumer as idle if it doesn't have partitions

2017-11-06 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-8001:
--

Assignee: Tzu-Li (Gordon) Tai

> Mark Kafka Consumer as idle if it doesn't have partitions
> -
>
> Key: FLINK-8001
> URL: https://issues.apache.org/jira/browse/FLINK-8001
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
> it has zero partitions assigned. If this happens and other parallel instances 
> of the Kafka Consumer are marked as idle (which currently never happens by 
> default but does happen in custom forks of our Kafka code) this means that 
> the watermark jumps to {{Long.MAX_VALUE}} downstream.
> In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
> {{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't 
> have any partitions. This should be changed to mark the source as idle 
> instead, if we don't have any partitions.





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241412#comment-16241412
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4893
  
@tillrohrmann Thank you for your suggestions; I think moving these things 
to the `JobMaster` would be good. I think this issue could be fixed as follows:

1. Add a `CompletableFuture<JobVertexBackPressureInfo> 
requestVertexBackPressure(JobID jobId, JobVertexID vertexID, @RpcTimeout Time 
timeout);` method in `RestfulGateway` and `JobMasterGateway`.
2. Add a `BackPressureStatsTracker` to the `JobMaster`, and return a 
`JobVertexBackPressureInfo` from the `requestVertexBackPressure` method.
3. Use `LegacyRestHandlerAdapter` in `DispatcherRestEndpoint` for 
`JobVertexBackPressureHandler`.

What do you think? Thanks
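
A rough sketch of the gateway method from step 1, with the generic type taken 
from step 2 (`JobVertexBackPressureInfo` is the proposed response type, 
stubbed here; the final signature may differ):

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rpc.RpcTimeout;

/** Stub for the proposed REST response type. */
class JobVertexBackPressureInfo {}

// Sketch of the method proposed for RestfulGateway / JobMasterGateway.
interface BackPressureGateway {
    /** Asks the JobMaster for the back-pressure stats of one job vertex. */
    CompletableFuture<JobVertexBackPressureInfo> requestVertexBackPressure(
            JobID jobId, JobVertexID vertexId, @RpcTimeout Time timeout);
}
```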


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint





[GitHub] flink issue #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHandler to ...

2017-11-06 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4893
  
@tillrohrmann Thank you for your suggestions; I think moving these things 
to the `JobMaster` would be good. I think this issue could be fixed as follows:

1. Add a `CompletableFuture<JobVertexBackPressureInfo> 
requestVertexBackPressure(JobID jobId, JobVertexID vertexID, @RpcTimeout Time 
timeout);` method in `RestfulGateway` and `JobMasterGateway`.
2. Add a `BackPressureStatsTracker` to the `JobMaster`, and return a 
`JobVertexBackPressureInfo` from the `requestVertexBackPressure` method.
3. Use `LegacyRestHandlerAdapter` in `DispatcherRestEndpoint` for 
`JobVertexBackPressureHandler`.

What do you think? Thanks


---


[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries

2017-11-06 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241303#comment-16241303
 ] 

Xingcan Cui commented on FLINK-7999:


Hi [~sjwiesman], thanks for opening this. Could you give some more specific 
explanations about this kind of join?

Thanks, Xingcan

> Variable Join Window Boundaries
> ---
>
> Key: FLINK-7999
> URL: https://issues.apache.org/jira/browse/FLINK-7999
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Seth Wiesman
>
> Allow window joins with variable length based on row attributes. 
> Consider two streams joined on an id, where one has start and end dates; it 
> would be useful to be able to join each row during its live duration. Today 
> this can be expressed in the DataStream API using a CoProcessFunction. 
>  left.id = right.id AND (left.time > right.start and left.time < right.end)





[GitHub] flink pull request #4964: [hotfix][docs] Add type for numLateRecordsDropped ...

2017-11-06 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4964

[hotfix][docs] Add type for numLateRecordsDropped metric in documentation

This "numLateRecordsDropped" metric missing type described in the document:


![image](https://user-images.githubusercontent.com/4133864/32472016-f5b095d8-c325-11e7-936a-c2eff6f6dfe9.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink hofix_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4964.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4964


commit 5e62f5e53324d40e536acd37796a8832c4a11d56
Author: yew1eb 
Date:   2017-11-07T01:06:45Z

[hotfix][docs] Add type for numLateRecordsDropped metric




---


[jira] [Updated] (FLINK-7475) support update() in ListState

2017-11-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7475:

Affects Version/s: 1.4.0

> support update() in ListState
> -
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: yf
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, I have to do two steps: 
> listState.clear(); 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why not let me update the state with: 
> listState.update(myList)?





[jira] [Commented] (FLINK-7475) support update() in ListState

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241262#comment-16241262
 ] 

ASF GitHub Bot commented on FLINK-7475:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4963

[FLINK-7475] [core][state backend] support update() in ListState

## What is the purpose of the change

If users want to update the list, they have to do two steps: 

```java
listState.clear();
for (Element e : myList) {
    listState.add(e);
}
```

We should enable users to do this in one step by providing an API: 
`listState.update(myNewList)`.
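
A sketch of the proposed interface addition (existing methods omitted, javadoc 
abbreviated; the final signature may differ):

```java
import java.util.List;

import org.apache.flink.api.common.state.MergingState;

// Sketch of the proposed addition to Flink's ListState interface.
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /** Atomically replaces the current contents of the state with the given values. */
    void update(List<T> values) throws Exception;
}
```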

## Brief change log

- Added `update()` API to `ListState`
- Added and updated unit/IT tests
- Updated Flink doc

`update()` can actually be an API in `AppendingState`, or at least in 
`MapState`. We can consider it later.

## Verifying this change

This change added tests and can be verified in 
`testListStateAddUpdateAndGet()`

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs and JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7475

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4963.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4963


commit 6ee905066137bafe2619be24446a7ed3439ec65d
Author: Bowen Li 
Date:   2017-10-27T08:21:20Z

[FLINK-7475] add ListState#update() API and implementations to Flink

commit 390b5656988505c69367118ed295dd83cb7ae784
Author: Bowen Li 
Date:   2017-10-27T08:59:08Z

add IT tests

commit e63baa1fae56681d3760ba833b5da8c0cfbcae1a
Author: Bowen Li 
Date:   2017-10-27T09:04:04Z

update doc

commit 03be4982a490df8d4d8da805916c9a29c9174ebd
Author: Bowen Li 
Date:   2017-10-27T23:17:50Z

format comments

commit 75dcdc134f705a3a997d65d6c547fd3a35d19edd
Author: Bowen Li 
Date:   2017-10-27T23:23:55Z

checkout empty list

commit 18a952c9978b24287c01aa865d8f28beb4fe31ec
Author: Bowen Li 
Date:   2017-10-27T23:42:19Z

format code

commit c4daba92b04d3789aeb7f72419b7fdd36e11177b
Author: Bowen Li 
Date:   2017-10-28T05:12:18Z

add update() to FlinkKafkaConsumerBaseTest




> support update() in ListState
> -
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API, State Backends, Checkpointing
>Reporter: yf
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, I have to do two steps: 
> listState.clear(); 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why not let me update the state with: 
> listState.update(myList)?





[GitHub] flink pull request #4963: [FLINK-7475] [core][state backend] support update(...

2017-11-06 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4963

[FLINK-7475] [core][state backend] support update() in ListState

## What is the purpose of the change

If users want to update the list, they have to do two steps: 

```java
listState.clear();
for (Element e : myList) {
    listState.add(e);
}
```

We should enable users to do this in one step by providing an API: 
`listState.update(myNewList)`.

## Brief change log

- Added `update()` API to `ListState`
- Added and updated unit/IT tests
- Updated Flink doc

`update()` can actually be an API in `AppendingState`, or at least in 
`MapState`. We can consider it later.

## Verifying this change

This change added tests and can be verified in 
`testListStateAddUpdateAndGet()`

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs and JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7475

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4963.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4963


commit 6ee905066137bafe2619be24446a7ed3439ec65d
Author: Bowen Li 
Date:   2017-10-27T08:21:20Z

[FLINK-7475] add ListState#update() API and implementations to Flink

commit 390b5656988505c69367118ed295dd83cb7ae784
Author: Bowen Li 
Date:   2017-10-27T08:59:08Z

add IT tests

commit e63baa1fae56681d3760ba833b5da8c0cfbcae1a
Author: Bowen Li 
Date:   2017-10-27T09:04:04Z

update doc

commit 03be4982a490df8d4d8da805916c9a29c9174ebd
Author: Bowen Li 
Date:   2017-10-27T23:17:50Z

format comments

commit 75dcdc134f705a3a997d65d6c547fd3a35d19edd
Author: Bowen Li 
Date:   2017-10-27T23:23:55Z

checkout empty list

commit 18a952c9978b24287c01aa865d8f28beb4fe31ec
Author: Bowen Li 
Date:   2017-10-27T23:42:19Z

format code

commit c4daba92b04d3789aeb7f72419b7fdd36e11177b
Author: Bowen Li 
Date:   2017-10-28T05:12:18Z

add update() to FlinkKafkaConsumerBaseTest




---


[jira] [Updated] (FLINK-7475) support update() in ListState

2017-11-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7475:

Component/s: State Backends, Checkpointing

> support update() in ListState
> -
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API, State Backends, Checkpointing
>Reporter: yf
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, I have to do two steps: 
> listState.clear(); 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why not let me update the state with: 
> listState.update(myList)?





[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT

2017-11-06 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/4960
  
:clap:


---


[jira] [Created] (FLINK-8003) Support Calcite's ROW value constructor in Flink SQL

2017-11-06 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-8003:
-

 Summary: Support Calcite's ROW value constructor in Flink SQL
 Key: FLINK-8003
 URL: https://issues.apache.org/jira/browse/FLINK-8003
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Shuyi Chen
Assignee: Shuyi Chen








[GitHub] flink pull request #4962: [FLINK-8002] [table] Fix join window boundary for ...

2017-11-06 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4962

[FLINK-8002] [table] Fix join window boundary for LESS_THAN and 
GREATER_THAN predicates.

## What is the purpose of the change

Fix the computation of join window boundaries for LESS_THAN and 
GREATER_THAN predicates if the time attribute of the right input is referenced 
on the left side of the predicate.

## Brief change log

- check which input is referenced on which side of the predicate to 
determine whether to add or subtract 1 from the boundary.

## Verifying this change

- `JoinTest` was extended to verify the computation of the boundaries.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableJoinBoundaryFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4962


commit e2ca2627c1c3fdcc09bfd20b670d287942185eb0
Author: Fabian Hueske 
Date:   2017-11-06T20:22:35Z

[FLINK-8002] [table] Fix join window boundary for LESS_THAN and 
GREATER_THAN predicates.




---


[jira] [Commented] (FLINK-8002) Incorrect join window boundaries for LESS_THAN and GREATER_THAN predicates

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240842#comment-16240842
 ] 

ASF GitHub Bot commented on FLINK-8002:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4962

[FLINK-8002] [table] Fix join window boundary for LESS_THAN and 
GREATER_THAN predicates.

## What is the purpose of the change

Fix the computation of join window boundaries for LESS_THAN and 
GREATER_THAN predicates if the time attribute of the right input is referenced 
on the left side of the predicate.

## Brief change log

- check which input is referenced on which side of the predicate to 
determine whether to add or subtract 1 from the boundary.

## Verifying this change

- `JoinTest` was extended to verify the computation of the boundaries.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableJoinBoundaryFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4962


commit e2ca2627c1c3fdcc09bfd20b670d287942185eb0
Author: Fabian Hueske 
Date:   2017-11-06T20:22:35Z

[FLINK-8002] [table] Fix join window boundary for LESS_THAN and 
GREATER_THAN predicates.




> Incorrect join window boundaries for LESS_THAN and GREATER_THAN predicates
> --
>
> Key: FLINK-8002
> URL: https://issues.apache.org/jira/browse/FLINK-8002
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.4.0
>
>
> The boundaries of LESS_THAN and GREATER_THAN predicates are not correctly 
> computed if the time attribute of the right table is referenced on the left 
> side of the join predicate.
> Instead of adding (subtracting) 1 millisecond, 1 millisecond is subtracted 
> (added), so a boundary that should be {{b - 1}} ms is computed as {{b + 1}} ms. 
> Hence, the boundary is off by 2.





[jira] [Created] (FLINK-8002) Incorrect join window boundaries for LESS_THAN and GREATER_THAN predicates

2017-11-06 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-8002:


 Summary: Incorrect join window boundaries for LESS_THAN and 
GREATER_THAN predicates
 Key: FLINK-8002
 URL: https://issues.apache.org/jira/browse/FLINK-8002
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: Fabian Hueske
Assignee: Fabian Hueske
Priority: Critical
 Fix For: 1.4.0


The boundaries of LESS_THAN and GREATER_THAN predicates are not correctly 
computed if the time attribute of the right table is referenced on the left 
side of the join predicate.
Instead of adding (subtracting) 1 millisecond, 1 millisecond is subtracted 
(added), so a boundary that should be {{b - 1}} ms is computed as {{b + 1}} ms. 
Hence, the boundary is off by 2.





[GitHub] flink issue #4960: Update version to 1.5-SNAPSHOT

2017-11-06 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4960
  
👍 


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240749#comment-16240749
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4939
  
I addressed the PR comments in my latest commit, but I will also have to 
adapt the S3 test to not use our S3 implementation but instead the one in the 
Hadoop version YARN uses. Stay tuned ;)


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.





[GitHub] flink issue #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3a defau...

2017-11-06 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4939
  
I addressed the PR comments in my latest commit, but I will also have to 
adapt the S3 test to not use our S3 implementation but instead the one in the 
Hadoop version YARN uses. Stay tuned ;)


---


[jira] [Commented] (FLINK-7973) Fix service shading relocation for S3 file systems

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240742#comment-16240742
 ] 

ASF GitHub Bot commented on FLINK-7973:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4961

[FLINK-7973] fix shading and relocating Hadoop for the S3 filesystems

## What is the purpose of the change

The current shading of the `flink-s3-fs-hadoop` and `flink-s3-fs-presto` 
projects also relocates Flink core classes and even some from the JDK itself. 
Additionally, the relocation of Hadoop does not work as expected since Hadoop 
loads classes based on class names in its `core-default.xml` which are unshaded 
and thus use the original namespace.

## Brief change log

- adapt the `pom.xml` of both `flink-s3-fs-hadoop` and `flink-s3-fs-presto`:
  - do not shade everything and instead define include patterns explicitly
  - only shade and relocate Flink classes imported from flink-hadoop-fs
- hack around Hadoop loading (unshaded/non-relocated) classes based on 
names in the `core-default.xml` by overwriting the `Configuration` class (we 
may need to also extend this for the `mapred-default.xml` and 
`hdfs-defaults.xml` and their respective configuration classes in the future):
  - provide a `core-default-shaded.xml` file with shaded class names and
  - copy and adapt the `Configuration` class of the respective Hadoop 
version to load this file instead of `core-default.xml`

## Verifying this change

This change can be (and was) manually tested as follows:
- verify the shaded `jar` file does not contain non-relocated classes
- verify the changed `Configuration` classes reside in the shaded namespace 
where the original Hadoop `Configuration` classes would go into, e.g. 
`org.apache.flink.fs.s3hadoop.shaded.org.hadoop.conf` (look for 
`core-default-shaded.xml` string in the `Configuration.class` file)
- verify the `META-INF/services` files are still correct (name + content)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7973

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4961.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4961






> Fix service shading relocation for S3 file systems
> --
>
> Key: FLINK-7973
> URL: https://issues.apache.org/jira/browse/FLINK-7973
> Project: Flink
>  Issue Type: Bug
>Reporter: Stephan Ewen
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The shade plugin relocates services incorrectly currently, applying 
> relocation patterns multiple times.





[GitHub] flink pull request #4961: [FLINK-7973] fix shading and relocating Hadoop fo...

2017-11-06 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4961

[FLINK-7973] fix shading and relocating Hadoop for the S3 filesystems

## What is the purpose of the change

The current shading of the `flink-s3-fs-hadoop` and `flink-s3-fs-presto` 
projects also relocates Flink core classes and even some from the JDK itself. 
Additionally, the relocation of Hadoop does not work as expected since Hadoop 
loads classes based on class names in its `core-default.xml` which are unshaded 
and thus use the original namespace.

## Brief change log

- adapt the `pom.xml` of both `flink-s3-fs-hadoop` and `flink-s3-fs-presto`:
  - do not shade everything and instead define include patterns explicitly
  - only shade and relocate Flink classes imported from flink-hadoop-fs
- hack around Hadoop loading (unshaded/non-relocated) classes based on 
names in the `core-default.xml` by overwriting the `Configuration` class (we 
may need to also extend this for the `mapred-default.xml` and 
`hdfs-defaults.xml` and their respective configuration classes in the future):
  - provide a `core-default-shaded.xml` file with shaded class names and
  - copy and adapt the `Configuration` class of the respective Hadoop 
version to load this file instead of `core-default.xml`

## Verifying this change

This change can be (and was) manually tested as follows:
- verify the shaded `jar` file does not contain non-relocated classes
- verify the changed `Configuration` classes reside in the shaded namespace 
where the original Hadoop `Configuration` classes would go into, e.g. 
`org.apache.flink.fs.s3hadoop.shaded.org.hadoop.conf` (look for 
`core-default-shaded.xml` string in the `Configuration.class` file)
- verify the `META-INF/services` files are still correct (name + content)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7973

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4961.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4961






---


[GitHub] flink pull request #4960: Update version to 1.5-SNAPSHOT

2017-11-06 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4960

Update version to 1.5-SNAPSHOT



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink update-1.5

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4960.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4960






---


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240735#comment-16240735
 ] 

Stephan Ewen commented on FLINK-6022:
-

With the merged pull request, Avro Specific Records now automatically go 
through Avro, and the schema is communicated via the TypeSerializer's parameters.

The Avro Serializer also readily handles all non-specific records via the 
reflect datum readers and writers.

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7997) Avro should be always in the user code

2017-11-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-7997.
---

> Avro should be always in the user code
> --
>
> Key: FLINK-7997
> URL: https://issues.apache.org/jira/browse/FLINK-7997
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> Having Avro in the user code space makes it possible for users to use 
> different Avro versions than the ones pulled in by an overloaded classpath 
> (for example when having Hadoop in the classpath).
> This is possible through the new child-first classloading in Flink 1.4.
> Also, this should fix the problem of "X cannot be cast to X", because Avro 
> classes will be scoped to the user code class loader, and the Avro schema 
> cache will not be JVM-wide.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7997) Avro should be always in the user code

2017-11-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-7997.
-
Resolution: Fixed

Fixed in c85f15ead50e9961e284eef50e5dc569560db022

> Avro should be always in the user code
> --
>
> Key: FLINK-7997
> URL: https://issues.apache.org/jira/browse/FLINK-7997
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> Having Avro in the user code space makes it possible for users to use 
> different Avro versions than the ones pulled in by an overloaded classpath 
> (for example when having Hadoop in the classpath).
> This is possible through the new child-first classloading in Flink 1.4.
> Also, this should fix the problem of "X cannot be cast to X", because Avro 
> classes will be scoped to the user code class loader, and the Avro schema 
> cache will not be JVM-wide.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8001) Mark Kafka Consumer as idle if it doesn't have partitions

2017-11-06 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-8001:
---

 Summary: Mark Kafka Consumer as idle if it doesn't have partitions
 Key: FLINK-8001
 URL: https://issues.apache.org/jira/browse/FLINK-8001
 Project: Flink
  Issue Type: Bug
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.4.0, 1.3.3


In Flink 1.3.x the Kafka Consumer will emit a {{Long.MAX_VALUE}} watermark if 
it has zero partitions assigned. If this happens and other parallel instances 
of the Kafka Consumer are marked as idle (which currently never happens by 
default but does happen in custom forks of our Kafka code), this means that the 
watermark jumps to {{Long.MAX_VALUE}} downstream.

In Flink 1.4.x this happens implicitly in the {{PeriodicWatermarkEmitter}} in 
{{AbstractFetcher}} where the watermark is {{Long.MAX_VALUE}} if we don't have 
any partitions. This should be changed to mark the source as idle instead, if 
we don't have any partitions.
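
A hedged sketch of the proposed behaviour — the helper below is hypothetical, not the actual `AbstractFetcher` code — showing the intended branch: go idle instead of emitting `Long.MAX_VALUE` when no partitions are assigned.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.List;

/** Hypothetical helper illustrating the fix; not the actual fetcher code. */
final class PeriodicWatermarkSketch {

    static <T> void emitPeriodicWatermark(SourceContext<T> ctx, List<Long> partitionWatermarks) {
        if (partitionWatermarks.isEmpty()) {
            // no partitions assigned: do not emit Long.MAX_VALUE, go idle instead
            ctx.markAsTemporarilyIdle();
        } else {
            long min = Long.MAX_VALUE;
            for (long w : partitionWatermarks) {
                min = Math.min(min, w);
            }
            ctx.emitWatermark(new Watermark(min));
        }
    }
}
```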



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4959: private scope is changed to public to resolve acce...

2017-11-06 Thread vetriselvan1187
GitHub user vetriselvan1187 opened a pull request:

https://github.com/apache/flink/pull/4959

private scope is changed to public to resolve access exception

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4-node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vetriselvan1187/flink FLINK-7998

Alternatively you can review and apply these changes as the patch at:

 

[jira] [Commented] (FLINK-8000) Sort REST handler URLs in RestServerEndpoint

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240558#comment-16240558
 ] 

ASF GitHub Bot commented on FLINK-8000:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4958

[FLINK-8000] Sort Rest handler URLS in RestServerEndpoint

## What is the purpose of the change

Introduce special `RestHandlerUrlComparator` to sort REST URLs such that
URLs with path parameters are sorted after those without them or with fewer of them.

E.g. the following order would be established

```
/jobs
/jobs/overview
/jobs/:jobid
/jobs/:jobid/config
/:*
```
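
For illustration, a minimal sketch of how such a comparator could look — this is an assumption, not the actual `RestHandlerUrlComparator` source: ':' ranks above every other character, and prefixes sort before the URLs that extend them.

```java
import java.util.Comparator;

// Hedged sketch (not the actual Flink implementation): ':' introduces a path
// parameter, so it is ranked above every other character.
public class RestUrlComparatorSketch implements Comparator<String> {
    @Override
    public int compare(String a, String b) {
        int len = Math.min(a.length(), b.length());
        for (int i = 0; i < len; i++) {
            char ca = a.charAt(i);
            char cb = b.charAt(i);
            if (ca == cb) {
                continue;
            }
            boolean aParam = ca == ':';
            boolean bParam = cb == ':';
            if (aParam != bParam) {
                return aParam ? 1 : -1;   // the ':' side sorts last
            }
            return Character.compare(ca, cb);
        }
        // prefixes first, e.g. "/jobs" before "/jobs/overview"
        return Integer.compare(a.length(), b.length());
    }
}
```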

## Brief change log

- Introduce `RestHandlerUrlComparator`
- Sort list of returned rest handlers in `RestServerEndpoint` before 
registering them

## Verifying this change

- `RestServerEndpointTest#testRestHandlerUrlSorting`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink sortRestHandlerUrls

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4958


commit d1c436ef0e9e446004e21e3c6be6173d15aba359
Author: Till Rohrmann 
Date:   2017-11-04T13:56:11Z

[FLINK-8000] Sort Rest handler URLS in RestServerEndpoint

Introduce special RestHandlerUrlComparator to sort REST URLs such that
URLs with path parameters are sorted after those without them or with fewer of them.

E.g. the following order would be established

/jobs
/jobs/overview
/jobs/:jobid
/jobs/:jobid/config
/:*




> Sort REST handler URLs in RestServerEndpoint
> 
>
> Key: FLINK-8000
> URL: https://issues.apache.org/jira/browse/FLINK-8000
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> In order to make the {{RestServerEndpoint}} more easily extendable, we should 
> automatically sort the returned list of rest handlers when calling 
> {{RestServerEndpoint#initializeHandlers}}. That way the order in which the 
> handlers are added to the list is independent of the actual registration 
> order. This is, for example, important for the static file server which 
> always needs to be registered last.
> I propose to add a special {{String}} {{Comparator}} which considers the 
> character {{':'}} to be the character with the largest value. That way we 
> should always get the following sort order:
> - URLs without path parameters have precedence over similar URLs where parts 
> are replaced by path parameters (e.g. {{/jobs/overview}}, {{/jobs/:jobid}} 
> and {{/jobs/:jobid/config}}, {{/jobs/:jobid/vertices/:vertexId}})
> - Prefixes are sorted before URLs containing the prefix (e.g. {{/jobs}}, 
> {{/jobs/overview}})



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4958: [FLINK-8000] Sort Rest handler URLS in RestServerE...

2017-11-06 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4958

[FLINK-8000] Sort Rest handler URLS in RestServerEndpoint

## What is the purpose of the change

Introduce special `RestHandlerUrlComparator` to sort REST URLs such that
URLs with path parameters are sorted after those without them or with fewer of them.

E.g. the following order would be established

```
/jobs
/jobs/overview
/jobs/:jobid
/jobs/:jobid/config
/:*
```

## Brief change log

- Introduce `RestHandlerUrlComparator`
- Sort list of returned rest handlers in `RestServerEndpoint` before 
registering them

## Verifying this change

- `RestServerEndpointTest#testRestHandlerUrlSorting`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink sortRestHandlerUrls

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4958


commit d1c436ef0e9e446004e21e3c6be6173d15aba359
Author: Till Rohrmann 
Date:   2017-11-04T13:56:11Z

[FLINK-8000] Sort Rest handler URLS in RestServerEndpoint

Introduce special RestHandlerUrlComparator to sort REST URLs such that
URLs with path parameters are sorted after those without them or with fewer of them.

E.g. the following order would be established

/jobs
/jobs/overview
/jobs/:jobid
/jobs/:jobid/config
/:*




---


[jira] [Created] (FLINK-8000) Sort REST handler URLs in RestServerEndpoint

2017-11-06 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8000:


 Summary: Sort REST handler URLs in RestServerEndpoint
 Key: FLINK-8000
 URL: https://issues.apache.org/jira/browse/FLINK-8000
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


In order to make the {{RestServerEndpoint}} more easily extendable, we should 
automatically sort the returned list of rest handlers when calling 
{{RestServerEndpoint#initializeHandlers}}. That way the order in which the 
handlers are added to the list is independent of the actual registration order. 
This is, for example, important for the static file server which always needs 
to be registered last.

I propose to add a special {{String}} {{Comparator}} which considers the 
character {{':'}} to be the character with the largest value. That way we 
should always get the following sort order:

- URLs without path parameters have precedence over similar URLs where parts 
are replaced by path parameters (e.g. {{/jobs/overview}}, {{/jobs/:jobid}} and 
{{/jobs/:jobid/config}}, {{/jobs/:jobid/vertices/:vertexId}})
- Prefixes are sorted before URLs containing the prefix (e.g. {{/jobs}}, 
{{/jobs/overview}})



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7999) Variable Join Window Boundaries

2017-11-06 Thread Seth Wiesman (JIRA)
Seth Wiesman created FLINK-7999:
---

 Summary: Variable Join Window Boundaries
 Key: FLINK-7999
 URL: https://issues.apache.org/jira/browse/FLINK-7999
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Seth Wiesman


Allow window joins with variable length based on row attributes. 

Consider two streams joined on an id, where one side has start and end dates; it 
would be useful to be able to join each row during its live duration. Today 
this can be expressed in the DataStream API using a CoProcessFunction. 

 left.id = right.id AND (left.time > right.start and left.time < right.end)
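
For illustration, a hedged DataStream sketch of how this can be expressed today with a CoProcessFunction — all type and field names are made up; both inputs are assumed keyed by id, only the bounded side is buffered, and state cleanup via timers is omitted:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical row types for the sketch
class Left  { public long id; public long time; }
class Right { public long id; public long start; public long end; }

/** Sketch of the join on connected streams keyed by id on both sides. */
class VariableWindowJoin extends CoProcessFunction<Left, Right, String> {

    private transient ListState<Right> bufferedRights;

    @Override
    public void open(Configuration parameters) {
        bufferedRights = getRuntimeContext().getListState(
                new ListStateDescriptor<>("rights", Right.class));
    }

    @Override
    public void processElement1(Left left, Context ctx, Collector<String> out) throws Exception {
        // left.time > right.start AND left.time < right.end
        for (Right right : bufferedRights.get()) {
            if (left.time > right.start && left.time < right.end) {
                out.collect("match: left@" + left.time + " in [" + right.start + ", " + right.end + "]");
            }
        }
    }

    @Override
    public void processElement2(Right right, Context ctx, Collector<String> out) throws Exception {
        // simplification: only rights arriving before a left are matched;
        // a full implementation would buffer both sides and clean up state
        bufferedRights.add(right);
    }
}
```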




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7773) Test instability in UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240485#comment-16240485
 ] 

ASF GitHub Bot commented on FLINK-7773:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4957

[FLINK-7773] [tests] Move all mocking before testing code in UtilsTest to 
avoid unfinished stubbing

## What is the purpose of the change

Move all mocking code before the actual testing code in 
UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership.
Hopefully, this will fix the unfinished stubbing exception which still 
occurs spuriously.
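
A hedged illustration of the pattern the fix applies (toy mock, not the actual UtilsTest code): every `when(...).thenReturn(...)` stubbing completes before any code under test touches the mock.

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

public class StubbingOrderSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> mockedList = (List<String>) mock(List.class);
        when(mockedList.size()).thenReturn(2);   // stubbing finished up front
        // only now exercise the code under test that uses the mock
        System.out.println(mockedList.size());   // prints 2
    }
}
```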

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink hardenUtilsTest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4957.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4957


commit 8c572f943dd064c8d22100ced1e1f50645f6e519
Author: Till Rohrmann 
Date:   2017-11-06T16:07:38Z

[FLINK-7773] [tests] Move all mocking before testing code in UtilsTest to 
avoid unfinished stubbing

Move all mocking code before the actual testing code in 
UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership.
Hopefully, this will fix the unfinished stubbing exception which still 
occurs spuriously.




> Test instability in 
> UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership
> --
>
> Key: FLINK-7773
> URL: https://issues.apache.org/jira/browse/FLINK-7773
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> {{UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership}} may result 
> in the following exception (repeated run in IntelliJ until failure, but also 
> on Travis here: https://travis-ci.org/NicoK/flink/jobs/283696974 )
> {code}
> org.apache.flink.yarn.UtilsTest "Until Failure"
> org.mockito.exceptions.misusing.UnfinishedStubbingException: 
> Unfinished stubbing detected here:
> -> at org.apache.flink.yarn.UtilsTest$1.(UtilsTest.java:171)
> E.g. thenReturn() may be missing.
> Examples of correct stubbing:
> when(mock.isOk()).thenReturn(true);
> when(mock.isOk()).thenThrow(exception);
> doThrow(exception).when(mock).someVoidMethod();
> Hints:
>  1. missing thenReturn()
>  2. you are trying to stub a final method, you naughty developer!
>  3: you are stubbing the behaviour of another mock inside before 'thenReturn' 
> instruction if completed
>   at org.apache.flink.yarn.UtilsTest$1.(UtilsTest.java:179)
>   at 
> org.apache.flink.yarn.UtilsTest.testYarnFlinkResourceManagerJobManagerLostLeadership(UtilsTest.java:95)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> 

[GitHub] flink pull request #4957: [FLINK-7773] [tests] Move all mocking before testi...

2017-11-06 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4957

[FLINK-7773] [tests] Move all mocking before testing code in UtilsTest to 
avoid unfinished stubbing

## What is the purpose of the change

Move all mocking code before the actual testing code in 
UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership.
Hopefully, this will fix the unfinished stubbing exception which still 
occurs spuriously.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink hardenUtilsTest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4957.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4957


commit 8c572f943dd064c8d22100ced1e1f50645f6e519
Author: Till Rohrmann 
Date:   2017-11-06T16:07:38Z

[FLINK-7773] [tests] Move all mocking before testing code in UtilsTest to 
avoid unfinished stubbing

Move all mocking code before the actual testing code in 
UtilsTest#testYarnFlinkResourceManagerJobManagerLostLeadership.
Hopefully, this will fix the unfinished stubbing exception which still 
occurs spuriously.




---


[jira] [Commented] (FLINK-7986) Introduce FilterSetOpTransposeRule to Flink

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240466#comment-16240466
 ] 

ASF GitHub Bot commented on FLINK-7986:
---

GitHub user Xpray opened a pull request:

https://github.com/apache/flink/pull/4956

[FLINK-7986][TableAPI & SQL] Introduce FilterSetOpTransposeRule to Flink



## What is the purpose of the change

Introduce FilterSetOpTransposeRule to Flink


## Brief change log

add `FilterSetOpTransposeRule.INSTANCE` to `FlinkRuleSets`


## Verifying this change


This change is already covered by existing tests, such as 
*(org.apache.flink.table.api.stream.table.SetOpTransposeTest)*.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xpray/flink FLINK-7986

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4956.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4956


commit be50be94109ca63ce0587f10ab5479978231237b
Author: Xpray 
Date:   2017-11-06T15:47:33Z

[FLINK-7986][TableAPI & SQL] Introduce FilterSetOpTransposeRule to Flink




> Introduce FilterSetOpTransposeRule to Flink
> ---
>
> Key: FLINK-7986
> URL: https://issues.apache.org/jira/browse/FLINK-7986
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Trivial
>
> A.unionAll(B).where.groupBy.select  
> =>
> A.where.unionAll(B.where).groupBy.select
> This rule will reduce network IO.
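
For illustration, a hedged Table API sketch of the rewrite the rule performs (the column names `id` and `amount` are made up):

```java
import org.apache.flink.table.api.Table;

/** Hedged illustration of the rewrite done by FilterSetOpTransposeRule;
 *  tables a and b are assumed to have columns id and amount. */
final class FilterUnionTransposeSketch {

    static Table before(Table a, Table b) {
        return a.unionAll(b).where("amount > 10").groupBy("id").select("id, amount.sum");
    }

    // equivalent plan after the rule: the filter runs on each input first,
    // so less data crosses the network for the union and aggregation
    static Table after(Table a, Table b) {
        return a.where("amount > 10")
                .unionAll(b.where("amount > 10"))
                .groupBy("id")
                .select("id, amount.sum");
    }
}
```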



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7963) Add ability to call trigger savepoint on flink cluster shutdown

2017-11-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240467#comment-16240467
 ] 

Aljoscha Krettek commented on FLINK-7963:
-

Let me think about that for a bit but we should definitely have a solution for 
this.

> Add ability to call trigger savepoint on flink cluster shutdown
> ---
>
> Key: FLINK-7963
> URL: https://issues.apache.org/jira/browse/FLINK-7963
> Project: Flink
>  Issue Type: New Feature
>  Components: Configuration
>Reporter: Rinat Sharipov
>Priority: Trivial
>
> Hi guys, I've got an idea of a little improvement for testing flink jobs.
> All my jobs are written in the following manner, I've got a context class, 
> which contains details of job components and information about how to wire 
> them. Also I've got a bootstrap class, that initializes this context, 
> retrieves flink env from there and executes it.
> This approach provides an ability to implement jobs in the same manner and 
> simplify job testing. All I need, to do, when writing tests is to override 
> flink env with local env and override some of job components.
> Everything was well, until I wanted to enable checkpointing, and implement 
> some kind of business logic, that should be called, when checkpointing is 
> triggered. I understood, that I would like to test this logic, and the best 
> approach for me, is to trigger savepoint on flink cluster shutdown, but, when 
> I've looked through the source code, I understood, that it's quite 
> challenging and couldn't be realised using only configuration.
> So, I would like to discuss the further proposals:
> * add ability to create a local env using configuration 
> `org.apache.flink.streaming.api.scala.StreamExecutionEnvironment#createLocalEnv(parallelism,
>  configuration)`; currently, using the Scala API we only have the ability to specify 
> parallelism, but the Java API (that is used by the Scala API) contains such a method
> * add ability to trigger a savepoint in the Flink mini cluster on `stop`, if such 
> a property was specified in the configuration
> What do you think about it? As for me, it'll give us more flexibility in 
> tests, and will not force us to use special test templates, such as 
> `SavepointMigrationTestBase`
> Thx
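
For reference, a minimal Java sketch of the first proposal; the Java API already offers this overload, and the idea is to mirror it in the Scala API:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Sketch: the Java API already accepts a Configuration for local environments. */
public class LocalEnvWithConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // any Configuration entries could be set here before env creation
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        System.out.println("parallelism: " + env.getParallelism());
    }
}
```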



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4956: [FLINK-7986][TableAPI & SQL] Introduce FilterSetOp...

2017-11-06 Thread Xpray
GitHub user Xpray opened a pull request:

https://github.com/apache/flink/pull/4956

[FLINK-7986][TableAPI & SQL] Introduce FilterSetOpTransposeRule to Flink



## What is the purpose of the change

Introduce FilterSetOpTransposeRule to Flink


## Brief change log

add `FilterSetOpTransposeRule.INSTANCE` to `FlinkRuleSets`


## Verifying this change


This change is already covered by existing tests, such as 
*(org.apache.flink.table.api.stream.table.SetOpTransposeTest)*.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xpray/flink FLINK-7986

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4956.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4956


commit be50be94109ca63ce0587f10ab5479978231237b
Author: Xpray 
Date:   2017-11-06T15:47:33Z

[FLINK-7986][TableAPI & SQL] Introduce FilterSetOpTransposeRule to Flink




---


[jira] [Updated] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full

2017-11-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7949:

Priority: Critical  (was: Major)

> AsyncWaitOperator is not restarting when queue is full
> --
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Bartłomiej Tartanus
>Priority: Critical
>   Original Estimate: 0.25h
>  Remaining Estimate: 0.25h
>
> The issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue - AsyncWaitOperator can't restart properly after failure (thread is 
> waiting forever)
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements) 
> 2. There is some pending element waiting, so the 
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and 
> while-loop in addAsyncBufferEntry method is trying to add this element to 
> the queue (but element is not added because queue is full) 
> 3. Now the snapshot is taken - the whole queue of N elements is being 
> written into the ListState in snapshotState method and also (what is more 
> important) this pendingStreamElementQueueEntry is written to this list too. 
> 4. The process is being restarted, so it tries to recover all the elements 
> and put them again into the queue, but the list of recovered elements hold 
> N+1 element and our queue capacity is only N. Process is not started yet, so 
> it can not process any element and this one element is waiting endlessly. 
> But it's never added and the process will never process anything. Deadlock. 
> 5. Trigger is fired and indeed discarded because the process is not running 
> yet. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240443#comment-16240443
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4943
  
 

Btw, now users have to manually depend on `flink-avro` in their projects 
where before they didn't necessarily have to, right? If yes, we should also put 
it in the release notes.


> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in flight ...

2017-11-06 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4943
  
👍 

Btw, now users have to manually depend on `flink-avro` in their projects 
where before they didn't necessarily have to, right? If yes, we should also put 
it in the release notes.


---


[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240442#comment-16240442
 ] 

ASF GitHub Bot commented on FLINK-6022:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4943
  
I would suggest adding that last paragraph in the `release notes` field of 
FLINK-6022 so we don't forget about this.


> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4943: [FLINK-6022] [avro] Use Avro to serialize Avro in flight ...

2017-11-06 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4943
  
I would suggest adding that last paragraph in the `release notes` field of 
FLINK-6022 so we don't forget about this.


---


[jira] [Assigned] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-6022:
---

Assignee: Stephan Ewen

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-6022:

Priority: Blocker  (was: Major)

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6022) Improve support for Avro GenericRecord

2017-11-06 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-6022:

Fix Version/s: 1.4.0

> Improve support for Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
> Fix For: 1.4.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()

2017-11-06 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7515.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via f1c4eb6b65c4a7c310c325e3dd3f59e89337b3ff

> allow actual 0-length content in NettyMessage#allocateBuffer()
> --
>
> Key: FLINK-7515
> URL: https://issues.apache.org/jira/browse/FLINK-7515
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
> Fix For: 1.4.0
>
>
> Previously, length {{0}} meant "unknown content length" but there are cases 
> where the actual length is 0 and we do not need a larger buffer. Let's use 
> {{-1}} for tagging the special case instead.
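
A hedged sketch of the sentinel change (names assumed, not the actual `NettyMessage` code):

```java
/** Hedged sketch: -1 marks "content length unknown", so an actual
 *  0-length payload no longer forces a larger buffer. */
final class AllocateBufferSketch {

    static final int UNKNOWN_CONTENT_LENGTH = -1;

    static int bufferCapacity(int headerLength, int contentLength) {
        // previously 0 was the "unknown" tag, which conflated it with real
        // zero-length content; with -1 the two cases are distinguishable
        return contentLength == UNKNOWN_CONTENT_LENGTH
                ? headerLength            // size unknown: start with header only
                : headerLength + contentLength;
    }
}
```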



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7745) add tests for ensuring NetworkBufferPool overprovisioning behaviour

2017-11-06 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7745.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via e285a4143aa86ef82690ed93db17a549ab83ff12

> add tests for ensuring NetworkBufferPool overprovisioning behaviour
> ---
>
> Key: FLINK-7745
> URL: https://issues.apache.org/jira/browse/FLINK-7745
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> Currently, there are no unit tests verifying {{NetworkBufferPool}}'s 
> behaviour in the case that the available number of buffers is too small for 
> it to create {{LocalBufferPool}} instances. We should add some.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7516) HybridMemorySegment: do not allow copies into a read-only ByteBuffer

2017-11-06 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7516.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via fbc12630e6439b02546398b04b60f2c764ccba71

> HybridMemorySegment: do not allow copies into a read-only ByteBuffer
> 
>
> Key: FLINK-7516
> URL: https://issues.apache.org/jira/browse/FLINK-7516
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> {{HybridMemorySegment#get(int, ByteBuffer, int)}} allows writing into a 
> read-only {{ByteBuffer}} but this operation should be forbidden.
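
A hedged sketch of the guard the issue asks for (method shape assumed; the real segment copies via `Unsafe`, which bypasses the buffer's own protection, hence the explicit check):

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

/** Hedged sketch of the proposed read-only check. */
final class ReadOnlyGuardSketch {

    static void get(byte[] source, int offset, ByteBuffer target, int numBytes) {
        if (target.isReadOnly()) {
            // fail fast instead of silently writing into a read-only view
            throw new ReadOnlyBufferException();
        }
        // the real HybridMemorySegment copies via Unsafe, which would bypass
        // the buffer's own protection; sketched here with the safe API
        target.put(source, offset, numBytes);
    }
}
```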



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7701) IllegalArgumentException in Netty bootstrap with small memory state segment size

2017-11-06 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7701.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 88737cf9fcf15e660c920e575798e241157f6d17

> IllegalArgumentException in Netty bootstrap with small memory state segment 
> size
> 
>
> Key: FLINK-7701
> URL: https://issues.apache.org/jira/browse/FLINK-7701
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> FLINK-7258 broke setting high and low watermarks for small segment sizes. We 
> should tackle both use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4733: [FLINK-7701][network] really fix watermark configu...

2017-11-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4733


---


[jira] [Commented] (FLINK-7515) allow actual 0-length content in NettyMessage#allocateBuffer()

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240431#comment-16240431
 ] 

ASF GitHub Bot commented on FLINK-7515:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4592


> allow actual 0-length content in NettyMessage#allocateBuffer()
> --
>
> Key: FLINK-7515
> URL: https://issues.apache.org/jira/browse/FLINK-7515
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> Previously, length {{0}} meant "unknown content length" but there are cases 
> where the actual length is 0 and we do not need a larger buffer. Let's use 
> {{-1}} for tagging the special case instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4592: [FLINK-7515][network] allow actual 0-length conten...

2017-11-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4592


---


[jira] [Commented] (FLINK-7516) HybridMemorySegment: do not allow copies into a read-only ByteBuffer

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240430#comment-16240430
 ] 

ASF GitHub Bot commented on FLINK-7516:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4593


> HybridMemorySegment: do not allow copies into a read-only ByteBuffer
> 
>
> Key: FLINK-7516
> URL: https://issues.apache.org/jira/browse/FLINK-7516
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{HybridMemorySegment#get(int, ByteBuffer, int)}} allows writing into a 
> read-only {{ByteBuffer}} but this operation should be forbidden.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7745) add tests for ensuring NetworkBufferPool overprovisioning behaviour

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240429#comment-16240429
 ] 

ASF GitHub Bot commented on FLINK-7745:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4758


> add tests for ensuring NetworkBufferPool overprovisioning behaviour
> ---
>
> Key: FLINK-7745
> URL: https://issues.apache.org/jira/browse/FLINK-7745
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, there are no unit tests verifying {{NetworkBufferPool}}'s 
> behaviour in the case that the available number of buffers is too small for 
> it to create {{LocalBufferPool}} instances. We should add some.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7701) IllegalArgumentException in Netty bootstrap with small memory state segment size

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240432#comment-16240432
 ] 

ASF GitHub Bot commented on FLINK-7701:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4733


> IllegalArgumentException in Netty bootstrap with small memory state segment 
> size
> 
>
> Key: FLINK-7701
> URL: https://issues.apache.org/jira/browse/FLINK-7701
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> FLINK-7258 broke setting high and low watermarks for small segment sizes. We 
> should tackle both use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4593: [FLINK-7516][memory] do not allow copies into a re...

2017-11-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4593


---


[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...

2017-11-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4758


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149098417
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -61,18 +79,17 @@
/** How many state size in mb are used */
private final int stateSizeInMB;
 
+   private final Map<String, Resource> extendedResources = new 
HashMap<>(1);
--- End diff --

This violates the serializability of `ResourceSpec` if `Resource` itself is 
not serializable.
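
A minimal sketch of the reviewer's point (field set reduced for brevity): if `ResourceSpec` is to stay serializable, the nested `Resource` type has to opt in as well.

```java
import java.io.Serializable;

/** Hedged sketch: every field of a Serializable class must itself be
 *  serializable, so the nested type should implement Serializable too. */
public class Resource implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String name;
    private final double value;

    public Resource(String name, double value) {
        this.name = name;
        this.value = value;
    }
}
```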


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149097234
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
+
+   @Test
+   public void testIsValid() throws Exception {
+   ResourceSpec rs = new ResourceSpec(1.0, 100);
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", 1));
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", -1));
+   assertFalse(rs.isValid());
+   }
+
+   @Test
+   public void testLessThanOrEqual() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs1));
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertFalse(rs2.lessThanOrEqual(rs1));
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2));
+   assertFalse(rs1.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs1));
+
+   rs2 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.Resource("FPGA", 1),
+   new ResourceSpec.Resource("GPU", 1));
+   assertFalse(rs1.lessThanOrEqual(rs2));
+   assertFalse(rs2.lessThanOrEqual(rs1));
+   }
+
+   @Test
+   public void testEquals() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.equals(rs2));
+   assertTrue(rs2.equals(rs1));
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertFalse(rs1.equals(rs2));
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+   assertTrue(rs1.equals(rs2));
+
+   rs1 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.Resource("FPGA", 2),
+   new ResourceSpec.Resource("GPU", 0.5));
+   rs2 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.Resource("FPGA", 2),
+   new ResourceSpec.Resource("GPU", 
ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX, 0.5));
+   assertFalse(rs1.equals(rs2));
+   }
+
+   @Test
+   public void testHashCode() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertEquals(rs1.hashCode(), rs2.hashCode());
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertFalse(rs1.hashCode() == rs2.hashCode());
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+   assertEquals(rs1.hashCode(), rs2.hashCode());
+
+   rs1 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.Resource("FPGA", 2),
+   new 

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096457
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
--- End diff --

JavaDocs would also be good.


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096221
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
+   extendedResources.put(name, new Resource(name, type, value));
+   }
+
+   public static class Resource {
+   private String name;
+   private ResourceAggregateType type;
+   private Double value;
+
+   public Resource(String name, double value) {
+   this(name, ResourceAggregateType.AGGREGATE_TYPE_SUM, 
value);
+   }
+
+   public Resource(String name, ResourceAggregateType type, double 
value) {
+   this.name = name;
+   this.type = type;
--- End diff --

`checkNotNull` missing
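
A minimal sketch of the suggested guard, using Flink's `Preconditions`
utility (assuming the usual static import):

```java
import static org.apache.flink.util.Preconditions.checkNotNull;

public Resource(String name, ResourceAggregateType type, double value) {
    this.name = checkNotNull(name);
    this.type = checkNotNull(type);
    this.value = value;
}
```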


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096402
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
--- End diff --

Tests should extend from the `TestLogger`
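
That is, something along these lines (a sketch; `TestLogger` is Flink's
`org.apache.flink.util.TestLogger` base class):

```java
import org.apache.flink.util.TestLogger;

public class ResourceSpecTest extends TestLogger {
    // test methods as above
}
```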


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096804
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
+
+   @Test
+   public void testIsValid() throws Exception {
+   ResourceSpec rs = new ResourceSpec(1.0, 100);
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", 1));
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", -1));
+   assertFalse(rs.isValid());
+   }
+
+   @Test
+   public void testLessThanOrEqual() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs1));
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertFalse(rs2.lessThanOrEqual(rs1));
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2));
--- End diff --

Reusing variable names makes things harder to follow in tests. Thus, I 
would recommend introducing a fresh name.
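
For example (a sketch, renaming only for readability):

```java
// instead of reassigning rs1, introduce a descriptive fresh variable
ResourceSpec specWithLargerFpga = new ResourceSpec(1.0, 100,
        new ResourceSpec.Resource("FPGA", 2));
assertFalse(specWithLargerFpga.lessThanOrEqual(rs2));
```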


---


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149098202
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
+   extendedResources.put(name, new Resource(name, type, value));
+   }
+
+   public static class Resource {
+   private String name;
+   private ResourceAggregateType type;
+   private Double value;
+
+   public Resource(String name, double value) {
+   this(name, ResourceAggregateType.AGGREGATE_TYPE_SUM, 
value);
--- End diff --

`ResourceAggregateType` should be the last argument since `name` and 
`value` are properly passed to this method.
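
That is, a signature along these lines (sketch):

```java
public Resource(String name, double value, ResourceAggregateType type) {
    this.name = name;
    this.value = value;
    this.type = type;
}
```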


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240410#comment-16240410
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149097234
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
+
+   @Test
+   public void testIsValid() throws Exception {
+   ResourceSpec rs = new ResourceSpec(1.0, 100);
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", 1));
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", -1));
+   assertFalse(rs.isValid());
+   }
+
+   @Test
+   public void testLessThanOrEqual() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs1));
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertFalse(rs2.lessThanOrEqual(rs1));
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2));
+   assertFalse(rs1.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs1));
+
+   rs2 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.Resource("FPGA", 1),
+   new ResourceSpec.Resource("GPU", 1));
+   assertFalse(rs1.lessThanOrEqual(rs2));
+   assertFalse(rs2.lessThanOrEqual(rs1));
+   }
+
+   @Test
+   public void testEquals() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.equals(rs2));
+   assertTrue(rs2.equals(rs1));
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertFalse(rs1.equals(rs2));
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+   assertTrue(rs1.equals(rs2));
+
+   rs1 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.Resource("FPGA", 2),
+   new ResourceSpec.Resource("GPU", 0.5));
+   rs2 = new ResourceSpec(1.0, 100,
+   new ResourceSpec.Resource("FPGA", 2),
+   new ResourceSpec.Resource("GPU", 
ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX, 0.5));
+   assertFalse(rs1.equals(rs2));
+   }
+
+   @Test
+   public void testHashCode() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertEquals(rs1.hashCode(), rs2.hashCode());
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertFalse(rs1.hashCode() == rs2.hashCode());
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2.2));
+  

[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240412#comment-16240412
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096402
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
--- End diff --

Tests should extend from the `TestLogger`


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240409#comment-16240409
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149098417
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -61,18 +79,17 @@
/** How many state size in mb are used */
private final int stateSizeInMB;
 
+   private final Map extendedResources = new 
HashMap<>(1);
--- End diff --

This violates the serializability of `ResourceSpec` if `Resource` itself is 
not serializable.
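
A sketch of one way to address it, assuming `Resource` should simply be made
serializable like the enclosing `ResourceSpec`:

```java
public static class Resource implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    // fields and constructors as before
}
```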


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240411#comment-16240411
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096457
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
--- End diff --

JavaDocs would also be good.


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096169
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
--- End diff --

Who calls this method?


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240415#comment-16240415
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149098202
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
+   extendedResources.put(name, new Resource(name, type, value));
+   }
+
+   public static class Resource {
+   private String name;
+   private ResourceAggregateType type;
+   private Double value;
+
+   public Resource(String name, double value) {
+   this(name, ResourceAggregateType.AGGREGATE_TYPE_SUM, 
value);
--- End diff --

`ResourceAggregateType` should be the last argument since `name` and 
`value` are properly passed to this method.


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096103
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
+   extendedResources.put(name, new Resource(name, type, value));
+   }
+
+   public static class Resource {
+   private String name;
+   private ResourceAggregateType type;
+   private Double value;
--- End diff --

fields should be `final`
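
That is (sketch):

```java
public static class Resource {
    private final String name;
    private final ResourceAggregateType type;
    private final Double value;

    // constructors unchanged apart from assigning the final fields
}
```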


---


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240414#comment-16240414
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096804
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ResourceSpecTest {
+
+   @Test
+   public void testIsValid() throws Exception {
+   ResourceSpec rs = new ResourceSpec(1.0, 100);
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", 1));
+   assertTrue(rs.isValid());
+
+   rs = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("GPU", -1));
+   assertFalse(rs.isValid());
+   }
+
+   @Test
+   public void testLessThanOrEqual() throws Exception {
+   ResourceSpec rs1 = new ResourceSpec(1.0, 100);
+   ResourceSpec rs2 = new ResourceSpec(1.0, 100);
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertTrue(rs2.lessThanOrEqual(rs1));
+
+   rs2 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 1));
+   assertTrue(rs1.lessThanOrEqual(rs2));
+   assertFalse(rs2.lessThanOrEqual(rs1));
+
+   rs1 = new ResourceSpec(1.0, 100, new 
ResourceSpec.Resource("FPGA", 2));
--- End diff --

Reusing variable names makes things harder to follow in tests. Thus, I 
would recommend introducing a fresh name.


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240407#comment-16240407
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096169
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
--- End diff --

Who calls this method?


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240408#comment-16240408
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096103
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
+   extendedResources.put(name, new Resource(name, type, value));
+   }
+
+   public static class Resource {
+   private String name;
+   private ResourceAggregateType type;
+   private Double value;
--- End diff --

fields should be `final`


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240413#comment-16240413
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4911#discussion_r149096221
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
@@ -183,17 +238,81 @@ public int hashCode() {
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
result = 31 * result + stateSizeInMB;
+   result = 31 * result + extendedResources.hashCode();
return result;
}
 
@Override
public String toString() {
+   String extend = "";
+   for (Resource resource : extendedResources.values()) {
+   extend += ", " + resource.name + "=" + resource.value;
+   }
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
", nativeMemoryInMB=" + nativeMemoryInMB +
-   ", stateSizeInMB=" + stateSizeInMB +
+   ", stateSizeInMB=" + stateSizeInMB + extend +
'}';
}
+
+   private void addResource(String name, double value, 
ResourceAggregateType type) {
+   extendedResources.put(name, new Resource(name, type, value));
+   }
+
+   public static class Resource {
+   private String name;
+   private ResourceAggregateType type;
+   private Double value;
+
+   public Resource(String name, double value) {
+   this(name, ResourceAggregateType.AGGREGATE_TYPE_SUM, 
value);
+   }
+
+   public Resource(String name, ResourceAggregateType type, double 
value) {
+   this.name = name;
+   this.type = type;
--- End diff --

`checkNotNull` missing


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5513) Remove relocation of internal API classes

2017-11-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5513:

Fix Version/s: (was: 1.4.0)
   1.5.0

> Remove relocation of internal API classes
> -
>
> Key: FLINK-5513
> URL: https://issues.apache.org/jira/browse/FLINK-5513
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
> Fix For: 1.5.0
>
>
> Currently, we are relocating the {{curator}} dependency in order to avoid 
> conflicts with user code classes. This happens for example in the 
> {{flink-runtime}} module. The problem with that is that {{curator}} classes, 
> such as the {{CuratorFramework}}, are part of Flink's internal API. So for 
> example, the {{ZooKeeperStateHandleStore}} requires a {{CuratorFramework}} as 
> argument in order to instantiate it. By relocating {{curator}} it is no 
> longer possible to use this class outside of {{flink-runtime}} without some 
> nasty tricks (see {{flink-mesos}} for that).
> I think it is not good practice to relocate internal API classes because it 
> hinders easy code reuse. I propose to either introduce our own interfaces 
> which abstract the {{CuratorFramework}} away or (imo the better solution) to 
> get rid of the {{Curator}} relocation. The latter might entail that we 
> properly separate the API modules from the runtime modules so that users 
> don't have to pull in the runtime dependencies if they want to develop a 
> Flink job.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7990) Strange behavior when configuring Logback for logging

2017-11-06 Thread Teena K (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240404#comment-16240404
 ] 

Teena K commented on FLINK-7990:


I always restart the cluster after updating the jars in the lib folder. 
I don't always restart it after running a job. But to test this logging issue, 
I have restarted the cluster after each job run. In such cases, the logs work 
sometimes after the restart, but not always. 

> Strange behavior when configuring Logback for logging
> -
>
> Key: FLINK-7990
> URL: https://issues.apache.org/jira/browse/FLINK-7990
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, Logging
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> The following issue was reported on the [user 
> mailinglist|https://lists.apache.org/thread.html/c06a9f0b1189bf21d946d3d9728631295c88bfc57043cdbe18409d52@%3Cuser.flink.apache.org%3E]
> {quote}
> I have a single node Flink instance which has the required jars for logback 
> in the lib folder (logback-classic.jar, logback-core.jar, 
> log4j-over-slf4j.jar). I have removed the jars for log4j from the lib folder 
> (log4j-1.2.17.jar, slf4j-log4j12-1.7.7.jar). 'logback.xml' is also correctly 
> updated in the 'conf' folder. I have also included 'logback.xml' in the 
> classpath, although this does not seem to be considered while the job is run. 
> Flink refers to logback.xml inside the conf folder only. I have updated 
> pom.xml as per Flink's documentation in order to exclude log4j. I have some 
> log entries set inside a few map and flatmap functions and some log entries 
> outside those functions (eg: "program execution started").
> When I run the job, Flink writes only those logs that are coded outside the 
> transformations. Those logs that are coded inside the transformations (map, 
> flatmap etc) are not getting written to the log file. If this was happening 
> always, I could have assumed that the Task Manager is not writing the logs. 
> But Flink displays a strange behavior regarding this. Whenever I update the 
> logback jars inside the the lib folder(due to version changes), during the 
> next job run, all logs (even those inside map and flatmap) are written 
> correctly into the log file. But the logs don't get written in any of the 
> runs after that. This means that my 'logback.xml' file is correct and the 
> settings are also correct. But I don't understand why the same settings don't 
> work while the same job is run again.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7726) Move marshalling testbases out of legacy namespace

2017-11-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7726:

Description: 
The marshalling test bases currently reside under 
{{org.apache.flink.runtime.rest.handler.legacy.messages}} which doesn't make 
sense as this isn't legacy code.

We should do this once the port of all handlers has been finished to avoid 
merge conflicts.

  was:The marshalling test bases currently reside under 
{{org.apache.flink.runtime.rest.handler.legacy.messages}} which doesn't make 
sense as this isn't really legacy code.


> Move marshalling testbases out of legacy namespace
> --
>
> Key: FLINK-7726
> URL: https://issues.apache.org/jira/browse/FLINK-7726
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Tests
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0
>
>
> The marshalling test bases currently reside under 
> {{org.apache.flink.runtime.rest.handler.legacy.messages}} which doesn't make 
> sense as this isn't legacy code.
> We should do this once the port of all handlers has been finished to avoid 
> merge conflicts.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-11-06 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7878:
-
Issue Type: Improvement  (was: Bug)

> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only supports users defining how much CPU and MEM an
> operator uses, but the resources in a cluster are various. For example, an
> application for image processing may need GPU, while others may need FPGA.
> CPU and MEM alone are not enough, and the number of resource types keeps
> growing, so we need to make the ResourceSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7993) Kafka 08 curator shading pattern out-of-sync

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240393#comment-16240393
 ] 

ASF GitHub Bot commented on FLINK-7993:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4953


> Kafka 08 curator shading pattern out-of-sync
> 
>
> Key: FLINK-7993
> URL: https://issues.apache.org/jira/browse/FLINK-7993
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The kafka 08 shading pattern for curator is out-of-sync with flink-runtime.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4953: [FLINK-7993][kafka] Sync curator shading patterns

2017-11-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4953


---


[jira] [Closed] (FLINK-7993) Kafka 08 curator shading pattern out-of-sync

2017-11-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7993.
---
Resolution: Fixed

1.4: 7978a174474fed8a3d8e6eecfdc13bb129745893

> Kafka 08 curator shading pattern out-of-sync
> 
>
> Key: FLINK-7993
> URL: https://issues.apache.org/jira/browse/FLINK-7993
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The kafka 08 shading pattern for curator is out-of-sync with flink-runtime.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7993) Kafka 08 curator shading pattern out-of-sync

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240385#comment-16240385
 ] 

ASF GitHub Bot commented on FLINK-7993:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4953
  
merging.


> Kafka 08 curator shading pattern out-of-sync
> 
>
> Key: FLINK-7993
> URL: https://issues.apache.org/jira/browse/FLINK-7993
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The kafka 08 shading pattern for curator is out-of-sync with flink-runtime.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4953: [FLINK-7993][kafka] Sync curator shading patterns

2017-11-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4953
  
merging.


---


[jira] [Updated] (FLINK-7009) dogstatsd mode in statsd reporter

2017-11-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7009:

Fix Version/s: (was: 1.4.0)
   1.5.0

> dogstatsd mode in statsd reporter
> -
>
> Key: FLINK-7009
> URL: https://issues.apache.org/jira/browse/FLINK-7009
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
> Environment: org.apache.flink.metrics.statsd.StatsDReporter
>Reporter: David Brinegar
> Fix For: 1.5.0
>
>
> The current statsd reporter can only report a subset of Flink metrics owing 
> to the manner in which Flink variables are handled, mainly around invalid 
> characters and overly long metric names.  As an option, it would be quite 
> useful to have a stricter dogstatsd-compliant output.  Dogstatsd metrics are tagged, 
> should be less than 200 characters including tag names and values, be 
> alphanumeric + underbar, delimited by periods.  As a further pragmatic 
> restriction, negative and other invalid values should be ignored rather than 
> sent to the backend.  These restrictions play well with a broad set of 
> collectors and time series databases.
> This mode would:
> * convert output to ascii alphanumeric characters with underbar, delimited by 
> periods.  Runs of invalid characters within a metric segment would be 
> collapsed to a single underbar.
> * report all Flink variables as tags
> * compress overly long segments, say over 50 chars, to a symbolic 
> representation of the metric name, to preserve the unique metric time series 
> but avoid downstream truncation
> * compress 32 character Flink IDs like tm_id, task_id, job_id, 
> task_attempt_id, to the first 8 characters, again to preserve enough 
> distinction amongst metrics while trimming up to 96 characters from the metric
> * remove object references from names, such as the instance hash id of the 
> serializer
> * drop negative or invalid numeric values such as "n/a", "-1" which is used 
> for unknowns like JVM.Memory.NonHeap.Max, and "-9223372036854775808" which is 
> used for unknowns like currentLowWaterMark
> With these in place, it becomes quite reasonable to support LatencyGauge 
> metrics as well.
> One idea for symbolic compression is to take the first 10 valid characters 
> plus a hash of the long name.  For example, a value like this operator_name:
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f3395ffa,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@4201c465},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> would first drop the instance references.  The stable version would be:
>  
> {code:java}
> TriggerWindow(TumblingProcessingTimeWindows(5000), 
> ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer,
>  
> reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1},
>  ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java-301))
> {code}
> and then the compressed name would be the first ten valid characters plus the 
> hash of the stable string:
> {code}
> TriggerWin_d8c007da
> {code}
> This is just one way of dealing with unruly default names, the main point 
> would be to preserve the metrics so they are valid, avoid truncation, and can 
> be aggregated along other dimensions even if this particular dimension is 
> hard to parse after the compression.
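> 
> For illustration, a rough sketch of the proposed compression (the hash
> function is an assumption here; the proposal above does not name one):
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.util.zip.CRC32;
> 
> public final class MetricNameCompressor {
> 
>     /** First 10 valid characters of the stable name plus a hash suffix. */
>     public static String compress(String stableName) {
>         // collapse runs of invalid characters to a single underbar
>         String cleaned = stableName.replaceAll("[^A-Za-z0-9_]+", "_");
>         String prefix = cleaned.substring(0, Math.min(10, cleaned.length()));
>         CRC32 crc = new CRC32();
>         crc.update(stableName.getBytes(StandardCharsets.UTF_8));
>         return prefix + "_" + Long.toHexString(crc.getValue());
>     }
> }
> {code}
> Applied to the stable TriggerWindow name above, this would yield something
> of the form {{TriggerWin_}} plus an up-to-8-hex-digit suffix.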



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6900) Limit size of individual components in DropwizardReporter

2017-11-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6900:

Fix Version/s: (was: 1.3.3)
   (was: 1.4.0)
   1.5.0

> Limit size of individual components in DropwizardReporter
> 
>
> Key: FLINK-6900
> URL: https://issues.apache.org/jira/browse/FLINK-6900
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6437) Move history server configuration to a separate file

2017-11-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6437:

Fix Version/s: (was: 1.4.0)
   1.5.0

> Move history server configuration to a separate file
> 
>
> Key: FLINK-6437
> URL: https://issues.apache.org/jira/browse/FLINK-6437
> Project: Flink
>  Issue Type: Improvement
>  Components: History Server
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
> Fix For: 1.5.0
>
>
> I suggest to keep the {{flink-conf.yaml}} leaner by moving configuration of 
> the History Server to a different file.
> In general, I would propose to move configurations of separate, independent 
> and optional components to individual config files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7341) Update code snippets in documentation

2017-11-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7341:

Issue Type: Improvement  (was: Sub-task)
Parent: (was: FLINK-7242)

> Update code snippets in documentation 
> --
>
> Key: FLINK-7341
> URL: https://issues.apache.org/jira/browse/FLINK-7341
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Eron Wright 
>Priority: Minor
> Fix For: 1.4.0
>
>
> Consider updating the documentation to use Java 8 lambdas where possible.
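> 
> For illustration, the kind of rewrite meant (a sketch; {{lines}} is a
> hypothetical {{DataStream<String>}}; imports omitted):
> {code:java}
> // anonymous inner class, as currently shown in many snippets
> DataStream<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
>     @Override
>     public Integer map(String line) {
>         return line.length();
>     }
> });
> 
> // Java 8 lambda equivalent (generic result types may still need an
> // explicit .returns(...) hint because of type erasure)
> DataStream<Integer> lengths2 = lines.map(line -> line.length());
> {code}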



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

