[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368921#comment-15368921
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70161839
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+#### Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Thanks Robert.
I'm not quite sure what the problem is with using 
`assignTimestampsAndWatermarks()` here; can you explain? I looked at the code, 
and from my understanding it does not differ much from 
`assignTimestamps()`, except that `assignTimestamps()` is deprecated.
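
For context, an assigner of the kind passed to `assignTimestampsAndWatermarks()` has two jobs: extract a timestamp per record and report a watermark. The following is only a minimal plain-Java sketch of that logic, assuming a bounded amount of out-of-orderness. The class and method names (`BoundedLatenessAssigner`, `extractTimestamp`, `currentWatermark`) are hypothetical stand-ins and not Flink's interfaces. Tracking the maximum timestamp matters here because, as the quoted docs text notes, Kinesis arrival timestamps may not be ascending.

```java
/** Hypothetical sketch of a periodic-watermark assigner; not Flink's API. */
public class BoundedLatenessAssigner {
    // How far (in ms) a record may lag behind the newest timestamp seen so far.
    private final long maxOutOfOrdernessMs;
    // Largest timestamp observed; Long.MIN_VALUE means "no record seen yet".
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    public BoundedLatenessAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Returns the record's own timestamp and remembers the maximum seen. */
    public long extractTimestamp(long recordTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, recordTimestampMs);
        return recordTimestampMs;
    }

    /** The watermark trails the maximum timestamp by the allowed lateness. */
    public long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }
}
```

With a bound of 1000 ms, records stamped 5000 and then 4000 leave the watermark at 4000, so the late record is not dropped by event-time operators downstream.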


> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70161839
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+#### Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Thanks Robert.
I'm not quite sure what the problem is with using 
`assignTimestampsAndWatermarks()` here; can you explain? I looked at the code, 
and from my understanding it does not differ much from 
`assignTimestamps()`, except that `assignTimestamps()` is deprecated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #1517: [FLINK-3477] [runtime] Add hash-based combine stra...

2016-07-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70146784
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

I think it is OK to add overloaded `reduce()` methods to `DataSet` and 
`GroupedDataSet`. These methods should be marked `PublicEvolving`. I would not add 
overloaded methods to more specialized operations, as was done in the first approach 
to adding the `CombineHint` to the Java API.
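
To make the shape of such an overload concrete, here is a small plain-Java sketch. It is an illustration under assumptions, not the code in this pull request: `ReduceOverloadSketch` and `resolve` are hypothetical names, the `reduce` methods operate on a `List` rather than a `DataSet`, and only the hint names (`OPTIMIZER_CHOOSES`, `SORT`, `HASH`) come from the discussion. The one-argument form delegates to the overload with `OPTIMIZER_CHOOSES`, and a `null` hint is treated the same way, as the quoted Scaladoc describes.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical sketch of an overloaded reduce with a combine hint. */
public class ReduceOverloadSketch {
    public enum CombineHint { OPTIMIZER_CHOOSES, SORT, HASH }

    /** A null hint falls back to letting the optimizer decide. */
    public static CombineHint resolve(CombineHint strategy) {
        return strategy == null ? CombineHint.OPTIMIZER_CHOOSES : strategy;
    }

    /** Existing signature: unchanged for callers, delegates to the overload. */
    public static <T> T reduce(List<T> values, BinaryOperator<T> fun) {
        return reduce(values, fun, CombineHint.OPTIMIZER_CHOOSES);
    }

    /** New overload: same result, but the caller may pick the combine strategy. */
    public static <T> T reduce(List<T> values, BinaryOperator<T> fun, CombineHint strategy) {
        CombineHint effective = resolve(strategy);
        // In this sketch the hint only selects a strategy; the result is identical.
        T acc = null;
        for (T v : values) {
            acc = (acc == null) ? v : fun.apply(acc, v);
        }
        return acc;
    }
}
```

Because the existing one-argument signature stays as it is, adding the overload does not break current callers.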




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368557#comment-15368557
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70146784
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

I think it is OK to add overloaded `reduce()` methods to `DataSet` and 
`GroupedDataSet`. These methods should be marked `PublicEvolving`. I would not add 
overloaded methods to more specialized operations, as was done in the first approach 
to adding the `CombineHint` to the Java API.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows holding the pre-aggregated value in a hash table 
> and updating it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the 
> new value has the same size as the old value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
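
The strategy described in the issue can be sketched in plain Java as follows. This is a simplified illustration, not the implementation in the pull request: it uses `java.util.HashMap` instead of a managed-memory binary hash table, and a fixed entry limit (`maxEntries`) stands in for running out of memory. The names `HashCombineSketch` and `HashCombiner` are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical sketch of a hash-based combiner for a reduce function. */
public class HashCombineSketch {

    public static final class HashCombiner<K, V> {
        private final Map<K, V> table = new HashMap<>();
        private final BinaryOperator<V> reduceFn;
        private final int maxEntries; // stand-in for the memory budget
        private final List<Map.Entry<K, V>> emitted = new ArrayList<>();

        public HashCombiner(BinaryOperator<V> reduceFn, int maxEntries) {
            this.reduceFn = reduceFn;
            this.maxEntries = maxEntries;
        }

        /** Folds the value into the pre-aggregate for its key. */
        public void add(K key, V value) {
            // A new key that does not fit forces eviction of the whole table.
            if (!table.containsKey(key) && table.size() >= maxEntries) {
                flush();
            }
            // Inserts the value, or updates the stored pre-aggregate via reduceFn.
            table.merge(key, value, reduceFn);
        }

        /** Emits all partial aggregates and clears the table. */
        public void flush() {
            for (Map.Entry<K, V> e : table.entrySet()) {
                emitted.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            }
            table.clear();
        }

        public List<Map.Entry<K, V>> emitted() {
            return emitted;
        }
    }
}
```

Because a key can be emitted more than once after an eviction, the downstream reduce still has to merge the partial aggregates; the combiner only reduces the data volume before that step.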





[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368461#comment-15368461
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70139954
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

@fhueske or @StephanEwen, since we cannot break the Scala DataSet API by 
creating and returning a `ReduceOperator`, do you agree with Gábor's 
recommendation to overload `DataSet.reduce` with `CombineHint`?




[GitHub] flink pull request #1517: [FLINK-3477] [runtime] Add hash-based combine stra...

2016-07-08 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r70139954
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

@fhueske or @StephanEwen, since we cannot break the Scala DataSet API by 
creating and returning a `ReduceOperator`, do you agree with Gábor's 
recommendation to overload `DataSet.reduce` with `CombineHint`?




[jira] [Commented] (FLINK-4172) Don't proxy a ProxiedObject

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368272#comment-15368272
 ] 

ASF GitHub Bot commented on FLINK-4172:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2213


> Don't proxy a ProxiedObject
> ---
>
> Key: FLINK-4172
> URL: https://issues.apache.org/jira/browse/FLINK-4172
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Some graph algorithms pass through a DataSet unmodified (at least, until we 
> have VertexSet and EdgeSet). We need to prevent a DataSet from being proxied 
> twice. Allowing two methods to own a single object sounds brittle, so we can 
> instead provide access to the original DataSet which can be wrapped in a new 
> proxy.
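
The approach in the description, handing out the original object so that it can be wrapped in a fresh proxy, might look roughly like the sketch below. It is an assumption-laden illustration: `Source` stands in for a `DataSet`, the wrapper is a hand-written class rather than whatever proxy mechanism Gelly uses, and `ProxyOnceSketch`, `SourceProxy`, `proxy` and the method `getProxiedObject` are hypothetical (only the name `ProxiedObject` appears in the issue title).

```java
/** Hypothetical sketch: never wrap a proxy in another proxy. */
public class ProxyOnceSketch {

    /** Stand-in for a DataSet-like type. */
    public interface Source {
        String read();
    }

    /** Gives access to the original object behind a proxy. */
    public interface ProxiedObject {
        Source getProxiedObject();
    }

    public static final class SourceProxy implements Source, ProxiedObject {
        private final Source target;

        public SourceProxy(Source target) {
            this.target = target;
        }

        @Override
        public String read() {
            return target.read();
        }

        @Override
        public Source getProxiedObject() {
            return target;
        }
    }

    /** Wraps the original object in a new proxy, unwrapping any existing one first. */
    public static Source proxy(Source source) {
        Source original = source;
        while (original instanceof ProxiedObject) {
            original = ((ProxiedObject) original).getProxiedObject();
        }
        return new SourceProxy(original);
    }
}
```

Each caller then owns its own proxy around the same original, rather than two methods sharing one proxy object.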





[jira] [Closed] (FLINK-4172) Don't proxy a ProxiedObject

2016-07-08 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-4172.
-
Resolution: Fixed

Fixed in 6c5eebb7e40439310b6949e35b67c72a3dc61c75



[GitHub] flink pull request #2213: [FLINK-4172] [gelly] Don't proxy a ProxiedObject

2016-07-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2213




[jira] [Commented] (FLINK-4116) Document metrics

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368189#comment-15368189
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r70123498
  
--- Diff: docs/apis/metrics.md ---
@@ -0,0 +1,441 @@
+---
+title: "Metrics"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 13
+top-nav-title: "Metrics"
+---
+
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be incremented or 
decremented using `inc()`/`inc(long n)` or `dec()`/`dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a `MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCounter");
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    this.counter.inc();
+    return value.length(); // illustrative return value; adapt to your job
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+this.counter = getRuntimeContext()
+  .getMetricGroup()
+  .counter("myCustomCounter", new CustomCounter());
+  }
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    getRuntimeContext()
+      .getMetricGroup()
+      .gauge("MyGauge", new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return valueToExpose;
+        }
+      });
+  }
+}
+
+{% endhighlight %}
+
+Note that reporters will turn the exposed object into a `String`, which 
means that a meaningful `toString()` implementation is required.
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    return value.intValue(); // illustrative return value; adapt to your job
+  }
+}
+{% endhighlight %}
+
+Flink does not provide a default implementation for `Histogram`, but 
offers a {% gh_link 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
 "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
+To use this wrapper add the following dependency in your `pom.xml`:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-metrics-dropwizard</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+You can then register a Codahale/DropWizard histogram like this:
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+com.codahale.metrics.Histogram histogram =
+  new com.codahale.metrics.Histogram(new 

[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-07-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r70123498
  
--- Diff: docs/apis/metrics.md ---
@@ -0,0 +1,441 @@
+---
+title: "Metrics"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 13
+top-nav-title: "Metrics"
+---
+
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be incremented or 
decremented using `inc()`/`inc(long n)` or `dec()`/`dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a `MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCounter");
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    this.counter.inc();
+    return value.length(); // illustrative return value; adapt to your job
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+this.counter = getRuntimeContext()
+  .getMetricGroup()
+  .counter("myCustomCounter", new CustomCounter());
+  }
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    getRuntimeContext()
+      .getMetricGroup()
+      .gauge("MyGauge", new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return valueToExpose;
+        }
+      });
+  }
+}
+
+{% endhighlight %}
+
+Note that reporters will turn the exposed object into a `String`, which 
means that a meaningful `toString()` implementation is required.
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    return value.intValue(); // illustrative return value; adapt to your job
+  }
+}
+{% endhighlight %}
+
+Flink does not provide a default implementation for `Histogram`, but 
offers a {% gh_link 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
 "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
+To use this wrapper add the following dependency in your `pom.xml`:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-metrics-dropwizard</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+You can then register a Codahale/DropWizard histogram like this:
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    com.codahale.metrics.Histogram histogram =
+      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
+
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
+  }
+}
+{% endhighlight %}
+
+## Scope
+
  

[jira] [Commented] (FLINK-4154) Correction of murmur hash breaks backwards compatibility

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368130#comment-15368130
 ] 

ASF GitHub Bot commented on FLINK-4154:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2223

[FLINK-4154] [core] Correction of murmur hash breaks backwards compatibility

Revert "[FLINK-3623] [runtime] Adjust MurmurHash Algorithm"

This reverts commit 641a0d436c9b7a34ff33ceb370cf29962cac4dee.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4154_correction_of_murmurhash_breaks_backwards_compatibility

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2223


commit 99696c2fc40db7b5767c9c1b20bcc1fa5edfc890
Author: Greg Hogan 
Date:   2016-07-08T14:57:42Z

[FLINK-4154] [core] Correction of murmur hash breaks backwards compatibility

Revert "[FLINK-3623] [runtime] Adjust MurmurHash Algorithm"

This reverts commit 641a0d436c9b7a34ff33ceb370cf29962cac4dee.




> Correction of murmur hash breaks backwards compatibility
> 
>
> Key: FLINK-4154
> URL: https://issues.apache.org/jira/browse/FLINK-4154
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Greg Hogan
>Priority: Blocker
> Fix For: 1.1.0
>
>
> The correction of Flink's murmur hash in commit [1] breaks Flink's 
> backwards compatibility with respect to savepoints. The reason is that the 
> changed murmur hash, which is used to partition elements in a {{KeyedStream}}, 
> changes the mapping from keys to subtasks. This changes the assigned key 
> spaces for a subtask. Consequently, an old savepoint (version 1.0) assigns 
> states with a different key space to the subtasks.
> I think that this must be fixed for the upcoming 1.1 release. I see two 
> options to solve the problem:
> - revert the changes, but then we don't know how the flawed murmur hash 
> performs
> - develop tooling to repartition state of old savepoints. This is probably 
> not trivial since a keyed stream can also contain non-partitioned state which 
> is not partitionable in all cases. And even if only partitioned state is 
> used, we would need some kind of special operator which can repartition the 
> state with respect to the key.
> I think that the latter option requires some more thought and is thus 
> unlikely to be done before the release 1.1. Therefore, as a workaround, I 
> think that we should revert the murmur hash changes.
> [1] 
> https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee
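
The compatibility problem can be illustrated with a small plain-Java sketch: if the hash that maps a key to a subtask changes, the same key may land on a different subtask, so state restored from an old savepoint sits on the wrong one. The code below is not Flink's partitioner. `mixA` is the standard MurmurHash3 32-bit finalizer, `mixB` is a hypothetical variant (it skips the final xor-shift) chosen only to stand in for "a slightly different hash", and the modulo assignment in `subtask` is an assumption made for illustration.

```java
/** Hypothetical sketch: a changed hash function reassigns keys to subtasks. */
public class KeyToSubtaskSketch {

    /** Standard MurmurHash3 32-bit finalizer. */
    public static int mixA(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Illustrative variant that omits the last xor-shift step. */
    public static int mixB(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        return h;
    }

    /** Maps a hash to a subtask index in [0, parallelism). */
    public static int subtask(int hash, int parallelism) {
        return Math.floorMod(hash, parallelism);
    }

    /** Counts keys in [0, numKeys) whose subtask differs between the two hashes. */
    public static int countMovedKeys(int numKeys, int parallelism) {
        int moved = 0;
        for (int key = 0; key < numKeys; key++) {
            if (subtask(mixA(key), parallelism) != subtask(mixB(key), parallelism)) {
                moved++;
            }
        }
        return moved;
    }
}
```

Every key counted by `countMovedKeys` corresponds to keyed state that an old savepoint would hand to a subtask that no longer receives that key.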





[GitHub] flink pull request #2223: [FLINK-4154] [core] Correction of murmur hash brea...

2016-07-08 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2223

[FLINK-4154] [core] Correction of murmur hash breaks backwards compatibility

Revert "[FLINK-3623] [runtime] Adjust MurmurHash Algorithm"

This reverts commit 641a0d436c9b7a34ff33ceb370cf29962cac4dee.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4154_correction_of_murmurhash_breaks_backwards_compatibility

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2223


commit 99696c2fc40db7b5767c9c1b20bcc1fa5edfc890
Author: Greg Hogan 
Date:   2016-07-08T14:57:42Z

[FLINK-4154] [core] Correction of murmur hash breaks backwards compatibility

Revert "[FLINK-3623] [runtime] Adjust MurmurHash Algorithm"

This reverts commit 641a0d436c9b7a34ff33ceb370cf29962cac4dee.






[jira] [Commented] (FLINK-4159) Quickstart poms exclude unused dependencies

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15368060#comment-15368060
 ] 

ASF GitHub Bot commented on FLINK-4159:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70113214
  
--- Diff: 
flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 ---
@@ -207,15 +206,9 @@ under the License.

 <exclude>com.esotericsoftware.kryo:kryo</exclude>
 <exclude>com.esotericsoftware.minlog:minlog</exclude>
 <exclude>org.objenesis:objenesis</exclude>
-<exclude>com.twitter:chill_*</exclude>
--- End diff --

you're right, must've slipped in.


> Quickstart poms exclude unused dependencies
> ---
>
> Key: FLINK-4159
> URL: https://issues.apache.org/jira/browse/FLINK-4159
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The Quickstart poms exclude several dependencies from being packaged into the 
> fat-jar, even though they aren't used by Flink according to `mvn 
> dependency:tree`.
> com.amazonaws:aws-java-sdk
> com.twitter:chill-avro_*
> com.twitter:chill-bijection_*
> com.twitter:bijection-core_*
> com.twitter:bijection-avro_*
> de.javakaffee:kryo-serializers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2217: [FLINK-4159] Remove Quickstart exclusions for unus...

2016-07-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70113214
  
--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -207,15 +206,9 @@ under the License.
 com.esotericsoftware.kryo:kryo
 com.esotericsoftware.minlog:minlog
 org.objenesis:objenesis
-com.twitter:chill_*
--- End diff --

you're right, must've slipped in.




[jira] [Commented] (FLINK-4154) Correction of murmur hash breaks backwards compatibility

2016-07-08 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367950#comment-15367950
 ] 

Greg Hogan commented on FLINK-4154:
---

I wanted to see how much skew this causes in case we should consider fixing the 
algorithm for 2.0. With the proper MurmurHash I see no skew. In the Flink 
version the skew is typically much less than 1% when parallelism is small.

|| channels || min count || max count || relative skew ||
| 16 | 268391180 | 268480808 | 3.3394540014317905E-4 |
| 37 | 116042832 | 116126424 | 7.20354704890346E-4 |
| 64 | 67091700 | 67129376 | 5.615597756503413E-4 |
| 117 | 36686636 | 36742216 | 0.0015149930890365636 |
| 256 | 16767212 | 16786620 | 0.0011574971438304711 |
| 731 | 5857272 | 5890152 | 0.005613534764989572 |
| 1024 | 4186452 | 4202972 | 0.003946062202552424 |
| 1497 | 2860692 | 2878580 | 0.006253032483049556 |
| 2048 | 2090320 | 2103128 | 0.00612729151517471 |
| 4096 | 1043084 | 1053744 | 0.0102196946746379 |
|  | 768004 | 778560 | 0.013744720079582919 |
| 7143 | 596812 | 606124 | 0.015602903426874795 |
| 16384 | 259052 | 265432 | 0.02462825996325062 |

{code}
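for (int channels : new int[]{16, 37, 64, 117, 256, 731, 1024}) {
    int[] counts = new int[channels];

    // Visit every 32-bit int key exactly once; the loop exits after MAX_VALUE.
    // Assumes murmurHash(i) is non-negative, otherwise the index would be negative.
    int i = Integer.MIN_VALUE;
    do {
        counts[murmurHash(i) % channels]++;
    } while (i++ < Integer.MAX_VALUE);

    // Relative skew = (max count - min count) / min count.
    Arrays.sort(counts);
    System.out.println(channels + ": " + counts[0] + ", " + counts[channels - 1]
        + ", " + ((1.0 * counts[channels - 1] - counts[0]) / counts[0]));
}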
{code}

> Correction of murmur hash breaks backwards compatibility
> 
>
> Key: FLINK-4154
> URL: https://issues.apache.org/jira/browse/FLINK-4154
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Greg Hogan
>Priority: Blocker
> Fix For: 1.1.0
>
>
> The correction of Flink's murmur hash with commit [1] breaks Flink's 
> backwards compatibility with respect to savepoints. The reason is that the 
> changed murmur hash which is used to partition elements in a {{KeyedStream}} 
> changes the mapping from keys to sub tasks. This changes the assigned key 
> spaces for a sub task. Consequently, an old savepoint (version 1.0) assigns 
> states with a different key space to the sub tasks.
> I think that this must be fixed for the upcoming 1.1 release. I see two 
> options to solve the problem:
> -  revert the changes, but then we don't know how the flawed murmur hash 
> performs
> - develop tooling to repartition state of old savepoints. This is probably 
> not trivial since a keyed stream can also contain non-partitioned state which 
> is not partitionable in all cases. And even if only partitioned state is 
> used, we would need some kind of special operator which can repartition the 
> state wrt the key.
> I think that the latter option requires some more thoughts and is thus 
> unlikely to be done before the release 1.1. Therefore, as a workaround, I 
> think that we should revert the murmur hash changes.
> [1] 
> https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee
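
The remapping described in the issue can be sketched in plain Java. This is a hedged illustration only: `oldHash` and `newHash` are stand-ins chosen for the example (the raw key and the MurmurHash3 32-bit finalizer), not Flink's old or corrected hash functions, and `subtaskFor` and `countMovedKeys` are hypothetical helpers. The sketch counts how many keys land on a different sub task once the hash function changes, which is the reason state from an old savepoint would be handed to the wrong sub task.

```java
import java.util.function.IntUnaryOperator;

/** Illustrative only: swapping the hash function remaps keys to sub tasks. */
class KeyRemappingSketch {

    /** Stand-in for the "old" hash: the key itself (assumption, not Flink's code). */
    static int oldHash(int key) {
        return key;
    }

    /** Stand-in for the "new" hash: the MurmurHash3 32-bit finalizer (fmix32). */
    static int newHash(int key) {
        int h = key;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Hypothetical helper: maps a key to a sub task index in [0, parallelism). */
    static int subtaskFor(IntUnaryOperator hash, int key, int parallelism) {
        return Math.floorMod(hash.applyAsInt(key), parallelism);
    }

    /** Counts keys in [0, keyCount) whose sub task differs between the two hashes. */
    static int countMovedKeys(int keyCount, int parallelism) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            int before = subtaskFor(KeyRemappingSketch::oldHash, key, parallelism);
            int after = subtaskFor(KeyRemappingSketch::newHash, key, parallelism);
            if (before != after) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println("moved keys: " + countMovedKeys(1000, 4) + " of 1000");
    }
}
```

Any key counted as moved would find its partitioned state on a sub task that no longer owns it, which is why the issue weighs reverting the hash against building a repartitioning tool.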





[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70103527
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

There is one minor thing here: you have to assign the result, i.e. 
kinesis = kinesis.assignTimestampsAndWatermarks(...), for this to work properly.
But I'll fix it while merging.




[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367948#comment-15367948
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70103527
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if 
enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time 
notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. 
This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp 
is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order 
correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as 
described [here]({{ site.baseurl 
}}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+"kinesis_stream_name", new SimpleStringSchema(), 
kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

There is one minor thing here: you have to assign the result, i.e. 
kinesis = kinesis.assignTimestampsAndWatermarks(...), for this to work properly.
But I'll fix it while merging.


> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.





[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367942#comment-15367942
 ] 

ASF GitHub Bot commented on FLINK-4019:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2214
  
+1 to merge once travis is green.


> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.





[GitHub] flink issue #2214: [FLINK-4019][kinesis-connector] Use Kinesis records' appr...

2016-07-08 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2214
  
+1 to merge once travis is green.




[jira] [Commented] (FLINK-4185) Reflecting rename from Tachyon to Alluxio

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367941#comment-15367941
 ] 

ASF GitHub Bot commented on FLINK-4185:
---

GitHub user jsimsa opened a pull request:

https://github.com/apache/flink/pull/

[FLINK-4185] Reflecting rename from Tachyon to Alluxio.

Addresses: https://issues.apache.org/jira/browse/FLINK-4185

The updated documentation has been tested by following the steps for running 
Flink on Alluxio detailed 
[here](http://www.alluxio.org/docs/master/en/Running-Flink-on-Alluxio.html)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jsimsa/flink rename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #


commit cc9bb8967d0ec7ae57dc5f98c8759c4c788e58ab
Author: Jiri Simsa 
Date:   2016-07-08T16:30:26Z

Reflecting rename from Tachyon to Alluxio.




> Reflecting rename from Tachyon to Alluxio
> -
>
> Key: FLINK-4185
> URL: https://issues.apache.org/jira/browse/FLINK-4185
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.0.3
>Reporter: Jiri Simsa
>Priority: Trivial
>
> The Tachyon project was renamed to Alluxio earlier this year. The goal 
> of this issue is to reflect this in the Flink documentation.





[GitHub] flink pull request #2222: [FLINK-4185] Reflecting rename from Tachyon to All...

2016-07-08 Thread jsimsa
GitHub user jsimsa opened a pull request:

https://github.com/apache/flink/pull/

[FLINK-4185] Reflecting rename from Tachyon to Alluxio.

Addresses: https://issues.apache.org/jira/browse/FLINK-4185

The updated documentation has been tested by following the steps for running 
Flink on Alluxio detailed 
[here](http://www.alluxio.org/docs/master/en/Running-Flink-on-Alluxio.html)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jsimsa/flink rename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #


commit cc9bb8967d0ec7ae57dc5f98c8759c4c788e58ab
Author: Jiri Simsa 
Date:   2016-07-08T16:30:26Z

Reflecting rename from Tachyon to Alluxio.






[jira] [Updated] (FLINK-4185) Reflecting rename from Tachyon to Alluxio

2016-07-08 Thread Jiri Simsa (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiri Simsa updated FLINK-4185:
--
Description: The Tachyon project was renamed to Alluxio earlier this 
year. The goal of this issue is to reflect this in the Flink documentation.

> Reflecting rename from Tachyon to Alluxio
> -
>
> Key: FLINK-4185
> URL: https://issues.apache.org/jira/browse/FLINK-4185
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.0.3
>Reporter: Jiri Simsa
>Priority: Trivial
>
> The Tachyon project was renamed to Alluxio earlier this year. The goal 
> of this issue is to reflect this in the Flink documentation.





[jira] [Commented] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367938#comment-15367938
 ] 

ASF GitHub Bot commented on FLINK-4018:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2071
  
No problem :)
+1 to merge once travis is green


> Configurable idle time between getRecords requests to Kinesis shards
> 
>
> Key: FLINK-4018
> URL: https://issues.apache.org/jira/browse/FLINK-4018
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Currently, the Kinesis consumer is calling getRecords() right after finishing 
> previous calls. This results in easily reaching Amazon's limitation of 5 GET 
> requests per shard per second. Although the code already has a backoff & retry 
> mechanism, this will affect other applications consuming from the same 
> Kinesis stream.
> Along with this new configuration and the already existing 
> `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more 
> control on the desired throughput behaviour for the Kinesis consumer.
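
The idea of an idle time between requests can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the connector's implementation: `minIntervalMillis` and `poll` are hypothetical helpers, the `Supplier` stands in for a `getRecords()` call, and no Kinesis or Flink API is used. With the limit of 5 GET requests per shard per second quoted above, the smallest safe gap for a single consumer is 200 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Illustrative only: spaces out fetch calls so a per-second request limit is respected. */
class ThrottledFetchSketch {

    /** Smallest gap in milliseconds between requests that stays within the limit. */
    static long minIntervalMillis(int maxRequestsPerSecond) {
        if (maxRequestsPerSecond <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        // Round up so the resulting request rate never exceeds the limit.
        return (1000 + maxRequestsPerSecond - 1) / maxRequestsPerSecond;
    }

    /** Calls fetch up to `rounds` times, sleeping idleMillis between consecutive calls. */
    static <T> List<T> poll(Supplier<T> fetch, int rounds, long idleMillis) {
        List<T> results = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            results.add(fetch.get());
            if (i < rounds - 1) {
                try {
                    Thread.sleep(idleMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and stop polling early.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        long idle = minIntervalMillis(5);
        List<String> batches = poll(() -> "batch", 3, idle);
        System.out.println(batches.size() + " fetches with " + idle + " ms idle time");
    }
}
```

Because the sleep is added on top of the time each fetch takes, the sketch stays below the limit rather than exactly at it; a larger idle time leaves headroom for other applications reading the same shard.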





[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

2016-07-08 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2071
  
No problem :)
+1 to merge once travis is green




[jira] [Created] (FLINK-4185) Reflecting rename from Tachyon to Alluxio

2016-07-08 Thread Jiri Simsa (JIRA)
Jiri Simsa created FLINK-4185:
-

 Summary: Reflecting rename from Tachyon to Alluxio
 Key: FLINK-4185
 URL: https://issues.apache.org/jira/browse/FLINK-4185
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.0.3
Reporter: Jiri Simsa
Priority: Trivial








[GitHub] flink issue #2220: [FLINK-4184] [metrics] Replace invalid characters in Sche...

2016-07-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2220
  
This PR will heavily conflict with #2219; in their current state we can't 
merge one without blocking the other.




[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367882#comment-15367882
 ] 

ASF GitHub Bot commented on FLINK-4184:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2220
  
This PR will heavily conflict with #2219; in their current state we can't 
merge one without blocking the other.


> Ganglia and GraphiteReporter report metric names with invalid characters
> 
>
> Key: FLINK-4184
> URL: https://issues.apache.org/jira/browse/FLINK-4184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with 
> names which contain invalid characters. For example, quotes are not filtered 
> out which can be problematic for Ganglia. Moreover, dots are not replaced 
> which causes Graphite to think that an IP address is actually a scoped metric 
> name.
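
The kind of filtering the issue asks for might look like the following plain Java sketch. Everything here is an assumption made for illustration: the class and method names are hypothetical, and the replacement choices (a dash for dots, dropping quotes) are not taken from the Flink reporters.

```java
/** Illustrative only: per-reporter character filters for metric name components. */
class MetricNameSanitizerSketch {

    /** Graphite treats dots as scope separators, so replace them inside a component. */
    static String forGraphite(String component) {
        return component.replace('.', '-');
    }

    /** Drop single and double quotes, which the issue names as problematic for Ganglia. */
    static String forGanglia(String component) {
        return component.replace("\"", "").replace("'", "");
    }

    public static void main(String[] args) {
        // An IP address no longer looks like four nested scopes.
        System.out.println(forGraphite("127.0.0.1"));
        System.out.println(forGanglia("\"my-host\""));
    }
}
```

Keeping one filter per reporter reflects that each backend rejects a different set of characters.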





[jira] [Commented] (FLINK-4183) Move checking for StreamTableEnvironment into validation layer

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367876#comment-15367876
 ] 

ASF GitHub Bot commented on FLINK-4183:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2221

[FLINK-4183] [table] Move checking for StreamTableEnvironment into 
validation layer

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

See issue description.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4183

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2221


commit 77b8f7d27ebcb6c44ee26d22cd24322e6634aa89
Author: twalthr 
Date:   2016-07-08T14:50:17Z

[FLINK-4183] [table] Move checking for StreamTableEnvironment into 
validation layer




> Move checking for StreamTableEnvironment into validation layer
> --
>
> Key: FLINK-4183
> URL: https://issues.apache.org/jira/browse/FLINK-4183
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Minor
>
> Some operators check the environment in `table.scala` instead of doing this 
> during the validation phase.





[GitHub] flink pull request #2221: [FLINK-4183] [table] Move checking for StreamTable...

2016-07-08 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2221

[FLINK-4183] [table] Move checking for StreamTableEnvironment into 
validation layer

See issue description.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4183

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2221


commit 77b8f7d27ebcb6c44ee26d22cd24322e6634aa89
Author: twalthr 
Date:   2016-07-08T14:50:17Z

[FLINK-4183] [table] Move checking for StreamTableEnvironment into 
validation layer






[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367871#comment-15367871
 ] 

ASF GitHub Bot commented on FLINK-4184:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2220#discussion_r70097230
  
--- Diff: flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java ---
@@ -84,4 +84,24 @@ public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetr
     protected String replaceInvalidChars(String metricName) {
         return metricName;
     }
+
+    /**
+     * Method which constructs the fully qualified metric name from the metric group and the metric
+     * name.
+     *
+     * @param metricName Name of the metric
+     * @param group Associated metric group
+     * @return Fully qualified metric name
+     */
+    private String constructMetricName(String metricName, AbstractMetricGroup group) {
+        StringBuilder builder = new StringBuilder();
+
+        for (String componentName : group.getScopeComponents()) {
+            builder.append(replaceInvalidChars(componentName)).append(".");
--- End diff --

this is a bit inefficient. The output of this loop is identical for all 
metrics on that group, yet is computed for every single metric. Instead you 
could modify the getScopeString() method to accept a charFilter argument that 
is passed into ScopeFormat.concat().
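
The suggestion can be sketched in plain Java as follows. This is a hedged illustration, not Flink's API: `ScopePrefixSketch` is a hypothetical stand-in for a metric group, and the `UnaryOperator<String>` plays the role of the proposed charFilter argument. The filtered scope string is built once per group, so each metric only pays for filtering its own name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/** Illustrative only: caches the filtered scope prefix of a group instead of rebuilding it per metric. */
class ScopePrefixSketch {

    private final UnaryOperator<String> charFilter;
    private final String prefix; // filtered, dot-terminated scope, computed once

    ScopePrefixSketch(List<String> scopeComponents, UnaryOperator<String> charFilter) {
        this.charFilter = charFilter;
        StringBuilder builder = new StringBuilder();
        for (String component : scopeComponents) {
            builder.append(charFilter.apply(component)).append('.');
        }
        this.prefix = builder.toString();
    }

    /** Only the metric name itself is filtered here; the prefix is reused. */
    String metricName(String name) {
        return prefix + charFilter.apply(name);
    }

    public static void main(String[] args) {
        ScopePrefixSketch group = new ScopePrefixSketch(
            Arrays.asList("10.0.0.1", "taskmanager"), s -> s.replace('.', '-'));
        System.out.println(group.metricName("numRecordsIn"));
        System.out.println(group.metricName("numRecordsOut"));
    }
}
```

With two scope components and two metrics, the filter runs four times here instead of six; the saving grows with the number of metrics registered on the same group.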


> Ganglia and GraphiteReporter report metric names with invalid characters
> 
>
> Key: FLINK-4184
> URL: https://issues.apache.org/jira/browse/FLINK-4184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with 
> names which contain invalid characters. For example, quotes are not filtered 
> out which can be problematic for Ganglia. Moreover, dots are not replaced 
> which causes Graphite to think that an IP address is actually a scoped metric 
> name.





[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-07-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r70097270
  
--- Diff: docs/apis/metrics.md ---
@@ -0,0 +1,441 @@
+---
+title: "Metrics"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 13
+top-nav-title: "Metrics"
+---
+
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a `MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCounter");
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    this.counter.inc();
+    return value.length(); // illustrative return value
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCustomCounter", new CustomCounter());
+  }
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction for the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    getRuntimeContext()
+      .getMetricGroup()
+      .gauge("MyGauge", new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return valueToExpose;
+        }
+      });
+  }
+}
+
+{% endhighlight %}
+
+Note that reporters will turn the exposed object into a `String`, which 
means that a meaningful `toString()` implementation is required.
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    return value.intValue(); // illustrative return value
+  }
+}
+{% endhighlight %}
+
+Flink does not provide a default implementation for `Histogram`, but 
offers a {% gh_link 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
 "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
+To use this wrapper add the following dependency in your `pom.xml`:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-metrics-dropwizard</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+You can then register a Codahale/DropWizard histogram like this:
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    com.codahale.metrics.Histogram dropwizardHistogram =
+      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
+
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
+  }
+}
+{% endhighlight %}
+
+## Scope
  

[jira] [Commented] (FLINK-4116) Document metrics

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367873#comment-15367873
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r70097270
  
--- Diff: docs/apis/metrics.md ---
@@ -0,0 +1,441 @@
+---
+title: "Metrics"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 13
+top-nav-title: "Metrics"
+---
+
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a `MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCounter");
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    this.counter.inc();
+    return value.length(); // illustrative result: the length of the input
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCustomCounter", new CustomCounter());
+  }
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction for the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    getRuntimeContext()
+      .getMetricGroup()
+      .gauge("MyGauge", new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return valueToExpose;
+        }
+      });
+  }
+}
+
+{% endhighlight %}
+
+Note that reporters will turn the exposed object into a `String`, which 
means that a meaningful `toString()` implementation is required.
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    return value.intValue(); // illustrative result: the input narrowed to int
+  }
+}
+{% endhighlight %}
+
+Flink does not provide a default implementation for `Histogram`, but 
offers a {% gh_link 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
 "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
+To use this wrapper add the following dependency in your `pom.xml`:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-metrics-dropwizard</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+You can then register a Codahale/DropWizard histogram like this:
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+com.codahale.metrics.Histogram histogram =
+  new com.codahale.metrics.Histogram(new 

[GitHub] flink pull request #2220: [FLINK-4184] [metrics] Replace invalid characters ...

2016-07-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2220#discussion_r70097230
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
 ---
@@ -84,4 +84,24 @@ public void notifyOfRemovedMetric(Metric metric, String 
metricName, AbstractMetr
protected String replaceInvalidChars(String metricName) {
return metricName;
}
+
+   /**
+* Method which constructs the fully qualified metric name from the 
metric group and the metric
+* name.
+*
+* @param metricName Name of the metric
+* @param group Associated metric group
+* @return Fully qualified metric name
+*/
+   private String constructMetricName(String metricName, 
AbstractMetricGroup group) {
+   StringBuilder builder = new StringBuilder();
+
+   for (String componentName : group.getScopeComponents()) {
+   
builder.append(replaceInvalidChars(componentName)).append(".");
--- End diff --

this is a bit inefficient. The output of this loop is identical for all 
metrics on that group, yet is computed for every single metric. Instead you 
could modify the getScopeString() method to accept a charFilter argument that 
is passed into ScopeFormat.concat().
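
One way to picture the saving is a per-group cache of the filtered scope prefix. The following is a minimal sketch, not the Flink implementation: `ScopePrefixCache`, `qualifiedName`, the `groupId` key and the dot-to-underscore filter are all hypothetical stand-ins for a `getScopeString()` that accepts a character filter.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: builds the filtered scope prefix once per group and reuses it. */
final class ScopePrefixCache {
    private final Map<String, String> prefixByGroup = new HashMap<>();
    private int prefixBuilds = 0;

    /** Returns "<filtered scope>.<metricName>"; the prefix is computed only on first use. */
    String qualifiedName(String groupId, String[] scopeComponents, String metricName) {
        String prefix = prefixByGroup.get(groupId);
        if (prefix == null) {
            StringBuilder builder = new StringBuilder();
            for (String component : scopeComponents) {
                // Assumed filter: replace dots so that an IP address stays one component.
                builder.append(component.replace('.', '_')).append('.');
            }
            prefix = builder.toString();
            prefixByGroup.put(groupId, prefix);
            prefixBuilds++;
        }
        return prefix + metricName;
    }

    /** Number of times a prefix actually had to be built. */
    int prefixBuilds() {
        return prefixBuilds;
    }
}
```

With this shape, registering many metrics on the same group runs the filter loop once instead of once per metric.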


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

2016-07-08 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/1954
  
The changes look good to me. Thanks for your work @fijolekProjects :-) I 
had one final remark concerning the `FixedSizeFifoQueue`. Maybe we could try to 
replace it with the `ArrayDeque`. 

I think you cannot make `ArrayDeque` fixed in size, but this is not a 
problem if you check the size before appending elements. If the maximum size is 
reached, then we have to remove elements from the queue.
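
The size check described above could look roughly like the sketch below. `BoundedFifo` is a hypothetical name used only for illustration; it is not a class from the pull request.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch: a FIFO buffer on top of ArrayDeque that keeps at most maxSize elements. */
final class BoundedFifo<E> {
    private final int maxSize;
    private final ArrayDeque<E> elements;

    BoundedFifo(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be greater than 0");
        }
        this.maxSize = maxSize;
        this.elements = new ArrayDeque<>(maxSize);
    }

    /** Adds an element; if the buffer is already full, the oldest element is dropped first. */
    void add(E element) {
        if (elements.size() >= maxSize) {
            elements.poll();
        }
        elements.add(element);
    }

    /** Returns the oldest element without removing it, or null if the buffer is empty. */
    E peek() {
        return elements.peek();
    }

    int size() {
        return elements.size();
    }
}
```

Adding 1, 2 and 3 to a buffer of size 2 leaves [2, 3], which matches the behaviour documented for `FixedSizeFifoQueue`, without the locking that `ArrayBlockingQueue` brings along.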




[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367848#comment-15367848
 ] 

ASF GitHub Bot commented on FLINK-3190:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/1954
  
The changes look good to me. Thanks for your work @fijolekProjects :-) I 
had one final remark concerning the `FixedSizeFifoQueue`. Maybe we could try to 
replace it with the `ArrayDeque`. 

I think you cannot make `ArrayDeque` fixed in size, but this is not a 
problem if you check the size before appending elements. If the maximum size is 
reached, then we have to remove elements from the queue.


> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, job should be terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367842#comment-15367842
 ] 

ASF GitHub Bot commented on FLINK-4184:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2220#discussion_r70095846
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
 ---
@@ -84,4 +84,24 @@ public void notifyOfRemovedMetric(Metric metric, String 
metricName, AbstractMetr
protected String replaceInvalidChars(String metricName) {
return metricName;
}
+
+   /**
+* Method which constructs the fully qualified metric name from the 
metric group and the metric
+* name.
+*
+* @param metricName Name of the metric
+* @param group Associated metric group
+* @return Fully qualified metric name
+*/
+   private String constructMetricName(String metricName, 
AbstractMetricGroup group) {
+   StringBuilder builder = new StringBuilder();
+
+   for (String componentName : group.getScopeComponents()) {
+   
builder.append(replaceInvalidChars(componentName)).append(".");
+   }
+
+   builder.append(replaceInvalidChars(metricName));
--- End diff --

The call to replaceInvalidChars is not required here; metric names can only 
contain alphanumeric characters, and I expect every reporter to support 
those.
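
As a rough illustration of that suggestion, the sketch below filters only the scope components and appends the metric name unchanged. The class name and the concrete replacement rules (dots become underscores, double quotes are dropped) are assumptions made for this example, not the behaviour of any Flink reporter.

```java
/** Hypothetical sketch of a character filter applied to scope components only. */
final class MetricNameSanitizer {

    /** Drops double quotes and replaces dots with underscores (assumed rules). */
    static String replaceInvalidChars(String component) {
        StringBuilder sb = new StringBuilder(component.length());
        for (int i = 0; i < component.length(); i++) {
            char c = component.charAt(i);
            if (c == '"') {
                continue; // quotes are removed entirely
            }
            sb.append(c == '.' ? '_' : c);
        }
        return sb.toString();
    }

    /** Joins the filtered scope components with dots and appends the unfiltered metric name. */
    static String constructMetricName(String metricName, String... scopeComponents) {
        StringBuilder builder = new StringBuilder();
        for (String component : scopeComponents) {
            builder.append(replaceInvalidChars(component)).append('.');
        }
        return builder.append(metricName).toString();
    }
}
```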


> Ganglia and GraphiteReporter report metric names with invalid characters
> 
>
> Key: FLINK-4184
> URL: https://issues.apache.org/jira/browse/FLINK-4184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with 
> names which contain invalid characters. For example, quotes are not filtered 
> out which can be problematic for Ganglia. Moreover, dots are not replaced 
> which causes Graphite to think that an IP address is actually a scoped metric 
> name.





[GitHub] flink pull request #2220: [FLINK-4184] [metrics] Replace invalid characters ...

2016-07-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2220#discussion_r70095846
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
 ---
@@ -84,4 +84,24 @@ public void notifyOfRemovedMetric(Metric metric, String 
metricName, AbstractMetr
protected String replaceInvalidChars(String metricName) {
return metricName;
}
+
+   /**
+* Method which constructs the fully qualified metric name from the 
metric group and the metric
+* name.
+*
+* @param metricName Name of the metric
+* @param group Associated metric group
+* @return Fully qualified metric name
+*/
+   private String constructMetricName(String metricName, 
AbstractMetricGroup group) {
+   StringBuilder builder = new StringBuilder();
+
+   for (String componentName : group.getScopeComponents()) {
+   
builder.append(replaceInvalidChars(componentName)).append(".");
+   }
+
+   builder.append(replaceInvalidChars(metricName));
--- End diff --

The call to replaceInvalidChars is not required here; metric names can only 
contain alphanumeric characters, and I expect every reporter to support 
those.




[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

2016-07-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1954#discussion_r70094658
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/FixedSizeFifoQueue.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Adding element to full queue removes its head and then adds new 
element. It's why size of this queue is fixed.
+ * Example:
+ * 
+ * {@code
+ * Queue q = new FixedSizeFifoQueue(2);
+ * q.add(1); // q = [1]
+ * q.add(2); // q = [1, 2]
+ * q.add(3); // q = [2, 3]
+ * q.peek(); // 2
+ * }
+ * 
+ */
+public class FixedSizeFifoQueue extends ArrayBlockingQueue {
--- End diff --

I think it's not so efficient to use an `ArrayBlockingQueue` as the basis 
for the implementation because we don't need protection against concurrency. I 
think it would be great if we could use `ArrayDeque` instead.




[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367828#comment-15367828
 ] 

ASF GitHub Bot commented on FLINK-3190:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1954#discussion_r70094658
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/FixedSizeFifoQueue.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Adding element to full queue removes its head and then adds new 
element. It's why size of this queue is fixed.
+ * Example:
+ * 
+ * {@code
+ * Queue q = new FixedSizeFifoQueue(2);
+ * q.add(1); // q = [1]
+ * q.add(2); // q = [1, 2]
+ * q.add(3); // q = [2, 3]
+ * q.peek(); // 2
+ * }
+ * 
+ */
+public class FixedSizeFifoQueue extends ArrayBlockingQueue {
--- End diff --

I think it's not so efficient to use an `ArrayBlockingQueue` as the basis 
for the implementation because we don't need protection against concurrency. I 
think it would be great if we could use `ArrayDeque` instead.


> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, job should be terminated.





[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367819#comment-15367819
 ] 

ASF GitHub Bot commented on FLINK-3190:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1954#discussion_r70094058
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
 ---
@@ -36,19 +36,19 @@
 public class FailureRateRestartStrategy implements RestartStrategy {
private final Duration failuresInterval;
private final Duration delayInterval;
-   private EvictingQueue restartTimestampsQueue;
+   private FixedSizeFifoQueue restartTimestampsQueue;
private boolean disabled = false;
 
public FailureRateRestartStrategy(int maxFailuresPerInterval, Duration 
failuresInterval, Duration delayInterval) {
-   Preconditions.checkArgument(maxFailuresPerInterval > 0, 
"Maximum number of restart attempts per time unit must be greater than 0.");
Preconditions.checkNotNull(failuresInterval, "Failures interval 
cannot be null.");
-   Preconditions.checkNotNull(failuresInterval.length() > 0, 
"Failures interval must be greater than 0 ms.");
Preconditions.checkNotNull(delayInterval, "Delay interval 
cannot be null.");
-   Preconditions.checkNotNull(delayInterval.length() >= 0, "Delay 
interval must be at least 0 ms.");
+   Preconditions.checkArgument(maxFailuresPerInterval > 0, 
"Maximum number of restart attempts per time unit must be greater than 0.");
+   Preconditions.checkArgument(failuresInterval.length() > 0, 
"Failures interval must be greater than 0 ms.");
+   Preconditions.checkArgument(delayInterval.length() >= 0, "Delay 
interval must be at least 0 ms.");
 
this.failuresInterval = failuresInterval;
this.delayInterval = delayInterval;
-   this.restartTimestampsQueue = 
EvictingQueue.create(maxFailuresPerInterval);
+   this.restartTimestampsQueue = new 
FixedSizeFifoQueue<>(maxFailuresPerInterval);
--- End diff --

Can't we simply use `new ArrayDeque(maxFailuresPerInterval)`? Of course, we 
would then allocate 2^(ceil(log(maxFailuresPerInterval)/log(2))) elements, but 
this should be ok. We could then check in the `restart` method via the `size` 
method whether the queue is full or not.
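
A minimal sketch of such a size-based check follows, assuming timestamps in milliseconds. `FailureRateTracker` and `recordFailureAndCheck` are hypothetical names; this is not the code of `FailureRateRestartStrategy`.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch: keeps the timestamps of the most recent failures in an ArrayDeque. */
final class FailureRateTracker {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateTracker(int maxFailuresPerInterval, long intervalMillis) {
        if (maxFailuresPerInterval <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("limits must be greater than 0");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    /**
     * Records a failure at nowMillis and reports whether a restart is still allowed.
     * Once the queue is full, its head is the oldest of the last maxFailuresPerInterval
     * failures; if that one is still inside the interval, the rate has been exceeded.
     */
    boolean recordFailureAndCheck(long nowMillis) {
        boolean allowed = true;
        if (restartTimestamps.size() == maxFailuresPerInterval) {
            long oldest = restartTimestamps.poll();
            if (nowMillis - oldest <= intervalMillis) {
                allowed = false;
            }
        }
        restartTimestamps.add(nowMillis);
        return allowed;
    }
}
```

With a limit of 10 failures per hour, a job failing every 10 minutes keeps restarting, while an 11th failure inside one hour is rejected, as in the example from the issue description.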


> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, job should be terminated.





[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

2016-07-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1954#discussion_r70094058
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
 ---
@@ -36,19 +36,19 @@
 public class FailureRateRestartStrategy implements RestartStrategy {
private final Duration failuresInterval;
private final Duration delayInterval;
-   private EvictingQueue restartTimestampsQueue;
+   private FixedSizeFifoQueue restartTimestampsQueue;
private boolean disabled = false;
 
public FailureRateRestartStrategy(int maxFailuresPerInterval, Duration 
failuresInterval, Duration delayInterval) {
-   Preconditions.checkArgument(maxFailuresPerInterval > 0, 
"Maximum number of restart attempts per time unit must be greater than 0.");
Preconditions.checkNotNull(failuresInterval, "Failures interval 
cannot be null.");
-   Preconditions.checkNotNull(failuresInterval.length() > 0, 
"Failures interval must be greater than 0 ms.");
Preconditions.checkNotNull(delayInterval, "Delay interval 
cannot be null.");
-   Preconditions.checkNotNull(delayInterval.length() >= 0, "Delay 
interval must be at least 0 ms.");
+   Preconditions.checkArgument(maxFailuresPerInterval > 0, 
"Maximum number of restart attempts per time unit must be greater than 0.");
+   Preconditions.checkArgument(failuresInterval.length() > 0, 
"Failures interval must be greater than 0 ms.");
+   Preconditions.checkArgument(delayInterval.length() >= 0, "Delay 
interval must be at least 0 ms.");
 
this.failuresInterval = failuresInterval;
this.delayInterval = delayInterval;
-   this.restartTimestampsQueue = 
EvictingQueue.create(maxFailuresPerInterval);
+   this.restartTimestampsQueue = new 
FixedSizeFifoQueue<>(maxFailuresPerInterval);
--- End diff --

Can't we simply use `new ArrayDeque(maxFailuresPerInterval)`? Of course, we 
would then allocate 2^(ceil(log(maxFailuresPerInterval)/log(2))) elements, but 
this should be ok. We could then check in the `restart` method via the `size` 
method whether the queue is full or not.




[jira] [Commented] (FLINK-4180) Create a batch SQL example

2016-07-08 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367808#comment-15367808
 ] 

Jark Wu commented on FLINK-4180:


I'll work on this.

> Create a batch SQL example
> --
>
> Key: FLINK-4180
> URL: https://issues.apache.org/jira/browse/FLINK-4180
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> Currently there is no runnable code example in `flink-table` showing a 
> working batch SQL query with the Table API.
> A Scala and Java example should be added.





[jira] [Commented] (FLINK-4181) Add a basic streaming Table API example

2016-07-08 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367810#comment-15367810
 ] 

Jark Wu commented on FLINK-4181:


I'll work on this.

> Add a basic streaming Table API example
> ---
>
> Key: FLINK-4181
> URL: https://issues.apache.org/jira/browse/FLINK-4181
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> Although the Table API does not offer much streaming features yet, there 
> should be a runnable example showing how to convert, union, filter and 
> project streams with the Table API.





[jira] [Assigned] (FLINK-4181) Add a basic streaming Table API example

2016-07-08 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-4181:
--

Assignee: Jark Wu

> Add a basic streaming Table API example
> ---
>
> Key: FLINK-4181
> URL: https://issues.apache.org/jira/browse/FLINK-4181
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>  Labels: starter
>
> Although the Table API does not offer much streaming features yet, there 
> should be a runnable example showing how to convert, union, filter and 
> project streams with the Table API.





[jira] [Assigned] (FLINK-4180) Create a batch SQL example

2016-07-08 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-4180:
--

Assignee: Jark Wu

> Create a batch SQL example
> --
>
> Key: FLINK-4180
> URL: https://issues.apache.org/jira/browse/FLINK-4180
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>  Labels: starter
>
> Currently there is no runnable code example in `flink-table` showing a 
> working batch SQL query with the Table API.
> A Scala and Java example should be added.





[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

2016-07-08 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2109
  
1) If it is to be included, I would suggest simply replacing the 
DefaultFilter with it, and probably giving it some initial patterns, e.g. for 
hidden files (names starting with "."). Having two implementations that both 
serve as defaults in different parts of the code seems a bit confusing.

2) shouldIgnore() assumes we already hold the lock and is applied 
entirely within it, so removing it will do no harm.

3) The method to change is the `createFileInput(FileInputFormat 
inputFormat, TypeInformation typeInfo, String sourceName,  
FileProcessingMode watchType, FilePathFilter pathFilter, long interval)` which 
creates the `ContinuousFileMonitoringFunction` .
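
For point 1, a pattern-based filter with a built-in default for hidden files might look like the sketch below. `PatternPathFilter`, `withDefaults` and the regular expression are assumptions for illustration; they are not the `FilePathFilter` API from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch: ignores file names that match any of the exclude patterns. */
final class PatternPathFilter {
    private final List<Pattern> excludePatterns = new ArrayList<>();

    PatternPathFilter(String... excludeRegexes) {
        for (String regex : excludeRegexes) {
            excludePatterns.add(Pattern.compile(regex));
        }
    }

    /** Assumed default: exclude hidden files, i.e. names that start with a dot. */
    static PatternPathFilter withDefaults() {
        return new PatternPathFilter("^\\..*");
    }

    /** Returns true if the whole file name matches at least one exclude pattern. */
    boolean shouldIgnore(String fileName) {
        for (Pattern pattern : excludePatterns) {
            if (pattern.matcher(fileName).matches()) {
                return true;
            }
        }
        return false;
    }
}
```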




[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367779#comment-15367779
 ] 

ASF GitHub Bot commented on FLINK-4184:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2220

[FLINK-4184] [metrics] Replace invalid characters in 
ScheduledDropwizardReporter

The GraphiteReporter and GangliaReporter report metric names which can 
contain invalid
characters. These characters include quotes and dots. In order to properly 
report metrics
to these systems, the afore-mentioned characters have to be replaced in 
metric names.

The PR also removes quotes from the garbage collector metric name.

The PR sets the default value for TTL in the GangliaReporter to 1, because 
-1 causes the
reporter to fail.

R @zentol.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixDropwizardReporters

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2220


commit d63911499aef10f54d7c4c0774f4f22a520bcc66
Author: Till Rohrmann 
Date:   2016-07-08T14:41:39Z

[FLINK-4184] [metrics] Replace invalid characters in 
ScheduledDropwizardReporter

The GraphiteReporter and GangliaReporter report metric names which can 
contain invalid
characters. These characters include quotes and dots. In order to properly 
report metrics
to these systems, the afore-mentioned characters have to be replaced in 
metric names.

The PR also removes quotes from the garbage collector metric name.

The PR sets the default value for TTL in the GangliaReporter to 1, because 
-1 causes the
reporter to fail.




> Ganglia and GraphiteReporter report metric names with invalid characters
> 
>
> Key: FLINK-4184
> URL: https://issues.apache.org/jira/browse/FLINK-4184
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with 
> names which contain invalid characters. For example, quotes are not filtered 
> out which can be problematic for Ganglia. Moreover, dots are not replaced 
> which causes Graphite to think that an IP address is actually a scoped metric 
> name.





[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367780#comment-15367780
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2109
  
1) If we are going to include it, then I would suggest simply replacing the
DefaultFilter with it, and probably giving it some initial patterns, e.g. for
hidden files (names starting with "."). Having two implementations that both
serve as defaults in different parts of the code seems a bit confusing.

2) shouldIgnore() assumes that we already hold the lock and runs entirely
within it, so it is safe to remove the method.

3) The method to change is `createFileInput(FileInputFormat inputFormat,
TypeInformation typeInfo, String sourceName, FileProcessingMode watchType,
FilePathFilter pathFilter, long interval)`, which creates the
`ContinuousFileMonitoringFunction`.


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.
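
As a rough illustration of include/exclude filtering by file name, here is a
minimal sketch. It uses glob patterns from the JDK (java.nio.file.PathMatcher)
rather than regular expressions, and it is not the GlobFilePathFilter from the
pull request: the class name GlobNameFilterSketch and its accept method are
hypothetical and exist only for this example.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class GlobNameFilterSketch {

    private final List<PathMatcher> includes = new ArrayList<>();
    private final List<PathMatcher> excludes = new ArrayList<>();

    public GlobNameFilterSketch(List<String> includeGlobs, List<String> excludeGlobs) {
        for (String glob : includeGlobs) {
            includes.add(FileSystems.getDefault().getPathMatcher("glob:" + glob));
        }
        for (String glob : excludeGlobs) {
            excludes.add(FileSystems.getDefault().getPathMatcher("glob:" + glob));
        }
    }

    /** Returns true if the file name matches an include pattern and no exclude pattern. */
    public boolean accept(String fileName) {
        Path name = Paths.get(fileName);
        for (PathMatcher exclude : excludes) {
            if (exclude.matches(name)) {
                return false; // exclude patterns take precedence
            }
        }
        for (PathMatcher include : includes) {
            if (include.matches(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        GlobNameFilterSketch filter =
                new GlobNameFilterSketch(Arrays.asList("*.csv"), Arrays.asList(".*"));
        System.out.println(filter.accept("data.csv"));    // true
        System.out.println(filter.accept(".hidden.csv")); // false, hidden file
        System.out.println(filter.accept("data.txt"));    // false, not included
    }
}
```

Checking the exclude patterns first means a hidden file is skipped even when
it also matches an include pattern.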





[GitHub] flink pull request #2220: [FLINK-4184] [metrics] Replace invalid characters ...

2016-07-08 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2220

[FLINK-4184] [metrics] Replace invalid characters in ScheduledDropwizardReporter

The GraphiteReporter and GangliaReporter report metric names which can contain
invalid characters. These characters include quotes and dots. In order to
properly report metrics to these systems, the aforementioned characters have
to be replaced in metric names.

The PR also removes quotes from the garbage collector metric name.

The PR sets the default value for TTL in the GangliaReporter to 1, because -1
causes the reporter to fail.

R @zentol.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixDropwizardReporters

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2220


commit d63911499aef10f54d7c4c0774f4f22a520bcc66
Author: Till Rohrmann 
Date:   2016-07-08T14:41:39Z

[FLINK-4184] [metrics] Replace invalid characters in ScheduledDropwizardReporter

The GraphiteReporter and GangliaReporter report metric names which can contain
invalid characters. These characters include quotes and dots. In order to
properly report metrics to these systems, the aforementioned characters have
to be replaced in metric names.

The PR also removes quotes from the garbage collector metric name.

The PR sets the default value for TTL in the GangliaReporter to 1, because -1
causes the reporter to fail.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4171) StatsD does not accept metrics whose name contains ":"

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1536#comment-1536
 ] 

ASF GitHub Bot commented on FLINK-4171:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2212


> StatsD does not accept metrics whose name contains ":"
> --
>
> Key: FLINK-4171
> URL: https://issues.apache.org/jira/browse/FLINK-4171
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> The {{StatsDReporter}} mustn't send metrics whose name contains a ":" 
> character to {{StatsD}} because these reports are discarded. The reason is 
> that ":" is used to separate the value from the metric name.
> I propose to check the metric names and replace all ":" by a "-" character.
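
The proposed fix can be sketched as follows. This is a minimal illustration
and not the StatsDReporter code itself: the class name StatsDNameFilterSketch,
both method names, and the sample metric name are assumptions. The only parts
taken from the issue are the replacement of ":" by "-" and the fact that
StatsD uses ":" to separate the metric name from the value.

```java
public final class StatsDNameFilterSketch {

    private StatsDNameFilterSketch() {}

    /** Replaces every ':' in the metric name with '-', as proposed in the issue. */
    public static String filterName(String metricName) {
        return metricName.replace(':', '-');
    }

    /** Builds a StatsD gauge line of the form name:value|g from a filtered name. */
    public static String toGaugeLine(String metricName, long value) {
        return filterName(metricName) + ":" + value + "|g";
    }

    public static void main(String[] args) {
        // Without filtering, the ':' inside the name would be taken as the
        // separator between the name and the value.
        System.out.println(toGaugeLine("taskmanager.host:6123.numRecordsIn", 42L));
        // prints taskmanager.host-6123.numRecordsIn:42|g
    }
}
```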





[jira] [Closed] (FLINK-4171) StatsD does not accept metrics whose name contains ":"

2016-07-08 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4171.

Resolution: Fixed

Fixed via 474ec9660c528af8e19898114656fccb3e4747af

> StatsD does not accept metrics whose name contains ":"
> --
>
> Key: FLINK-4171
> URL: https://issues.apache.org/jira/browse/FLINK-4171
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> The {{StatsDReporter}} mustn't send metrics whose name contains a ":" 
> character to {{StatsD}} because these reports are discarded. The reason is 
> that ":" is used to separate the value from the metric name.
> I propose to check the metric names and replace all ":" by a "-" character.





[GitHub] flink pull request #2212: [FLINK-4171] [metrics] Replace : chars in StatsD m...

2016-07-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2212




[GitHub] flink issue #2212: [FLINK-4171] [metrics] Replace : chars in StatsD metric n...

2016-07-08 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2212
  
Thanks for reviewing @zentol. Will be merging this PR.




[jira] [Commented] (FLINK-4171) StatsD does not accept metrics whose name contains ":"

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367771#comment-15367771
 ] 

ASF GitHub Bot commented on FLINK-4171:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2212
  
Thanks for reviewing @zentol. Will be merging this PR.


> StatsD does not accept metrics whose name contains ":"
> --
>
> Key: FLINK-4171
> URL: https://issues.apache.org/jira/browse/FLINK-4171
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> The {{StatsDReporter}} mustn't send metrics whose name contains a ":" 
> character to {{StatsD}} because these reports are discarded. The reason is 
> that ":" is used to separate the value from the metric name.
> I propose to check the metric names and replace all ":" by a "-" character.





[GitHub] flink pull request #2217: [FLINK-4159] Remove Quickstart exclusions for unus...

2016-07-08 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70085553
  
--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -207,15 +206,9 @@ under the License.
 com.esotericsoftware.kryo:kryo
 com.esotericsoftware.minlog:minlog
 org.objenesis:objenesis
-com.twitter:chill_*
--- End diff --

I also see that this was not in the list in the Jira for FLINK-4159.




[jira] [Commented] (FLINK-4159) Quickstart poms exclude unused dependencies

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367767#comment-15367767
 ] 

ASF GitHub Bot commented on FLINK-4159:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70085553
  
--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -207,15 +206,9 @@ under the License.
 com.esotericsoftware.kryo:kryo
 com.esotericsoftware.minlog:minlog
 org.objenesis:objenesis
-com.twitter:chill_*
--- End diff --

I also see that this was not in the list in the Jira for FLINK-4159.


> Quickstart poms exclude unused dependencies
> ---
>
> Key: FLINK-4159
> URL: https://issues.apache.org/jira/browse/FLINK-4159
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The Quickstart poms exclude several dependencies from being packaged into the 
> fat-jar, even though they aren't used by Flink according to `mvn 
> dependency:tree`.
> com.amazonaws:aws-java-sdk
> com.twitter:chill-avro_*
> com.twitter:chill-bijection_*
> com.twitter:bijection-core_*
> com.twitter:bijection-avro_*
> de.javakaffee:kryo-serializers





[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367764#comment-15367764
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2109
  
1. Technically the GlobFilePathFilter is not redundant, since it fulfills
exactly the requirement of the issue: being able to filter files by specifying
a regular expression. The functionality it offers is also very basic, so I
think we should include it.

2. The shouldIgnore method does some work with locks. Is it safe to just remove
it, given that the FileInputFormat doesn't do any locking?

3. Which methods specifically?


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.





[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

2016-07-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2109
  
1. Technically the GlobFilePathFilter is not redundant, since it fulfills
exactly the requirement of the issue: being able to filter files by specifying
a regular expression. The functionality it offers is also very basic, so I
think we should include it.

2. The shouldIgnore method does some work with locks. Is it safe to just remove
it, given that the FileInputFormat doesn't do any locking?

3. Which methods specifically?




[jira] [Created] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters

2016-07-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4184:


 Summary: Ganglia and GraphiteReporter report metric names with invalid characters
 Key: FLINK-4184
 URL: https://issues.apache.org/jira/browse/FLINK-4184
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.1.0


Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with names 
which contain invalid characters. For example, quotes are not filtered out 
which can be problematic for Ganglia. Moreover, dots are not replaced which 
causes Graphite to think that an IP address is actually a scoped metric name.





[jira] [Commented] (FLINK-4159) Quickstart poms exclude unused dependencies

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367746#comment-15367746
 ] 

ASF GitHub Bot commented on FLINK-4159:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70083520
  
--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -207,15 +206,9 @@ under the License.
 com.esotericsoftware.kryo:kryo
 com.esotericsoftware.minlog:minlog
 org.objenesis:objenesis
-com.twitter:chill_*
--- End diff --

This is a dependency of `flink-runtime` which is a dependency of 
`flink-dist`.


> Quickstart poms exclude unused dependencies
> ---
>
> Key: FLINK-4159
> URL: https://issues.apache.org/jira/browse/FLINK-4159
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The Quickstart poms exclude several dependencies from being packaged into the 
> fat-jar, even though they aren't used by Flink according to `mvn 
> dependency:tree`.
> com.amazonaws:aws-java-sdk
> com.twitter:chill-avro_*
> com.twitter:chill-bijection_*
> com.twitter:bijection-core_*
> com.twitter:bijection-avro_*
> de.javakaffee:kryo-serializers





[jira] [Commented] (FLINK-4159) Quickstart poms exclude unused dependencies

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367749#comment-15367749
 ] 

ASF GitHub Bot commented on FLINK-4159:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70083585
  
--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -207,15 +206,9 @@ under the License.
 com.esotericsoftware.kryo:kryo
 com.esotericsoftware.minlog:minlog
 org.objenesis:objenesis
-com.twitter:chill_*
 com.twitter:chill-java
-com.twitter:chill-avro_*
-com.twitter:chill-bijection_*
-com.twitter:bijection-core_*
-com.twitter:bijection-avro_*
 commons-lang:commons-lang
 junit:junit
-de.javakaffee:kryo-serializers
--- End diff --

This is a dependency of `flink-core` which is a dependency of `flink-dist`.


> Quickstart poms exclude unused dependencies
> ---
>
> Key: FLINK-4159
> URL: https://issues.apache.org/jira/browse/FLINK-4159
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The Quickstart poms exclude several dependencies from being packaged into the 
> fat-jar, even though they aren't used by Flink according to `mvn 
> dependency:tree`.
> com.amazonaws:aws-java-sdk
> com.twitter:chill-avro_*
> com.twitter:chill-bijection_*
> com.twitter:bijection-core_*
> com.twitter:bijection-avro_*
> de.javakaffee:kryo-serializers





[GitHub] flink pull request #2217: [FLINK-4159] Remove Quickstart exclusions for unus...

2016-07-08 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70083585
  
--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -207,15 +206,9 @@ under the License.
 com.esotericsoftware.kryo:kryo
 com.esotericsoftware.minlog:minlog
 org.objenesis:objenesis
-com.twitter:chill_*
 com.twitter:chill-java
-com.twitter:chill-avro_*
-com.twitter:chill-bijection_*
-com.twitter:bijection-core_*
-com.twitter:bijection-avro_*
 commons-lang:commons-lang
 junit:junit
-de.javakaffee:kryo-serializers
--- End diff --

This is a dependency of `flink-core` which is a dependency of `flink-dist`.




[GitHub] flink pull request #2217: [FLINK-4159] Remove Quickstart exclusions for unus...

2016-07-08 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2217#discussion_r70083520
  
--- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---
@@ -207,15 +206,9 @@ under the License.
 com.esotericsoftware.kryo:kryo
 com.esotericsoftware.minlog:minlog
 org.objenesis:objenesis
-com.twitter:chill_*
--- End diff --

This is a dependency of `flink-runtime` which is a dependency of 
`flink-dist`.




[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367732#comment-15367732
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2109
  
Hi @mushketyk ,

Sorry for the late reply. 
I have some general comments on the PR.

1) Given the FilePathFilter, I think that the GlobFilePathFilter is redundant,
right? It is a specific implementation that uses pattern matching, which can be
provided by the programmer. Given this, in the FileInputFormat, the filesFilter
(which we could rename to filePathFilter or something more expressive of its
function) becomes a FilePathFilter.

2) Given that the filter is now in the FileInputFormat, the
ContinuousFileMonitoringFunction should change, as it is now the format that
does the filtering. So the filter should be removed from its constructor, and
the shouldIgnore() method becomes redundant.

3) The affected methods in the StreamExecutionEnvironment should change too.

It would help if you shared a design draft before integrating the changes, so
that we can discuss them and figure out all the parts of the code that change
and need testing.

What do you think?



> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.





[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

2016-07-08 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2109
  
Hi @mushketyk ,

Sorry for the late reply. 
I have some general comments on the PR.

1) Given the FilePathFilter, I think that the GlobFilePathFilter is redundant,
right? It is a specific implementation that uses pattern matching, which can be
provided by the programmer. Given this, in the FileInputFormat, the filesFilter
(which we could rename to filePathFilter or something more expressive of its
function) becomes a FilePathFilter.

2) Given that the filter is now in the FileInputFormat, the
ContinuousFileMonitoringFunction should change, as it is now the format that
does the filtering. So the filter should be removed from its constructor, and
the shouldIgnore() method becomes redundant.

3) The affected methods in the StreamExecutionEnvironment should change too.

It would help if you shared a design draft before integrating the changes, so
that we can discuss them and figure out all the parts of the code that change
and need testing.

What do you think?





[jira] [Updated] (FLINK-4181) Add a basic streaming Table API example

2016-07-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4181:

Description: Although the Table API does not offer many streaming features
yet, there should be a runnable example showing how to convert, union, filter
and project streams with the Table API.  (was: Although the Table API does not
offer many streaming features yet, there should be a runnable example showing
how to convert, filter and project streams with the Table API.)

> Add a basic streaming Table API example
> ---
>
> Key: FLINK-4181
> URL: https://issues.apache.org/jira/browse/FLINK-4181
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> Although the Table API does not offer many streaming features yet, there
> should be a runnable example showing how to convert, union, filter and
> project streams with the Table API.





[jira] [Updated] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-07-08 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4182:

Priority: Blocker  (was: Major)

> HA recovery not working properly under ApplicationMaster failures.
> --
>
> Key: FLINK-4182
> URL: https://issues.apache.org/jira/browse/FLINK-4182
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Priority: Blocker
>
> When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
> not properly recover in HA mode.
> There can be different symptoms for this. For example, in one case the job is 
> dying with the following exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>   at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 

[jira] [Commented] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-07-08 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367727#comment-15367727
 ] 

Aljoscha Krettek commented on FLINK-4182:
-

I think this one's a blocker for the 1.1 release.

> HA recovery not working properly under ApplicationMaster failures.
> --
>
> Key: FLINK-4182
> URL: https://issues.apache.org/jira/browse/FLINK-4182
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>
> When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
> not properly recover in HA mode.
> There can be different symptoms for this. For example, in one case the job is 
> dying with the following exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>   at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at 

[jira] [Created] (FLINK-4183) Move checking for StreamTableEnvironment into validation layer

2016-07-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4183:
---

 Summary: Move checking for StreamTableEnvironment into validation 
layer
 Key: FLINK-4183
 URL: https://issues.apache.org/jira/browse/FLINK-4183
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther
Priority: Minor


Some operators check the environment in `table.scala` instead of doing this 
during the validation phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4180) Create a batch SQL example

2016-07-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4180:
---

 Summary: Create a batch SQL example
 Key: FLINK-4180
 URL: https://issues.apache.org/jira/browse/FLINK-4180
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


Currently there is no runnable code example in `flink-table` showing a working 
batch SQL query with the Table API.

A Scala and Java example should be added.





[jira] [Updated] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-07-08 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4182:
--
Description: 
When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
not properly recover in HA mode.

The symptoms vary. For example, in one case the job dies with the following 
exception:

{code}
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Cannot set up the user code libraries: Cannot get library 
with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at 
org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
at 
da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
up the user code libraries: Cannot get library with hash 
7fafffe9595cd06aff213b81b5da7b1682e1d6b0
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot get library with hash 
7fafffe9595cd06aff213b81b5da7b1682e1d6b0
at 

[jira] [Created] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-07-08 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4182:
-

 Summary: HA recovery not working properly under ApplicationMaster 
failures.
 Key: FLINK-4182
 URL: https://issues.apache.org/jira/browse/FLINK-4182
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, State Backends, Checkpointing
Affects Versions: 1.0.3
Reporter: Stefan Richter


When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
not properly recover in HA mode.

The symptoms vary. For example, in one case the job dies with the following 
exception:

{code}
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Cannot set up the user code libraries: Cannot get library 
with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
at 
org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
at 
da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
up the user code libraries: Cannot get library with hash 
7fafffe9595cd06aff213b81b5da7b1682e1d6b0
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 

[jira] [Updated] (FLINK-4180) Create a batch SQL example

2016-07-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4180:

Labels: starter  (was: )

> Create a batch SQL example
> --
>
> Key: FLINK-4180
> URL: https://issues.apache.org/jira/browse/FLINK-4180
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> Currently there is no runnable code example in `flink-table` showing a 
> working batch SQL query with the Table API.
> A Scala and Java example should be added.





[jira] [Updated] (FLINK-4181) Add a basic streaming Table API example

2016-07-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4181:

Labels: starter  (was: )

> Add a basic streaming Table API example
> ---
>
> Key: FLINK-4181
> URL: https://issues.apache.org/jira/browse/FLINK-4181
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> Although the Table API does not offer many streaming features yet, there 
> should be a runnable example showing how to convert, filter and project 
> streams with the Table API.





[jira] [Commented] (FLINK-4172) Don't proxy a ProxiedObject

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367713#comment-15367713
 ] 

ASF GitHub Bot commented on FLINK-4172:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2213
  
+1 to merge. The previously broken `TriangleListing` example 
(undirected, rmat, print) now works with this change.


> Don't proxy a ProxiedObject
> ---
>
> Key: FLINK-4172
> URL: https://issues.apache.org/jira/browse/FLINK-4172
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Some graph algorithms pass through a DataSet unmodified (at least, until we 
> have VertexSet and EdgeSet). We need to prevent a DataSet from being proxied 
> twice. Allowing two methods to own a single object sounds brittle, so we can 
> instead provide access to the original DataSet which can be wrapped in a new 
> proxy.
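The idea in the description above can be sketched in plain Java. This is only an illustration under stated assumptions, not Gelly's implementation: `ProxiedObject` is named after the issue title, while `Source`, `SourceProxy` and `proxy` are hypothetical names invented for this sketch. The point it shows is that a proxy exposes its original object, so a request to proxy an existing proxy wraps the original instead of stacking a second layer.

```java
public class ProxySketch {

    /** Hypothetical marker interface: a proxy exposes the object it wraps. */
    public interface ProxiedObject {
        Object getProxiedObject();
    }

    /** Hypothetical stand-in for the proxied type (a DataSet in the issue). */
    public interface Source {
        String read();
    }

    /** A proxy that forwards calls and remembers the original object. */
    public static final class SourceProxy implements Source, ProxiedObject {
        private final Source target;

        private SourceProxy(Source target) {
            this.target = target;
        }

        @Override
        public String read() {
            return target.read();
        }

        @Override
        public Object getProxiedObject() {
            return target;
        }
    }

    /** Wraps a source in a new proxy; an existing proxy is unwrapped first. */
    public static SourceProxy proxy(Source source) {
        Source original = source;
        if (original instanceof ProxiedObject) {
            // Do not proxy a proxy: fetch the original and wrap that instead.
            original = (Source) ((ProxiedObject) original).getProxiedObject();
        }
        return new SourceProxy(original);
    }

    public static void main(String[] args) {
        Source plain = () -> "data";
        SourceProxy once = proxy(plain);
        SourceProxy twice = proxy(once);
        // The second proxy wraps the original object, not the first proxy.
        System.out.println(twice.getProxiedObject() == plain);
    }
}
```

Because the constructor is private, every proxy is created through `proxy`, so a single unwrapping step is enough in this sketch.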





[GitHub] flink issue #2213: [FLINK-4172] [gelly] Don't proxy a ProxiedObject

2016-07-08 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2213
  
+1 to merge. The previously broken `TriangleListing` example 
(undirected, rmat, print) now works with this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4181) Add a basic streaming Table API example

2016-07-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4181:
---

 Summary: Add a basic streaming Table API example
 Key: FLINK-4181
 URL: https://issues.apache.org/jira/browse/FLINK-4181
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


Although the Table API does not offer many streaming features yet, there should 
be a runnable example showing how to convert, filter and project streams with 
the Table API.





[jira] [Updated] (FLINK-4179) Update TPCHQuery3Table example

2016-07-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4179:

Labels: starter  (was: )

> Update TPCHQuery3Table example
> --
>
> Key: FLINK-4179
> URL: https://issues.apache.org/jira/browse/FLINK-4179
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> The current TPC-H example should be updated to show how dates can now be 
> handled easily, without using SimpleDateFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4179) Update TPCHQuery3Table example

2016-07-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4179:
---

 Summary: Update TPCHQuery3Table example
 Key: FLINK-4179
 URL: https://issues.apache.org/jira/browse/FLINK-4179
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


The current TPC-H example should be updated to show how dates can now be 
handled easily, without using SimpleDateFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367695#comment-15367695
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2169
  
@mushketyk Thanks for the PR.
I also reviewed the current status. I think it's good to compare your code 
with #2159 before you rework it.


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.
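The coGroup strategy described above can be illustrated without any Flink dependency. The following is a minimal sketch in plain Java, not the proposed `DataSetMinus` code: `ExceptSketch`, `Group` and `except` are hypothetical names, rows are modelled as `List<Object>`, and it assumes distinct (set) semantics, so one record is forwarded per key. The whole row serves as the grouping key, which mirrors "a coGroup on all fields".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExceptSketch {

    /** Holds the records of both inputs that share one key. */
    static final class Group {
        final List<List<Object>> left = new ArrayList<>();
        final List<List<Object>> right = new ArrayList<>();
    }

    /** Returns the distinct rows of {@code first} that do not occur in {@code second}. */
    public static List<List<Object>> except(
            List<List<Object>> first, List<List<Object>> second) {
        // "coGroup on all fields": the complete row is the grouping key.
        Map<List<Object>, Group> groups = new LinkedHashMap<>();
        for (List<Object> row : first) {
            groups.computeIfAbsent(row, k -> new Group()).left.add(row);
        }
        for (List<Object> row : second) {
            groups.computeIfAbsent(row, k -> new Group()).right.add(row);
        }

        List<List<Object>> result = new ArrayList<>();
        for (Group group : groups.values()) {
            // Forward a record of the first input only if the second input is empty.
            if (!group.left.isEmpty() && group.right.isEmpty()) {
                result.add(group.left.get(0));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> first = Arrays.asList(
                Arrays.<Object>asList(1, "a"),
                Arrays.<Object>asList(2, "b"),
                Arrays.<Object>asList(2, "b"),
                Arrays.<Object>asList(3, "c"));
        List<List<Object>> second = Arrays.asList(
                Arrays.<Object>asList(3, "c"),
                Arrays.<Object>asList(4, "d"));
        System.out.println(except(first, second)); // prints [[1, a], [2, b]]
    }
}
```

In a real DataSet program the grouping would be done by the coGroup operator itself, and only the check on the two groups would live in the `CoGroupFunction`.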





[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2169
  
@mushketyk Thanks for the PR.
I also reviewed the current status. I think it's good to compare your code 
with #2159 before you rework it.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367692#comment-15367692
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70075759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: ($setMinusSelectionToString}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+var leftDataSet: DataSet[Any] = null
+var rightDataSet: DataSet[Any] = null
+
+expectedType match {
+  case None =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+rightDataSet =
+  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
+  case _ =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+}
+
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+// If it is atomic type, the field expression need to be "*".
+// Otherwise, we use int-based field position keys
+val coGroupedPredicateDs =
+if (leftType.isTupleType || leftType.isInstanceOf[CompositeType[Any]]) 
{
+  coGroupedDs.where(0 until left.getRowType.getFieldCount: _*)
+} else {
+  coGroupedDs.where("*")
+}
+
+val coGroupedWithoutFunctionDs =
+if (rightType.isTupleType || 
rightType.isInstanceOf[CompositeType[Any]]) {
  

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70075759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: ($setMinusSelectionToString}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+var leftDataSet: DataSet[Any] = null
+var rightDataSet: DataSet[Any] = null
+
+expectedType match {
+  case None =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+rightDataSet =
+  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
+  case _ =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+}
+
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+// If it is atomic type, the field expression need to be "*".
+// Otherwise, we use int-based field position keys
+val coGroupedPredicateDs =
+if (leftType.isTupleType || leftType.isInstanceOf[CompositeType[Any]]) 
{
+  coGroupedDs.where(0 until left.getRowType.getFieldCount: _*)
+} else {
+  coGroupedDs.where("*")
+}
+
+val coGroupedWithoutFunctionDs =
+if (rightType.isTupleType || 
rightType.isInstanceOf[CompositeType[Any]]) {
+  coGroupedPredicateDs.equalTo(0 until right.getRowType.getFieldCount: 
_*)
+} else {
+  coGroupedPredicateDs.equalTo("*")
+}
+
+coGroupedWithoutFunctionDs.`with`(new 

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70075580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: ($setMinusSelectionToString}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+var leftDataSet: DataSet[Any] = null
+var rightDataSet: DataSet[Any] = null
+
+expectedType match {
+  case None =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+rightDataSet =
+  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
+  case _ =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+}
+
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+// If it is atomic type, the field expression need to be "*".
+// Otherwise, we use int-based field position keys
+val coGroupedPredicateDs =
--- End diff --

You can use "*" for all types. So the case distinction is not necessary.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367690#comment-15367690
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70075580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: ($setMinusSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+var leftDataSet: DataSet[Any] = null
+var rightDataSet: DataSet[Any] = null
+
+expectedType match {
+  case None =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+rightDataSet =
+  right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
+  case _ =>
+leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+}
+
+val leftType = leftDataSet.getType
+val rightType = rightDataSet.getType
+val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+// If it is atomic type, the field expression need to be "*".
+// Otherwise, we use int-based field position keys
+val coGroupedPredicateDs =
--- End diff --

You can use "*" for all types. So the case distinction is not necessary.


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
>  Issue Type: New Feature
>  

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70073020
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: ($setMinusSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
--- End diff --

Shouldn't the cost be similar to an Intersect because both are using a 
CoGroup?




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367668#comment-15367668
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70073020
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
+  *
+  */
+class DataSetMinus(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+rowType: RelDataType,
+all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetMinus(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  all
+)
+  }
+
+  override def toString: String = {
+s"SetMinus(setMinus: ($setMinusSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+val children = this.getInputs
+val rowCnt = children.foldLeft(0D) { (rows, child) =>
+  rows + metadata.getRowCount(child)
+}
+
+planner.getCostFactory.makeCost(rowCnt, 0, 0)
--- End diff --

Shouldn't the cost be similar to an Intersect because both are using a 
CoGroup?


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.
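
The coGroup formulation in the issue description can be sketched with plain Scala collections, without any Flink dependency. This is only an illustration under assumptions: `coGroup` and `except` below are hypothetical helpers, not Flink or Calcite API, and the whole record is used as the grouping key (the "all fields" case).

```scala
object ExceptSketch {
  // Hypothetical stand-in for a coGroup keyed on the whole record:
  // for every distinct record, collect its occurrences in left and in right.
  def coGroup[T](left: Seq[T], right: Seq[T]): Seq[(Seq[T], Seq[T])] = {
    val keys = (left ++ right).distinct
    keys.map(k => (left.filter(_ == k), right.filter(_ == k)))
  }

  // EXCEPT: forward one record of the left group when the right group is empty.
  def except[T](left: Seq[T], right: Seq[T]): Seq[T] =
    coGroup(left, right).collect {
      case (l, r) if l.nonEmpty && r.isEmpty => l.head
    }
}
```

Because each group is keyed on the full record, the same code path serves atomic values and tuples alike, and duplicates on the left side collapse to a single output record.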



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367659#comment-15367659
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70071856
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
--- End diff --

This description does not make much sense.


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.





[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70071856
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.lang.Iterable
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode which matches along with DataSetOperator.
--- End diff --

This description does not make much sense.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367658#comment-15367658
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70071730
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -236,6 +236,32 @@ case class Aggregate(
   }
 }
 
+case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) 
extends BinaryNode {
--- End diff --

I would still call it `Minus` to be in sync with `Union`, `Intersect`, etc. 
but that's just my personal opinion.


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.





[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70071730
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -236,6 +236,32 @@ case class Aggregate(
   }
 }
 
+case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) 
extends BinaryNode {
--- End diff --

I would still call it `Minus` to be in sync with `Union`, `Intersect`, etc. 
but that's just my personal opinion.




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367652#comment-15367652
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70071069
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.java;
--- End diff --

I think you did not intend to change this file. Can you remove it from this 
PR?


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.





[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70071069
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.java;
--- End diff --

I think you did not intend to change this file. Can you remove it from this 
PR?




[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70070564
  
--- Diff: docs/apis/table.md ---
@@ -695,6 +718,30 @@ val result = left.unionAll(right);
 
 
 
+  Minus
+  
+Similar to a SQL EXCEPT clause. Returns elements from the first 
table that do not exist in the second table. Both tables must have identical 
schema(field names and types).
--- End diff --

Can you copy the description from the Java part also in here?




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367645#comment-15367645
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70070564
  
--- Diff: docs/apis/table.md ---
@@ -695,6 +718,30 @@ val result = left.unionAll(right);
 
 
 
+  Minus
+  
+Similar to a SQL EXCEPT clause. Returns elements from the first 
table that do not exist in the second table. Both tables must have identical 
schema(field names and types).
--- End diff --

Can you copy the description from the Java part also in here?


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.





[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-08 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70069883
  
--- Diff: docs/apis/table.md ---
@@ -536,6 +536,29 @@ Table result = left.unionAll(right);
 
 
 
+  Minus
+  
+Similar to a SQL EXCEPT clause. Except returns records from the 
first table that do not exist in the second table. Duplicate records in the 
first table are returned exactly once, i.e., duplicates are removed. Both 
tables must have identical schema, i.e., field names and types.
--- End diff --

"Except returns records" should be "Minus returns records" to be 
consistent. I would also use "left/right" table instead of first and second 
according to your example code. 
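
To make the documented semantics concrete, here is a small plain-Scala sketch of the two behaviours that the `all` flag in the diff presumably distinguishes. The helper names `minus` and `minusAll` are assumptions for illustration and not part of the Table API; the `minusAll` variant follows standard SQL EXCEPT ALL, which may or may not match what the PR implements.

```scala
object MinusSemanticsSketch {
  // Minus as the docs text describes it: records of the left table that do
  // not exist in the right table, each returned exactly once.
  def minus[T](left: Seq[T], right: Seq[T]): Seq[T] =
    left.distinct.filterNot(r => right.contains(r))

  // EXCEPT ALL in standard SQL: a record occurring m times on the left and
  // n times on the right is returned max(m - n, 0) times.
  // Seq.diff computes exactly this multiset difference.
  def minusAll[T](left: Seq[T], right: Seq[T]): Seq[T] =
    left.diff(right)
}
```

With left = (1, 1, 1, 2, 3) and right = (1, 2), `minus` keeps only 3, while `minusAll` keeps 1, 1, 3.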




[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367640#comment-15367640
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2169#discussion_r70069883
  
--- Diff: docs/apis/table.md ---
@@ -536,6 +536,29 @@ Table result = left.unionAll(right);
 
 
 
+  Minus
+  
+Similar to a SQL EXCEPT clause. Except returns records from the 
first table that do not exist in the second table. Duplicate records in the 
first table are returned exactly once, i.e., duplicates are removed. Both 
tables must have identical schema, i.e., field names and types.
--- End diff --

"Except returns records" should be "Minus returns records" to be 
consistent. I would also use "left/right" table instead of first and second 
according to your example code. 


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.





[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"

2016-07-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367631#comment-15367631
 ] 

Gabor Gevay commented on FLINK-4029:


You might want to reuse the FieldAccessor infrastructure that is in this PR:
https://github.com/apache/flink/pull/2094

> Multi-field "sum" function just like "keyBy"
> 
>
> Key: FLINK-4029
> URL: https://issues.apache.org/jira/browse/FLINK-4029
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Rami
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> I can use keyBy as follows:
> stream.keyBy("pojo.field1", "pojo.field2", …)
> It would make sense to be able to use sum in the same way, so that it works on
> more than one field:
> stream.sum("pojo.field1", "pojo.field2", …)
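
As a rough illustration of the requested behaviour, the following plain-Scala sketch sums two fields per key in one step. Everything here is hypothetical: the `Reading` record, its field names, and `sumFields` are made up for the example, and it operates on an in-memory batch rather than on a DataStream with rolling aggregates.

```scala
object MultiFieldSumSketch {
  // Hypothetical record type; the field names are illustrative only.
  final case class Reading(key: String, field1: Long, field2: Long)

  // Sum several fields at once per key, similar in spirit to calling sum
  // on more than one field after a keyBy.
  def sumFields(records: Seq[Reading]): Map[String, (Long, Long)] =
    records.groupBy(_.key).map { case (k, rs) =>
      (k, (rs.map(_.field1).sum, rs.map(_.field2).sum))
    }
}
```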





[jira] [Resolved] (FLINK-3942) Add support for INTERSECT

2016-07-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-3942.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed in 815bc833f2c93227c19574119bcf1d00866030b5.

> Add support for INTERSECT
> -
>
> Key: FLINK-3942
> URL: https://issues.apache.org/jira/browse/FLINK-3942
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, the Table API and SQL do not support INTERSECT.
> INTERSECT can be executed as join on all fields.
> In order to add support for INTERSECT to the Table API and SQL we need to:
> - Implement a {{DataSetIntersect}} class that translates an INTERSECT into a 
> DataSet API program using a join on all fields.
> - Implement a {{DataSetIntersectRule}} that translates a Calcite 
> {{LogicalIntersect}} into a {{DataSetIntersect}}.
> - Extend the Table API (and validation phase) to provide an intersect() 
> method.
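
The "join on all fields" idea can be sketched in plain Scala as follows. This is an assumption-laden illustration, not the `DataSetIntersect` implementation: `intersect` is a hypothetical helper, records are compared as whole values, and the result is deduplicated as in SQL INTERSECT without ALL.

```scala
object IntersectSketch {
  // Keep each distinct left record that also occurs in the right input,
  // which is what an equi-join on all fields followed by deduplication yields.
  def intersect[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    val rightSet = right.toSet
    left.distinct.filter(r => rightSet.contains(r))
  }
}
```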




