[jira] [Commented] (FLINK-6386) Missing bracket in 'Compiler Limitation' section

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984281#comment-15984281
 ] 

ASF GitHub Bot commented on FLINK-6386:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3775#discussion_r113378988
  
--- Diff: docs/dev/java8.md ---
@@ -104,7 +104,7 @@ input.filter(line -> !line.contains("not"))
 Currently, Flink only supports jobs containing Lambda Expressions 
completely if they are **compiled with the Eclipse JDT compiler contained in 
Eclipse Luna 4.4.2 (and above)**.
 
 Only the Eclipse JDT compiler preserves the generic type information 
necessary to use the entire Lambda Expressions feature type-safely.
-Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away 
all generic parameters related to Lambda Expressions. This means that types 
such as `Tuple2<...>` or `Collector<...>` declared as a Lambda 
function input or output parameter will be pruned to `Tuple2` or `Collector` in 
the compiled `.class` files, which is too little information for the Flink 
Compiler.
--- End diff --

could you also add a space after the comma `Tuple2<...>` and lower case 
"Compiler" at the very end?


> Missing bracket in 'Compiler Limitation' section
> 
>
> Key: FLINK-6386
> URL: https://issues.apache.org/jira/browse/FLINK-6386
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Trivial
> Fix For: 1.2.2
>
>
> "This means that types such as `Tuple2 declared as..."
> should be 
> "This means that types such as `Tuple2` or 
> `Collector` declared as..."



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6387) Flink UI support access log

2017-04-26 Thread shijinkui (JIRA)
shijinkui created FLINK-6387:


 Summary: Flink UI support access log
 Key: FLINK-6387
 URL: https://issues.apache.org/jira/browse/FLINK-6387
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: shijinkui
Assignee: shijinkui


Record user requests to the access log. Append user accesses to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984278#comment-15984278
 ] 

ASF GitHub Bot commented on FLINK-6356:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/3761#discussion_r113377068
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -18,51 +18,83 @@
 package org.apache.flink.cep.pattern;
 
 import java.util.EnumSet;
+import java.util.Objects;
 
-public enum Quantifier {
-   ONE,
-   ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER),
-   ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, 
QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, 
QuantifierProperty.LOOPING),
-   ONE_OR_MORE_EAGER(
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_EAGER_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, 
QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.AT_LEAST_ONE),
-   TIMES(QuantifierProperty.TIMES),
-   TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
-   OPTIONAL;
+public class Quantifier {
 
	private final EnumSet<QuantifierProperty> properties;
 
-   Quantifier(final QuantifierProperty first, final QuantifierProperty... 
rest) {
+   private Quantifier(final QuantifierProperty first, final 
QuantifierProperty... rest) {
this.properties = EnumSet.of(first, rest);
}
 
-   Quantifier() {
-   this.properties = EnumSet.noneOf(QuantifierProperty.class);
+   public static Quantifier ONE() {
+   return new Quantifier(QuantifierProperty.SINGLE);
+   }
+
+   public static Quantifier ONE_OR_MORE() {
+   return new Quantifier(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER);
+   }
+
+   public static Quantifier TIMES() {
+   return new Quantifier(QuantifierProperty.TIMES, 
QuantifierProperty.EAGER);
}
 
public boolean hasProperty(QuantifierProperty property) {
return properties.contains(property);
}
 
+   public void combinations() {
--- End diff --

The first `if` will be satisfied by `ONE`, resulting in a misleading 
exception. Because of that, the `else` branch of the second `if` will also be 
unreachable.


> Make times() eager and enable allowing combinations.
> 
>
> Key: FLINK-6356
> URL: https://issues.apache.org/jira/browse/FLINK-6356
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This is the PR that addresses it https://github.com/apache/flink/pull/3761



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3761: [FLINK-6356] Make optional available to all patter...

2017-04-26 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/3761#discussion_r113377068
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -18,51 +18,83 @@
 package org.apache.flink.cep.pattern;
 
 import java.util.EnumSet;
+import java.util.Objects;
 
-public enum Quantifier {
-   ONE,
-   ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER),
-   ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, 
QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, 
QuantifierProperty.LOOPING),
-   ONE_OR_MORE_EAGER(
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_EAGER_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, 
QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.AT_LEAST_ONE),
-   TIMES(QuantifierProperty.TIMES),
-   TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
-   OPTIONAL;
+public class Quantifier {
 
	private final EnumSet<QuantifierProperty> properties;
 
-   Quantifier(final QuantifierProperty first, final QuantifierProperty... 
rest) {
+   private Quantifier(final QuantifierProperty first, final 
QuantifierProperty... rest) {
this.properties = EnumSet.of(first, rest);
}
 
-   Quantifier() {
-   this.properties = EnumSet.noneOf(QuantifierProperty.class);
+   public static Quantifier ONE() {
+   return new Quantifier(QuantifierProperty.SINGLE);
+   }
+
+   public static Quantifier ONE_OR_MORE() {
+   return new Quantifier(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER);
+   }
+
+   public static Quantifier TIMES() {
+   return new Quantifier(QuantifierProperty.TIMES, 
QuantifierProperty.EAGER);
}
 
public boolean hasProperty(QuantifierProperty property) {
return properties.contains(property);
}
 
+   public void combinations() {
--- End diff --

The first `if` will be satisfied by `ONE`, resulting in a misleading 
exception. Because of that, the `else` branch of the second `if` will also be 
unreachable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3761: [FLINK-6356] Make optional available to all patter...

2017-04-26 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/3761#discussion_r113378455
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -162,130 +186,114 @@ public int getTimes() {
}
 
/**
-* Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
-* temporal contiguity. This means that the whole pattern only matches 
if an event which matches
-* this operator directly follows the preceding matching event. Thus, 
there cannot be any
-* events in between two matching events.
+* Appends a new pattern to the existing one. The new pattern enforces 
strict
+* temporal contiguity. This means that the whole pattern sequence 
matches only
+* if an event which matches this pattern directly follows the 
preceding matching
+* event. Thus, there cannot be any events in between two matching 
events.
 *
-* @param name Name of the new pattern operator
-* @return A new pattern operator which is appended to this pattern 
operator
+* @param name Name of the new pattern
+* @return A new pattern which is appended to this one
 */
public Pattern next(final String name) {
return new Pattern(name, this);
}
 
/**
-* Appends a new pattern operator to the existing one. The new pattern 
operator enforces
-* non-strict temporal contiguity. This means that a matching event of 
this operator and the
+* Appends a new pattern to the existing one. The new pattern enforces 
non-strict
+* temporal contiguity. This means that a matching event of this 
pattern and the
 * preceding matching event might be interleaved with other events 
which are ignored.
 *
-* @param name Name of the new pattern operator
-* @return A new pattern operator which is appended to this pattern 
operator
+* @param name Name of the new pattern
+* @return A new pattern which is appended to this one
 */
public FollowedByPattern followedBy(final String name) {
return new FollowedByPattern(name, this);
}
 
/**
-* Starts a new pattern with the initial pattern operator whose name is 
provided. Furthermore,
-* the base type of the event sequence is set.
-*
-* @param name Name of the new pattern operator
-* @param  Base type of the event pattern
-* @return The first pattern operator of a pattern
-*/
-   public static  Pattern begin(final String name) {
-   return new Pattern(name, null);
-   }
-
-   /**
-* Specifies that this pattern can occur zero or more times(kleene 
star).
-* This means any number of events can be matched in this state.
+* Specifies that this pattern is optional for a final match of the 
pattern
+* sequence to happen.
 *
-* @return The same pattern with applied Kleene star operator
-*
-* @throws MalformedPatternException if quantifier already applied
+* @return The same pattern as optional.
+* @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
 */
-   public Pattern zeroOrMore() {
-   return zeroOrMore(true);
+   public Pattern optional() {
+   quantifier.optional();
+   return this;
}
 
/**
-* Specifies that this pattern can occur zero or more times(kleene 
star).
-* This means any number of events can be matched in this state.
-*
-* If eagerness is enabled for a pattern A*B and sequence A1 A2 B will 
generate patterns:
-* B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+* Specifies that this pattern can occur {@code one or more} times.
+* This means at least one and at most infinite number of events can
+* be matched to this pattern.
 *
-* @param eager if true the pattern always consumes earlier events
-* @return The same pattern with applied Kleene star operator
+* If this quantifier is enabled for a
+* pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events
+* {@code A1 A2 B} appears, this will generate patterns:
+* {@code A1 B} and {@code A1 A2 B}. See also {@link 
#allowCombinations()}.
 *
-* @throws MalformedPatternException if quantifier already applied
+* @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} 
quantifier applied.
+* @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
 */
-   public Pattern zeroOrMore(final boolean eager) {
+   

[GitHub] flink pull request #3775: [FLINK-6386] Missing bracket in 'Compiler Limitati...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3775#discussion_r113378988
  
--- Diff: docs/dev/java8.md ---
@@ -104,7 +104,7 @@ input.filter(line -> !line.contains("not"))
 Currently, Flink only supports jobs containing Lambda Expressions 
completely if they are **compiled with the Eclipse JDT compiler contained in 
Eclipse Luna 4.4.2 (and above)**.
 
 Only the Eclipse JDT compiler preserves the generic type information 
necessary to use the entire Lambda Expressions feature type-safely.
-Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away 
all generic parameters related to Lambda Expressions. This means that types 
such as `Tuple2<...>` or `Collector<...>` declared as a Lambda 
function input or output parameter will be pruned to `Tuple2` or `Collector` in 
the compiled `.class` files, which is too little information for the Flink 
Compiler.
--- End diff --

could you also add a space after the comma `Tuple2<...>` and lower case 
"Compiler" at the very end?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3761: [FLINK-6356] Make optional available to all patter...

2017-04-26 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/3761#discussion_r113377152
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -18,51 +18,83 @@
 package org.apache.flink.cep.pattern;
 
 import java.util.EnumSet;
+import java.util.Objects;
 
-public enum Quantifier {
-   ONE,
-   ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER),
-   ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, 
QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, 
QuantifierProperty.LOOPING),
-   ONE_OR_MORE_EAGER(
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_EAGER_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, 
QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.AT_LEAST_ONE),
-   TIMES(QuantifierProperty.TIMES),
-   TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
-   OPTIONAL;
+public class Quantifier {
 
	private final EnumSet<QuantifierProperty> properties;
 
-   Quantifier(final QuantifierProperty first, final QuantifierProperty... 
rest) {
+   private Quantifier(final QuantifierProperty first, final 
QuantifierProperty... rest) {
this.properties = EnumSet.of(first, rest);
}
 
-   Quantifier() {
-   this.properties = EnumSet.noneOf(QuantifierProperty.class);
+   public static Quantifier ONE() {
+   return new Quantifier(QuantifierProperty.SINGLE);
+   }
+
+   public static Quantifier ONE_OR_MORE() {
+   return new Quantifier(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER);
+   }
+
+   public static Quantifier TIMES() {
+   return new Quantifier(QuantifierProperty.TIMES, 
QuantifierProperty.EAGER);
}
 
public boolean hasProperty(QuantifierProperty property) {
return properties.contains(property);
}
 
+   public void combinations() {
+   if (!hasProperty(Quantifier.QuantifierProperty.EAGER)) {
+   throw new MalformedPatternException("Combinations 
already allowed!");
+   }
+
+   if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || 
hasProperty(Quantifier.QuantifierProperty.TIMES)) {
+   properties.remove(Quantifier.QuantifierProperty.EAGER);
+   } else {
+   throw new MalformedPatternException("Combinations not 
applicable to " + this + "!");
+   }
+   }
+
+   public void consecutive() {
+   if (hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) {
--- End diff --

Same here for the `ONE` quantifier.
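
To make the concern concrete, one possible reordering of the guards (a sketch only, not the PR's eventual fix; it assumes `ONE` carries only the `SINGLE` property, as in the quoted diff):

{code}
public void combinations() {
    // Reject quantifiers that cannot loop first, so ONE gets an accurate message
    // instead of falling into the "already allowed" branch.
    if (!hasProperty(QuantifierProperty.LOOPING) && !hasProperty(QuantifierProperty.TIMES)) {
        throw new MalformedPatternException("Combinations not applicable to " + this + "!");
    }
    if (!hasProperty(QuantifierProperty.EAGER)) {
        throw new MalformedPatternException("Combinations already allowed!");
    }
    properties.remove(QuantifierProperty.EAGER);
}
// consecutive() would need the analogous guard for ONE mentioned above.
{code}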


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984279#comment-15984279
 ] 

ASF GitHub Bot commented on FLINK-6356:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/3761#discussion_r113378455
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -162,130 +186,114 @@ public int getTimes() {
}
 
/**
-* Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
-* temporal contiguity. This means that the whole pattern only matches 
if an event which matches
-* this operator directly follows the preceding matching event. Thus, 
there cannot be any
-* events in between two matching events.
+* Appends a new pattern to the existing one. The new pattern enforces 
strict
+* temporal contiguity. This means that the whole pattern sequence 
matches only
+* if an event which matches this pattern directly follows the 
preceding matching
+* event. Thus, there cannot be any events in between two matching 
events.
 *
-* @param name Name of the new pattern operator
-* @return A new pattern operator which is appended to this pattern 
operator
+* @param name Name of the new pattern
+* @return A new pattern which is appended to this one
 */
public Pattern next(final String name) {
return new Pattern(name, this);
}
 
/**
-* Appends a new pattern operator to the existing one. The new pattern 
operator enforces
-* non-strict temporal contiguity. This means that a matching event of 
this operator and the
+* Appends a new pattern to the existing one. The new pattern enforces 
non-strict
+* temporal contiguity. This means that a matching event of this 
pattern and the
 * preceding matching event might be interleaved with other events 
which are ignored.
 *
-* @param name Name of the new pattern operator
-* @return A new pattern operator which is appended to this pattern 
operator
+* @param name Name of the new pattern
+* @return A new pattern which is appended to this one
 */
public FollowedByPattern followedBy(final String name) {
return new FollowedByPattern(name, this);
}
 
/**
-* Starts a new pattern with the initial pattern operator whose name is 
provided. Furthermore,
-* the base type of the event sequence is set.
-*
-* @param name Name of the new pattern operator
-* @param  Base type of the event pattern
-* @return The first pattern operator of a pattern
-*/
-   public static  Pattern begin(final String name) {
-   return new Pattern(name, null);
-   }
-
-   /**
-* Specifies that this pattern can occur zero or more times(kleene 
star).
-* This means any number of events can be matched in this state.
+* Specifies that this pattern is optional for a final match of the 
pattern
+* sequence to happen.
 *
-* @return The same pattern with applied Kleene star operator
-*
-* @throws MalformedPatternException if quantifier already applied
+* @return The same pattern as optional.
+* @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
 */
-   public Pattern zeroOrMore() {
-   return zeroOrMore(true);
+   public Pattern optional() {
+   quantifier.optional();
+   return this;
}
 
/**
-* Specifies that this pattern can occur zero or more times(kleene 
star).
-* This means any number of events can be matched in this state.
-*
-* If eagerness is enabled for a pattern A*B and sequence A1 A2 B will 
generate patterns:
-* B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+* Specifies that this pattern can occur {@code one or more} times.
+* This means at least one and at most infinite number of events can
+* be matched to this pattern.
 *
-* @param eager if true the pattern always consumes earlier events
-* @return The same pattern with applied Kleene star operator
+* If this quantifier is enabled for a
+* pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events
+* {@code A1 A2 B} appears, this will generate patterns:
+* {@code A1 B} and {@code A1 A2 B}. See also {@link 
#allowCombinations()}.
 *
-* @throws MalformedPatternException if quantifier already applied
+* @return The same pattern with a 

[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984280#comment-15984280
 ] 

ASF GitHub Bot commented on FLINK-6356:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/3761#discussion_r113377152
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -18,51 +18,83 @@
 package org.apache.flink.cep.pattern;
 
 import java.util.EnumSet;
+import java.util.Objects;
 
-public enum Quantifier {
-   ONE,
-   ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER),
-   ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, 
QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
-   ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, 
QuantifierProperty.LOOPING),
-   ONE_OR_MORE_EAGER(
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_EAGER_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.EAGER,
-   QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, 
QuantifierProperty.AT_LEAST_ONE),
-   ONE_OR_MORE_COMBINATIONS_STRICT(
-   QuantifierProperty.STRICT,
-   QuantifierProperty.LOOPING,
-   QuantifierProperty.AT_LEAST_ONE),
-   TIMES(QuantifierProperty.TIMES),
-   TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
-   OPTIONAL;
+public class Quantifier {
 
	private final EnumSet<QuantifierProperty> properties;
 
-   Quantifier(final QuantifierProperty first, final QuantifierProperty... 
rest) {
+   private Quantifier(final QuantifierProperty first, final 
QuantifierProperty... rest) {
this.properties = EnumSet.of(first, rest);
}
 
-   Quantifier() {
-   this.properties = EnumSet.noneOf(QuantifierProperty.class);
+   public static Quantifier ONE() {
+   return new Quantifier(QuantifierProperty.SINGLE);
+   }
+
+   public static Quantifier ONE_OR_MORE() {
+   return new Quantifier(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER);
+   }
+
+   public static Quantifier TIMES() {
+   return new Quantifier(QuantifierProperty.TIMES, 
QuantifierProperty.EAGER);
}
 
public boolean hasProperty(QuantifierProperty property) {
return properties.contains(property);
}
 
+   public void combinations() {
+   if (!hasProperty(Quantifier.QuantifierProperty.EAGER)) {
+   throw new MalformedPatternException("Combinations 
already allowed!");
+   }
+
+   if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || 
hasProperty(Quantifier.QuantifierProperty.TIMES)) {
+   properties.remove(Quantifier.QuantifierProperty.EAGER);
+   } else {
+   throw new MalformedPatternException("Combinations not 
applicable to " + this + "!");
+   }
+   }
+
+   public void consecutive() {
+   if (hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) {
--- End diff --

Same here for the `ONE` quantifier.


> Make times() eager and enable allowing combinations.
> 
>
> Key: FLINK-6356
> URL: https://issues.apache.org/jira/browse/FLINK-6356
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This is the PR that addresses it https://github.com/apache/flink/pull/3761



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3692: FLINK-5974 Added configurations to support mesos-dns host...

2017-04-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3692
  
Sounds good.

I can do a final pass and merge this later this week...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3248: [FLINK-5695] [Table API & SQL] Optimize table type system...

2017-04-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3248
  
Hi @sunjincheng121, can you close this PR as well? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5695) Optimize table type systems based on database semantics

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984516#comment-15984516
 ] 

ASF GitHub Bot commented on FLINK-5695:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3248
  
Hi @sunjincheng121, can you close this PR as well? Thanks!


> Optimize table type systems based on database semantics
> ---
>
> Key: FLINK-5695
> URL: https://issues.apache.org/jira/browse/FLINK-5695
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Optimize table type systems based on database semantics. As follows:
> {code}
>  groupBy
>  >
> Table GroupedTable
>   ∧   <   ∧
>| select|
>|   |
>| where|
>| select| groupBy
>| agg   |
>| ...  |
>| window |
>   ∨ -> 
>  TableWindowedTable
>   <-
>   select
> {code}
> What do you think? [~fhueske] 
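
For context, the transitions in the diagram correspond to the Table API's return types; a minimal sketch (Java Table API; tableEnv, orderStream and the field names are made-up examples):

{code}
// groupBy: Table -> GroupedTable; select on a GroupedTable: back to Table
Table orders = tableEnv.fromDataStream(orderStream, "user, product, amount");
GroupedTable byUser = orders.groupBy("user");
Table totals = byUser.select("user, amount.sum");
{code}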



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984308#comment-15984308
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/3733
  
hi @fhueske , thanks a lot for your review and help. I have addressed all 
your comments and updated the PR. All changes have been checked before my 
latest update. 
Thanks,  Hequn


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-26 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske @sunjincheng121 @shijinkui @hongyuhong  I have created a PR with 
the latest master with the code-generated distinct, #3771, please have a look. 
If it is fine, we can basically support distinct for all the window types.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984392#comment-15984392
 ] 

ASF GitHub Bot commented on FLINK-6250:
---

Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske @sunjincheng121 @shijinkui @hongyuhong  I have created a PR with 
the latest master with the code-generated distinct, #3771, please have a look. 
If it is fine, we can basically support distinct for all the window types.


> Distinct procTime with Rows boundaries
> --
>
> Key: FLINK-6250
> URL: https://issues.apache.org/jira/browse/FLINK-6250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: radu
>Assignee: Stefano Bortoli
>
> Support proctime with rows boundaries
> Q1.1. `SELECT SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS BETWEEN 2 
> PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.1. `SELECT COUNT(b), SUM( DISTINCT  b) OVER (ORDER BY procTime() ROWS 
> BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984505#comment-15984505
 ] 

ASF GitHub Bot commented on FLINK-5969:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3778

[FLINK-5969] Add savepoint backwards compatibility tests from 1.2 to 1.3

The binary savepoints and snapshots in the tests were created on the commit 
of the Flink 1.2.0 release, so we test backwards compatibility within the Flink 
1.2.x line. Once this is approved I'll open another PR that transplants these 
commits on the master branch (with the binary snapshots/savepoints done on 
Flink 1.2.0) so that we test migration compatibility between 1.2.0 and what is 
going to be Flink 1.3.x.

I changed the naming of some existing tests so we now have 
`*From11MigrationTest` and `*From12MigrationTest` (and one ITCase). Immediately 
after releasing Flink 1.3.0 we should do the same, i.e. introduce 
`*From13MigrationTest` and ITCase based on the existing tests.

The unit tests are somewhat straightforward: we feed some data into an 
operator using an operator test harness, then we do a snapshot. (This is the 
part that has to be done on the "old" version to generate the binary snapshot 
that goes into the repo). The actual tests restore an operator from that 
snapshot and verify the output.
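
A rough sketch of that harness-based flow (using Flink's streaming operator test utilities; the operator and exact signatures are illustrative assumptions, not the PR's tests):

{code}
// Feed elements through an operator test harness and take a snapshot
// (on the "old" version, this snapshot is what gets committed to the repo).
OneInputStreamOperatorTestHarness<String, String> harness =
    new OneInputStreamOperatorTestHarness<>(
        new StreamMap<>((MapFunction<String, String>) String::toUpperCase));
harness.setup();
harness.open();
harness.processElement(new StreamRecord<>("hello", 1L));
OperatorStateHandles snapshot = harness.snapshot(0L, 0L);

// The migration test restores a fresh operator from that snapshot and checks the output.
OneInputStreamOperatorTestHarness<String, String> restored =
    new OneInputStreamOperatorTestHarness<>(
        new StreamMap<>((MapFunction<String, String>) String::toUpperCase));
restored.setup();
restored.initializeState(snapshot);
restored.open();
restored.processElement(new StreamRecord<>("world", 2L));
// assert on restored.getOutput() ...
{code}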

The ITCase is a bit more involved. We have a complete Job of user-functions 
and custom operators that tries to cover as many state/timer combinations as 
possible. We start the job and, using accumulators, observe the number of 
received elements in the sink. Once we get all elements we perform a savepoint 
and cancel the job. Thus we have all state caused by the elements reflected in 
our savepoint. This has to be done on the "old" version and the savepoint goes 
into the repo. The restoring job is instrumented with code that verifies 
restored state and updates accumulators. We listen on the accumulator changes 
and cancel the job once we have seen all required verifications.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-5969-backwards-compat-12-13-on-release12

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3778.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3778


commit ef9e73a1f8af8903b0689eada2a9d853034fab88
Author: Aljoscha Krettek 
Date:   2017-04-20T12:48:22Z

[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs

commit 47143ba424355b7d25e9990bc308ea1744a0f33e
Author: Aljoscha Krettek 
Date:   2017-04-20T15:09:00Z

[FLINK-5969] Add savepoint IT case that checks restore from 1.2

The binary savepoints in this were created on the Flink 1.2.0 release
commit.

commit 3803dc04caae5e57f2cb23df0b6bc4663f8af08e
Author: Aljoscha Krettek 
Date:   2017-04-21T09:43:53Z

[FLINK-6353] Fix legacy user-state restore from 1.2

State that was checkpointed using Checkpointed (on a user function)
could not be restored using CheckpointedRestoring when the savepoint was
done on Flink 1.2. The reason was an overzealous check in
AbstractUdfStreamOperator that only restores from "legacy" operator
state using CheckpointedRestoring when the stream is a Migration stream.

This removes that check but we still need to make sure to read away the
byte that indicates whether there is legacy state, which is written when
we're restoring from a Flink 1.1 savepoint.

After this fix, the procedure for a user to migrate a user function away
from the Checkpointed interface is this:

 - Perform savepoint with user function still implementing Checkpointed,
   shutdown job
 - Change user function to implement CheckpointedRestoring
 - Restore from previous savepoint, user function has to somehow move
   the state that is restored using CheckpointedRestoring to another
   type of state, e.g. operator state, using the OperatorStateStore.
 - Perform another savepoint, shutdown job
 - Remove CheckpointedRestoring interface from user function
 - Restore from the second savepoint
 - Done.

If the CheckpointedRestoring interface is not removed as prescribed in
the last steps then a future restore of a new savepoint will fail
because Flink will try to read legacy operator state that is not there
anymore. The above steps also apply to Flink 1.3, when a user wants to
move away from the Checkpointed interface.
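
A hedged sketch of the state-moving step described above (the class, state names and types are illustrative; the interfaces are Flink's CheckpointedRestoring and CheckpointedFunction):

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;

public class MyCounter extends RichMapFunction<Long, Long>
        implements CheckpointedRestoring<Long>, CheckpointedFunction {

    private transient ListState<Long> counterState;
    private long counter;

    @Override
    public void restoreState(Long legacyState) {
        // Called when restoring from a savepoint taken with the old Checkpointed interface.
        counter = legacyState;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // The restored value is re-registered as operator state via the OperatorStateStore.
        counterState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("counter", Long.class));
        for (Long value : counterState.get()) {
            counter += value;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        counterState.clear();
        counterState.add(counter);
    }

    @Override
    public Long map(Long value) {
        return ++counter;
    }
}
{code}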

commit f08661adcf3a64daf955ace70683ef2fe14cec2c
Author: Aljoscha Krettek 
Date:   

[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...

2017-04-26 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3778

[FLINK-5969] Add savepoint backwards compatibility tests from 1.2 to 1.3

The binary savepoints and snapshots in the tests were created on the commit 
of the Flink 1.2.0 release, so we test backwards compatibility within the Flink 
1.2.x line. Once this is approved I'll open another PR that transplants these 
commits on the master branch (with the binary snapshots/savepoints done on 
Flink 1.2.0) so that we test migration compatibility between 1.2.0 and what is 
going to be Flink 1.3.x.

I changed the naming of some existing tests so we now have 
`*From11MigrationTest` and `*From12MigrationTest` (and one ITCase). Immediately 
after releasing Flink 1.3.0 we should do the same, i.e. introduce 
`*From13MigrationTest` and ITCase based on the existing tests.

The unit tests are somewhat straightforward: we feed some data into an 
operator using an operator test harness, then we do a snapshot. (This is the 
part that has to be done on the "old" version to generate the binary snapshot 
that goes into the repo). The actual tests restore an operator from that 
snapshot and verify the output.

The ITCase is a bit more involved. We have a complete Job of user-functions 
and custom operators that tries to cover as many state/timer combinations as 
possible. We start the job and, using accumulators, observe the number of 
received elements in the sink. Once we get all elements we perform a savepoint 
and cancel the job. Thus we have all state caused by the elements reflected in 
our savepoint. This has to be done on the "old" version and the savepoint goes 
into the repo. The restoring job is instrumented with code that verifies 
restored state and updates accumulators. We listen on the accumulator changes 
and cancel the job once we have seen all required verifications.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-5969-backwards-compat-12-13-on-release12

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3778.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3778


commit ef9e73a1f8af8903b0689eada2a9d853034fab88
Author: Aljoscha Krettek 
Date:   2017-04-20T12:48:22Z

[FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs

commit 47143ba424355b7d25e9990bc308ea1744a0f33e
Author: Aljoscha Krettek 
Date:   2017-04-20T15:09:00Z

[FLINK-5969] Add savepoint IT case that checks restore from 1.2

The binary savepoints in this were created on the Flink 1.2.0 release
commit.

commit 3803dc04caae5e57f2cb23df0b6bc4663f8af08e
Author: Aljoscha Krettek 
Date:   2017-04-21T09:43:53Z

[FLINK-6353] Fix legacy user-state restore from 1.2

State that was checkpointed using Checkpointed (on a user function)
could not be restored using CheckpointedRestoring when the savepoint was
done on Flink 1.2. The reason was an overzealous check in
AbstractUdfStreamOperator that only restores from "legacy" operator
state using CheckpointedRestoring when the stream is a Migration stream.

This removes that check but we still need to make sure to read away the
byte that indicates whether there is legacy state, which is written when
we're restoring from a Flink 1.1 savepoint.

After this fix, the procedure for a user to migrate a user function away
from the Checkpointed interface is this:

 - Perform savepoint with user function still implementing Checkpointed,
   shutdown job
 - Change user function to implement CheckpointedRestoring
 - Restore from previous savepoint, user function has to somehow move
   the state that is restored using CheckpointedRestoring to another
   type of state, e.g. operator state, using the OperatorStateStore.
 - Perform another savepoint, shutdown job
 - Remove CheckpointedRestoring interface from user function
 - Restore from the second savepoint
 - Done.

If the CheckpointedRestoring interface is not removed as prescribed in
the last steps then a future restore of a new savepoint will fail
because Flink will try to read legacy operator state that is not there
anymore. The above steps also apply to Flink 1.3, when a user wants to
move away from the Checkpointed interface.

commit f08661adcf3a64daf955ace70683ef2fe14cec2c
Author: Aljoscha Krettek 
Date:   2017-04-24T09:25:32Z

[FLINK-5969] Add ContinuousFileProcessingFrom12MigrationTest

The binary snapshots were created on the Flink 1.2 branch.

commit e70424eb6c9861e89c78f12143f319ce6eea49c1
Author: Aljoscha Krettek 

[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984512#comment-15984512
 ] 

ASF GitHub Bot commented on FLINK-6356:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/3761
  
@kl0u I don't have any more comments, so for me it LGTM.

 I've also rebased the skip-till-next on top of this one, so it would be nice 
to merge it and start reviewing #3698


> Make times() eager and enable allowing combinations.
> 
>
> Key: FLINK-6356
> URL: https://issues.apache.org/jira/browse/FLINK-6356
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This is the PR that addresses it https://github.com/apache/flink/pull/3761



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984523#comment-15984523
 ] 

ASF GitHub Bot commented on FLINK-6107:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3567


> Add custom checkstyle for flink-streaming-java
> --
>
> Key: FLINK-6107
> URL: https://issues.apache.org/jira/browse/FLINK-6107
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> There was some consensus on the ML 
> (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E)
>  that we want to have a more uniform code style. We should start 
> module-by-module and by introducing increasingly stricter rules. We have to 
> be aware of the PR situation and ensure that we have minimal breakage for 
> contributors.
> This issue aims at adding a custom checkstyle.xml for 
> {{flink-streaming-java}} that is based on our current checkstyle.xml but adds 
> these checks for Javadocs:
> {code}
> (checkstyle XML module configuration for the Javadoc checks listed below)
> {code}
> This checks:
>  - Every type has a type-level Javadoc
>  - Proper use of {{}} in Javadocs
>  - First sentence must end with a proper punctuation mark
>  - Proper use (including closing) of HTML tags



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984522#comment-15984522
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3777
  
@zentol @shijinkui Ah, forget what I said, I only read the description and 
thought it was something different ;-)

I thought this was about forbidding UI users to access the logs (which 
seems valid since the logs may contain sensitive information).

This seems to be about auditing, so completely different...


> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record user requests to the access log. Append user accesses to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3733: [FLINK-6091] [table] Implement and turn on retraction for...

2017-04-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3733
  
Thanks for the update @hequn8128. The PR looks really good, IMO.
Will do a couple of minor refactorings and merge it to `table-retraction` 
branch.

Best, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984524#comment-15984524
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3733
  
Thanks for the update @hequn8128. The PR looks really good, IMO.
Will do a couple of minor refactorings and merge it to `table-retraction` 
branch.

Best, Fabian


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3736
  
+1 from my side
@zentol any further comments/concerns?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984387#comment-15984387
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3736
  
+1 from my side
@zentol any further comments/concerns?


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984396#comment-15984396
 ] 

ASF GitHub Bot commented on FLINK-5869:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3772#discussion_r113399653
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,6 +72,13 @@
.defaultValue(16)

.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
+   /**
+* The maximum number of prior execution attempts kept in history.
+*/
+   public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
+   key("jobmanager.execution.failover-strategy")
+   .defaultValue("full");
--- End diff --

I don't think we have done that yet, no...

There is a PR to extend the config options to include a description (and 
generate docs), it could be an extension there.
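
As a side note, the quoted option follows Flink's ConfigOptions builder; a minimal usage sketch (the surrounding class and method names are illustrative):

{code}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;

import static org.apache.flink.configuration.ConfigOptions.key;

public class FailoverStrategyConfig {
    // Same key and default as in the quoted diff.
    public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
        key("jobmanager.execution.failover-strategy")
            .defaultValue("full");

    public static String failoverStrategy(Configuration config) {
        // Falls back to "full" unless the key is set in flink-conf.yaml.
        return config.getString(EXECUTION_FAILOVER_STRATEGY);
    }
}
{code}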


> ExecutionGraph use FailoverCoordinator to manage the failover of execution 
> vertexes
> ---
>
> Key: FLINK-5869
> URL: https://issues.apache.org/jira/browse/FLINK-5869
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> Execution graph doesn't manage the failover of executions. It only care for 
> the state of the whole job, which is CREATED, RUNNING, FAILED, FINISHED, or 
> SUSPEND. 
> For execution failure, it will notice the FailoverCoordinator to do failover.
> It only record the finished job vertex and changes to FINISHED after all 
> vertexes finished.
> It will change to final fail if restart strategy fail or meet unrecoverable 
> exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3772: [FLINK-5869] [flip-1] Introduce abstraction for Fa...

2017-04-26 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3772#discussion_r113399653
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,6 +72,13 @@
.defaultValue(16)

.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
+   /**
+* The maximum number of prior execution attempts kept in history.
+*/
+   public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
+   key("jobmanager.execution.failover-strategy")
+   .defaultValue("full");
--- End diff --

I don't think we have done that yet, no...

There is a PR to extend the config options to include a description (and 
generate docs), it could be an extension there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984464#comment-15984464
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406502
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
 ---
@@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof 
HttpContent) {
}
}
}
+
+  /**
+   * Record the access log if enable configure of
+   * {@link 
org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}.
+   * record format:
+   * remote_addr - [time_local] "request_method URI protocolVersion" 
"http_referer" "http_user_agent"
+   */
+   private void accesslog(ChannelHandlerContext ctx, HttpRequest req) {
--- End diff --

this should be renamed to `logAccess`.


> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record user requests to the access log. Append user accesses to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984465#comment-15984465
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406464
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -645,6 +645,10 @@
@Deprecated
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = 
"jobmanager.web.log.path";
 
+   /** Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend. */
--- End diff --

This doc isn't in sync with what the implementation actually does. It 
doesn't restrict anything but only logs all requests.


> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record user requests to the access log. Append user accesses to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984463#comment-15984463
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406982
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -645,6 +645,10 @@
@Deprecated
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = 
"jobmanager.web.log.path";
 
+   /** Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend. */
+   public static final ConfigOption<Boolean> JOB_MANAGER_WEB_ACCESSLOG_ENABLE =
+   key("jobmanager.web.accesslog.enable").defaultValue(false);
--- End diff --

please move the `.defaultValue` call into a new line. It should also be 
moved to the JobManagerOptions class.


> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record user requests to the access log. Append user accesses to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984466#comment-15984466
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406843
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
 ---
@@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof 
HttpContent) {
}
}
}
+
+  /**
+   * Record the access log if enable configure of
+   * {@link 
org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}.
+   * record format:
+   * remote_addr - [time_local] "request_method URI protocolVersion" 
"http_referer" "http_user_agent"
+   */
+   private void accesslog(ChannelHandlerContext ctx, HttpRequest req) {
+   HttpHeaders headers = req.headers();
+   if (headers != null) {
+   String line = ctx.channel().remoteAddress() + " - [" + 
new Date() + "] \""
--- End diff --

Please merge this string generation into the log statement.
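
A minimal sketch of the suggested change, assuming HttpRequestHandler has an SLF4J
logger named LOG and the Netty 4 HttpRequest/HttpHeaders types plus java.util.Date
already imported; the layout simply mirrors the access-log format quoted in the diff:

    private void accesslog(ChannelHandlerContext ctx, HttpRequest req) {
        HttpHeaders headers = req.headers();
        if (headers != null) {
            // build the line inside the log call, so nothing is formatted when logging is disabled
            LOG.info("{} - [{}] \"{} {} {}\" \"{}\" \"{}\"",
                ctx.channel().remoteAddress(),
                new Date(),
                req.getMethod(),
                req.getUri(),
                req.getProtocolVersion(),
                headers.get(HttpHeaders.Names.REFERER),
                headers.get(HttpHeaders.Names.USER_AGENT));
        }
    }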


> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record user requests in an access log. Append each user access to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984491#comment-15984491
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113409179
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link StreamTableSink}.
+ *
+ */
+class CassandraTableSink implements StreamTableSink {
+   private final ClusterBuilder builder;
+   private final String cql;
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+   private final Properties properties;
+
+   public CassandraTableSink(ClusterBuilder builder, String cql, 
Properties properties) {
+   this.builder = Preconditions.checkNotNull(builder, "builder");
--- End diff --

Please include a slightly longer exception message, e.g. "ClusterBuilder 
must not be null." The same applies to the other calls to 
`Preconditions#checkNotNull()`.
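
For illustration, the constructor assignments with more descriptive messages might
look like this, assuming the other arguments are checked the same way; the exact
wording is only a suggestion:

    this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
    this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
    this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");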


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984500#comment-15984500
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410392
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
+   private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
--- End diff --

missing indentation.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984501#comment-15984501
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410447
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
--- End diff --

remove the space after `[]`.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984499#comment-15984499
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410291
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
+   private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
 
static {
for (int i = 0; i < 20; i++) {
collection.add(new 
Tuple3<>(UUID.randomUUID().toString(), i, 0));
+   
rowCollection.add(org.apache.flink.types.Row.of(UUID.randomUUID().toString(), 
i, 0));
--- End diff --

same as above regarding the import


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984492#comment-15984492
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113408171
  
--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,10 @@ under the License.



+   
+   org.apache.flink
+   flink-table_2.10
+   1.3-SNAPSHOT
--- End diff --

should be set to provided.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984528#comment-15984528
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
The new API should live in a new class, such as FlinkKafkaPartitioner. Older 
KafkaPartitioner implementations would be wrapped by a FlinkKafkaAdpterPartitioner 
that extends FlinkKafkaPartitioner, holds the default topic's partitions and the 
KafkaPartitioner delegate, and maps the new API onto the current one. Of course, 
as it is now, the default topic's partitions would then be used for all the 
different target topics inside FlinkKafkaAdpterPartitioner.
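
A rough sketch of that delegation idea; the new base class, the adapter and all
method signatures are part of the proposal and do not exist in Flink at this point:

    // proposed new base class: partitions are passed per call, not at open()
    public abstract class FlinkKafkaPartitioner<T> implements java.io.Serializable {

        public void open(int parallelInstanceId, int parallelInstances) {
            // nothing to initialize by default
        }

        public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue,
                String targetTopic, int[] partitions);
    }

    // adapter that keeps legacy KafkaPartitioner implementations working
    class FlinkKafkaAdpterPartitioner<T> extends FlinkKafkaPartitioner<T> {

        private final KafkaPartitioner<T> delegate;
        private final int[] defaultTopicPartitions;

        FlinkKafkaAdpterPartitioner(KafkaPartitioner<T> delegate, int[] defaultTopicPartitions) {
            this.delegate = delegate;
            this.defaultTopicPartitions = defaultTopicPartitions;
        }

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            // the old interface expected the default topic's partitions at open()
            delegate.open(parallelInstanceId, parallelInstances, defaultTopicPartitions);
        }

        @Override
        public int partition(T next, byte[] serializedKey, byte[] serializedValue,
                String targetTopic, int[] partitions) {
            // legacy partitioners only ever knew the default topic's partition count
            return delegate.partition(next, serializedKey, serializedValue, defaultTopicPartitions.length);
        }
    }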


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner is always invoked the {{partition}} method with the number of 
> partitions in the default topic, and not the number of partitions of the 
> current {{targetTopic}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
The new API should live in a new class, such as FlinkKafkaPartitioner. Older 
KafkaPartitioner implementations would be wrapped by a FlinkKafkaAdpterPartitioner 
that extends FlinkKafkaPartitioner, holds the default topic's partitions and the 
KafkaPartitioner delegate, and maps the new API onto the current one. Of course, 
as it is now, the default topic's partitions would then be used for all the 
different target topics inside FlinkKafkaAdpterPartitioner.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113415863
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Thanks, I added the case for the reverse order. This code is needed for 
nullable records in order to access the record schema.
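
A hedged sketch of what handling both union orderings could look like when
unwrapping a nullable record schema; variable names follow the snippet above, and
the exact placement inside convertToRecord is assumed:

    if (schema.getType() == Schema.Type.UNION) {
        final List<Schema> types = schema.getTypes();
        // a nullable record is a union of NULL and RECORD, in either order
        if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
            schema = types.get(1);
        }
        else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
            schema = types.get(0);
        }
    }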


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984537#comment-15984537
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113415863
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements 
SerializationSchema {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class 
recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Thanks, I added the case for the reverse order. This code is needed for 
nullable records in order to access the schema.

[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log

2017-04-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3777
  
I think this is a good issue.
I am wondering if we may even want to approach this a bit more broadly and 
define certain "access levels" for the UI:

  - view jobs only
  - view jobs and logs
  - view jobs, logs, cancel jobs
  - view jobs, logs, cancel jobs, submit new jobs

What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984391#comment-15984391
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3777
  
I think this is a good issue.
I am wondering if we may even want to approach this a bit more broadly and 
define certain "access levels" for the UI:

  - view jobs only
  - view jobs and logs
  - view jobs, logs, cancel jobs
  - view jobs, logs, cancel jobs, submit new jobs

What do you think?


> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record user requests in an access log. Append each user access to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984441#comment-15984441
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3766
  
How would this new API map to the current one in terms of backwards 
compatibility?


> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner is always invoked the {{partition}} method with the number of 
> partitions in the default topic, and not the number of partitions of the 
> current {{targetTopic}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984517#comment-15984517
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410993
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[] {"foo"};
--- End diff --

remove space after `[]`.


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log

2017-04-26 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3777
  
@zentol @shijinkui Ah, forget what I said, I only read the description and 
thought it was something different ;-)

I thought this was about forbidding UI users from accessing the logs (which 
seems valid, since the logs may contain sensitive information).

This seems to be about auditing, so completely different...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3567: [FLINK-6107] Add custom checkstyle for flink-strea...

2017-04-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3567


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984520#comment-15984520
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113412006
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements 
StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer 
serializer,
+   JDBCOutputFormat outputFormat, String[] 
fieldNames,
+   TypeInformation[] fieldTypes) throws Exception {
+   super(committer, serializer, 
UUID.randomUUID().toString().replace("-", "_"));
+   this.outputFormat = outputFormat;
+   this.committer = committer;
+   this.fieldNames = fieldNames;
+   this.fieldTypes = fieldTypes;
+   }
+
+   @Override
+   public void emitDataStream(DataStream dataStream) {
+   dataStream.transform("JDBC Sink", getOutputType(), this);
+   }
+
+   @Override
+   public TypeInformation getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink configure(String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   try {
+   return new JDBCTableSink(committer, serializer, 
outputFormat, fieldNames, fieldTypes);
+   } catch (Exception e) {
+   LOG.warn("Failed to create a copy of the sink.", e);
+   return null;
+   }
+   }
+
+   @Override
+   protected boolean sendValues(Iterable value, long timestamp) 
throws Exception {
+   for (Row r : value) {
+   try {
+   outputFormat.writeRecord(r);
--- End diff --

This doesn't guarantee in any way  that the values are actually being sent; 
you need some kind of flushing functionality for this to work properly.
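
A hedged sketch of the flushing idea, staying close to the method in the diff; note
that the flush() call is an assumption here, JDBCOutputFormat would need such a
method (or an equivalent way to force out buffered batches) for the write-ahead
guarantee to hold:

    @Override
    protected boolean sendValues(Iterable<Row> values, long timestamp) throws Exception {
        for (Row row : values) {
            try {
                outputFormat.writeRecord(row);
            } catch (IOException e) {
                LOG.warn("Writing a row to JDBC failed.", e);
                return false;
            }
        }
        // push any rows still buffered in the output format to the database
        // before the checkpoint is acknowledged
        outputFormat.flush();
        return true;
    }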


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6281) Create TableSink for JDBC

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984519#comment-15984519
 ] 

ASF GitHub Bot commented on FLINK-6281:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410894
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements 
StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer 
serializer,
+   JDBCOutputFormat outputFormat, String[] 
fieldNames,
+   TypeInformation[] fieldTypes) throws Exception {
--- End diff --

like the cassandra sink the `fieldNames/Types` should be removed to provide 
a clean API to the user.
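
A possible shape for the cleaner API the reviewer has in mind, where the schema is
supplied through configure() rather than by the user in the constructor; this
fragment is an assumption built on the class from the diff, with fieldNames and
fieldTypes turned into non-final fields:

    private final TypeSerializer<Row> serializer;
    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
            JDBCOutputFormat outputFormat) throws Exception {
        super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
        this.outputFormat = outputFormat;
        this.committer = committer;
        this.serializer = serializer;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        try {
            // the Table API hands the schema in here, so users never pass it themselves
            JDBCTableSink copy = new JDBCTableSink(committer, serializer, outputFormat);
            copy.fieldNames = fieldNames;
            copy.fieldTypes = fieldTypes;
            return copy;
        } catch (Exception e) {
            throw new RuntimeException("Failed to create a copy of the sink.", e);
        }
    }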


> Create TableSink for JDBC
> -
>
> Key: FLINK-6281
> URL: https://issues.apache.org/jira/browse/FLINK-6281
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be nice to integrate the table APIs with the JDBC connectors so that 
> the rows in the tables can be directly pushed into JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984364#comment-15984364
 ] 

ASF GitHub Bot commented on FLINK-6356:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3761
  
Thanks a lot for the review @dawidwys !
I integrated your comments. Let me know when you are done so that I can 
merge this.


> Make times() eager and enable allowing combinations.
> 
>
> Key: FLINK-6356
> URL: https://issues.apache.org/jira/browse/FLINK-6356
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This is the PR that addresses it https://github.com/apache/flink/pull/3761



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3761: [FLINK-6356] Make optional available to all patterns/time...

2017-04-26 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3761
  
Thanks a lot for the review @dawidwys !
I integrated your comments. Let me know when you are done so that I can 
merge this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6336) Placement Constraints for Mesos

2017-04-26 Thread Stephen Gran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984362#comment-15984362
 ] 

Stephen Gran commented on FLINK-6336:
-

Hi,

Any chance this can make the 1.3.0 release?


Thanks!

> Placement Constraints for Mesos
> ---
>
> Key: FLINK-6336
> URL: https://issues.apache.org/jira/browse/FLINK-6336
> Project: Flink
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 1.2.0
>Reporter: Stephen Gran
>Priority: Minor
>
> Fenzo supports placement constraints for tasks, and operators expose agent 
> attributes to frameworks in the form of attributes about the agent offer.
> It would be extremely helpful in our multi-tenant cluster to be able to make 
> use of this facility.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984402#comment-15984402
 ] 

ASF GitHub Bot commented on FLINK-6288:
---

Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora Thanks for your comment; you raise a very good question. I 
originally assumed that users would know all output topics when the job is 
submitted, but in real business scenarios the data may be written to 
dynamically generated topics. 

To support dynamically generated topics, I propose to adjust the open and 
partition API of KafkaPartitioner as follows (sketched below):
1. The open method drops the int[] partitions parameter and is invoked once 
per partitioner instance:
public void open(int parallelInstanceId, int parallelInstances)
2. The partition method gains the target topic and its int[] partitions as 
parameters:
public int partition(T next, byte[] serializedKey, byte[] serializedValue, 
String topic, int[] partitions)

@gyfora @tzulitai What do you think of this? Please feel free to give any 
suggestions, thanks!
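
A compact sketch of the adjusted interface described in the two points above; this
is the proposal only, not existing Flink code:

    public abstract class KafkaPartitioner<T> implements java.io.Serializable {

        // 1. opened once per partitioner instance, without the partition array
        public void open(int parallelInstanceId, int parallelInstances) {
        }

        // 2. the target topic and its partitions are handed to every call
        public abstract int partition(
                T next,
                byte[] serializedKey,
                byte[] serializedValue,
                String topic,
                int[] partitions);
    }

With this shape the producer could look up the partition array of the actual target
topic before each invocation, so dynamically created topics would be partitioned
against their own partition count rather than the default topic's.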



> FlinkKafkaProducer's custom Partitioner is always invoked with number of 
> partitions of default topic
> 
>
> Key: FLINK-6288
> URL: https://issues.apache.org/jira/browse/FLINK-6288
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> The {{FlinkKafkaProducerBase}} supports routing records to topics besides the 
> default topic, but the custom {{Partitioner}} interface does not follow this 
> semantic.
> The partitioner is always invoked the {{partition}} method with the number of 
> partitions in the default topic, and not the number of partitions of the 
> current {{targetTopic}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...

2017-04-26 Thread fanyon
Github user fanyon commented on the issue:

https://github.com/apache/flink/pull/3766
  
@gyfora Thanks for your comment; you raise a very good question. I 
originally assumed that users would know all output topics when the job is 
submitted, but in real business scenarios the data may be written to 
dynamically generated topics. 

To support dynamically generated topics, I propose to adjust the open and 
partition API of KafkaPartitioner as follows:
1. The open method drops the int[] partitions parameter and is invoked once 
per partitioner instance:
public void open(int parallelInstanceId, int parallelInstances)
2. The partition method gains the target topic and its int[] partitions as 
parameters:
public int partition(T next, byte[] serializedKey, byte[] serializedValue, 
String topic, int[] partitions)

@gyfora @tzulitai What do you think of this? Please feel free to give any 
suggestions, thanks!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984497#comment-15984497
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410371
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
+   private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
--- End diff --

Move `STRING_TYPE_INFO` to the next line.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984496#comment-15984496
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410046
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -86,11 +83,15 @@
 
private static EmbeddedCassandraService cassandra;
 
+   private static String HOST = "127.0.0.1";
+
+   private static int PORT = 9042;
--- End diff --

should be final


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984494#comment-15984494
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410687
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -438,6 +461,27 @@ public void cancel() {
}
 
@Test
+   public void testCassandraTableSink() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   DataStreamSource source = 
env.fromCollection(rowCollection);
+   CassandraTableSink cassandraTableSink = new 
CassandraTableSink(new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST, PORT)).build();
+   }
+   }, INSERT_DATA_QUERY, new Properties());
+   cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
--- End diff --

Shouldn't we assign the returned value to `cassandraTableSink`?
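
If the returned sink is meant to be used, the test could capture it roughly like
this; the names come from the test in the diff, and the cast back to
CassandraTableSink is an assumption about how configure() would be consumed here:

    CassandraTableSink configuredSink =
        (CassandraTableSink) cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
    configuredSink.emitDataStream(source);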


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984495#comment-15984495
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410244
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
--- End diff --

Please add an import for `Row` instead of using the qualified name.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984493#comment-15984493
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113409930
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link StreamTableSink}.
+ *
--- End diff --

remove this line.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410291
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
+   private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
 
static {
for (int i = 0; i < 20; i++) {
collection.add(new 
Tuple3<>(UUID.randomUUID().toString(), i, 0));
+   
rowCollection.add(org.apache.flink.types.Row.of(UUID.randomUUID().toString(), 
i, 0));
--- End diff --

same as above regarding the import


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410371
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
+   private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
--- End diff --

Move `STRING_TYPE_INFO` to the next line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410392
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
+   private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
--- End diff --

missing indentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410244
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
--- End diff --

Please add an import for `Row` instead of using the qualified name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410447
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder 
builder) {
private static final String SELECT_DATA_QUERY = "SELECT * FROM 
flink.test;";
 
private static final ArrayList> 
collection = new ArrayList<>(20);
+   private static final ArrayList 
rowCollection = new ArrayList<>(20);
+
+   private static final String[] FIELD_NAMES = new String[] {"id", 
"counter", "batch_id"};
--- End diff --

remove the space after `[]`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410023
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -86,11 +83,15 @@
 
private static EmbeddedCassandraService cassandra;
 
+   private static String HOST = "127.0.0.1";
--- End diff --

should be final


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113408171
  
--- Diff: flink-connectors/flink-connector-cassandra/pom.xml ---
@@ -176,5 +176,10 @@ under the License.



+   
+   org.apache.flink
+   flink-table_2.10
+   1.3-SNAPSHOT
--- End diff --

should be set to provided.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113409930
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link StreamTableSink}.
+ *
--- End diff --

remove this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410687
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -438,6 +461,27 @@ public void cancel() {
}
 
@Test
+   public void testCassandraTableSink() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   DataStreamSource source = 
env.fromCollection(rowCollection);
+   CassandraTableSink cassandraTableSink = new 
CassandraTableSink(new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST, PORT)).build();
+   }
+   }, INSERT_DATA_QUERY, new Properties());
+   cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
--- End diff --

Shouldn't we assign the returned value to `cassandraTableSink`?
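For reference, a minimal sketch of the fix being asked for, assuming `configure()` follows the usual `TableSink` contract of returning a configured copy rather than mutating the receiver (the cast is only illustrative and presumes the copy is again a `CassandraTableSink`):

```java
// configure() returns a configured copy; the original sink stays unconfigured,
// so the returned instance is the one that has to be used for emission.
CassandraTableSink configuredSink =
	(CassandraTableSink) cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
configuredSink.emitDataStream(source);
```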


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984498#comment-15984498
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410023
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -86,11 +83,15 @@
 
private static EmbeddedCassandraService cassandra;
 
+   private static String HOST = "127.0.0.1";
--- End diff --

should be final


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113410046
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -86,11 +83,15 @@
 
private static EmbeddedCassandraService cassandra;
 
+   private static String HOST = "127.0.0.1";
+
+   private static int PORT = 9042;
--- End diff --

should be final


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3748#discussion_r113409179
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A cassandra  {@link StreamTableSink}.
+ *
+ */
+class CassandraTableSink implements StreamTableSink {
+   private final ClusterBuilder builder;
+   private final String cql;
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+   private final Properties properties;
+
+   public CassandraTableSink(ClusterBuilder builder, String cql, 
Properties properties) {
+   this.builder = Preconditions.checkNotNull(builder, "builder");
--- End diff --

Please include a slightly longer exception message, e.g. "ClusterBuilder 
must not be null." The same applies to the other calls to 
`Preconditions#checkNotNull()`.
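For illustration, the constructor body with such messages added (the exact wording is of course up to you, and the other two assignments are assumed to mirror the `builder` check):

```java
this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
```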


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...

2017-04-26 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3777

[FLINK-6387] [webfrontend]Flink UI support access log

Record user requests in the access log. Append each user access to the log file.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-6387] 
[webfrontend]Flink UI support access log")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hwstreaming/flink access_log_support

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3777.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3777


commit 0d19fb95072c90125152513c9b2a07b518d16b27
Author: shijinkui 
Date:   2017-02-23T12:06:43Z

[FLINK-6387] [webfrontend]Flink UI support access log




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6387) Flink UI support access log

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984332#comment-15984332
 ] 

ASF GitHub Bot commented on FLINK-6387:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3777

[FLINK-6387] [webfrontend]Flink UI support access log

Record user requests in the access log. Append each user access to the log file.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-6387] 
[webfrontend]Flink UI support access log")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hwstreaming/flink access_log_support

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3777.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3777


commit 0d19fb95072c90125152513c9b2a07b518d16b27
Author: shijinkui 
Date:   2017-02-23T12:06:43Z

[FLINK-6387] [webfrontend]Flink UI support access log




> Flink UI support access log
> ---
>
> Key: FLINK-6387
> URL: https://issues.apache.org/jira/browse/FLINK-6387
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: shijinkui
>Assignee: shijinkui
>
> Record user requests in the access log. Append each user access to the log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113405541
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map gauges = new ConcurrentHashMap<>();
+   private final Map counters = new 
ConcurrentHashMap<>();
+   private final Map meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List configTags;
+
+   public static final String API_KEY = "apikey";
--- End diff --

You could define these as a `ConfigOption` which would make it more obvious 
what the default value is.
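As a sketch of that suggestion (the `tags` key is hypothetical, only `apikey` is visible in the diff; the reporter would still read the values from its `MetricConfig` itself):

```java
import org.apache.flink.configuration.ConfigOption;
import static org.apache.flink.configuration.ConfigOptions.key;

// Declaring the keys as ConfigOptions makes the default values visible
// right where the option is defined.
public static final ConfigOption<String> API_KEY = key("apikey").noDefaultValue();
public static final ConfigOption<String> TAGS = key("tags").defaultValue("");
```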


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984448#comment-15984448
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113405034
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,109 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-metrics-datadog
+   flink-metrics-datadog
+
+   
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   provided
+   
+
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   provided
+   
+
+   
+   com.squareup.okhttp3
+   okhttp
+   3.6.0
+   
+
+   
+   com.squareup.okio
+   okio
+   1.11.0
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
jar-with-dependencies
--- End diff --

I would remove this and adjust the `opt.xml` again; this makes it more 
consistent with other shaded jars that we produce.


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984447#comment-15984447
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113405541
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map gauges = new ConcurrentHashMap<>();
+   private final Map counters = new 
ConcurrentHashMap<>();
+   private final Map meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List configTags;
+
+   public static final String API_KEY = "apikey";
--- End diff --

You could define these as a `ConfigOption` which would make it more obvious 
what the default value is.


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot other companies also do.
> Flink right now only has a StatsD metrics reporter, and users have to set up 
> Datadog Agent in order to receive metrics from StatsD and transport them to 
> Datadog. We don't like this approach.
> We prefer to have a Datadog metrics reporter directly contacting Datadog http 
> endpoint.
> I'll take this ticket myself.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r113405034
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,109 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+   
+   org.apache.flink
+   flink-metrics
+   1.3-SNAPSHOT
+   ..
+   
+
+   flink-metrics-datadog
+   flink-metrics-datadog
+
+   
+   
+   org.apache.flink
+   flink-metrics-core
+   ${project.version}
+   provided
+   
+
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   provided
+   
+
+   
+   com.squareup.okhttp3
+   okhttp
+   3.6.0
+   
+
+   
+   com.squareup.okio
+   okio
+   1.11.0
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils-junit
+   ${project.version}
+   test
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   package
+   
+   shade
+   
+   
+   
jar-with-dependencies
--- End diff --

I would remove this and adjust the `opt.xml` again; this makes it more 
consistent with other shaded jars that we produce.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406843
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
 ---
@@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof 
HttpContent) {
}
}
}
+
+  /**
+   * Record the access log if enable configure of
+   * {@link 
org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}.
+   * record format:
+   * remote_addr - [time_local] "request_method URI protocolVersion" 
"http_referer" "http_user_agent"
+   */
+   private void accesslog(ChannelHandlerContext ctx, HttpRequest req) {
+   HttpHeaders headers = req.headers();
+   if (headers != null) {
+   String line = ctx.channel().remoteAddress() + " - [" + 
new Date() + "] \""
--- End diff --

Please merge this string generation into the log statement.
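Something along these lines, i.e. letting the logger do the formatting (the `LOG` field and the Netty `HttpHeaders.Names` lookups are assumptions about what the handler has available):

```java
// Parameterized logging: the message is only assembled when the log level is enabled.
LOG.info("{} - [{}] \"{} {} {}\" \"{}\" \"{}\"",
	ctx.channel().remoteAddress(),
	new Date(),
	req.getMethod(),
	req.getUri(),
	req.getProtocolVersion(),
	headers.get(HttpHeaders.Names.REFERER),
	headers.get(HttpHeaders.Names.USER_AGENT));
```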


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406502
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
 ---
@@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof 
HttpContent) {
}
}
}
+
+  /**
+   * Record the access log if enable configure of
+   * {@link 
org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}.
+   * record format:
+   * remote_addr - [time_local] "request_method URI protocolVersion" 
"http_referer" "http_user_agent"
+   */
+   private void accesslog(ChannelHandlerContext ctx, HttpRequest req) {
--- End diff --

this should be renamed to `logAccess`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406464
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -645,6 +645,10 @@
@Deprecated
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = 
"jobmanager.web.log.path";
 
+   /** Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend. */
--- End diff --

This doc isn't in sync with what the implementation actually does. It 
doesn't restrict anything but only logs all requests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3777#discussion_r113406982
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -645,6 +645,10 @@
@Deprecated
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = 
"jobmanager.web.log.path";
 
+   /** Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend. */
+   public static final ConfigOption 
JOB_MANAGER_WEB_ACCESSLOG_ENABLE =
+   
key("jobmanager.web.accesslog.enable").defaultValue(false);
--- End diff --

Please move the `.defaultValue()` call onto a new line. The option should also be 
moved to the `JobManagerOptions` class.
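I.e. something like the following in `JobManagerOptions`, using the same `ConfigOptions.key(...)` builder as the existing options (the constant name is just a placeholder):

```java
/** Flag to enable the access log of the web frontend. */
public static final ConfigOption<Boolean> WEB_ACCESS_LOG_ENABLED =
	key("jobmanager.web.accesslog.enable")
		.defaultValue(false);
```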


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113411053
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[] {"foo"};
+   private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] {
--- End diff --

remove space after `[]`. move `STRING_TYPE_INFO` to this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113412806
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements 
StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer 
serializer,
--- End diff --

I would propose either adding a JDBCCheckpointCommitter that cooperates 
with the sink (as seen in this 
[prototype](https://github.com/zentol/flink/commit/92e878b59a7371ac9cad402d0b009c7439cd1900)), 
or omitting the `CheckpointCommitter` argument and providing a dummy to the 
`GenericWriteAheadSink`.
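The second variant could look roughly like this (`NoOpCheckpointCommitter` is a hypothetical dummy implementation, not an existing class):

```java
// Public constructor without a CheckpointCommitter; a no-op committer is
// handed to GenericWriteAheadSink internally via the existing constructor.
public JDBCTableSink(TypeSerializer<Row> serializer, JDBCOutputFormat outputFormat,
		String[] fieldNames, TypeInformation<?>[] fieldTypes) throws Exception {
	this(new NoOpCheckpointCommitter(), serializer, outputFormat, fieldNames, fieldTypes);
}
```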


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410993
  
--- Diff: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+   private static final String[] FIELD_NAMES = new String[] {"foo"};
--- End diff --

remove space after `[]`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113410894
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements 
StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer 
serializer,
+   JDBCOutputFormat outputFormat, String[] 
fieldNames,
+   TypeInformation[] fieldTypes) throws Exception {
--- End diff --

As with the Cassandra sink, the `fieldNames`/`fieldTypes` constructor parameters 
should be removed to provide a clean API to the user.
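Roughly, the shape this could take (a sketch only: the arguments required by `GenericWriteAheadSink` are omitted, and `fieldNames`/`fieldTypes` would become non-final fields set in `configure()`):

```java
// The user-facing constructor no longer takes fieldNames/fieldTypes;
// the Table API supplies them through configure(), which returns a copy.
public JDBCTableSink(JDBCOutputFormat outputFormat) {
	this.outputFormat = outputFormat;
}

@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
	JDBCTableSink copy = new JDBCTableSink(outputFormat);
	copy.fieldNames = fieldNames;
	copy.fieldTypes = fieldTypes;
	return copy;
}
```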


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.

2017-04-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3712#discussion_r113412006
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements 
StreamTableSink {
+   private final JDBCOutputFormat outputFormat;
+   private final CheckpointCommitter committer;
+   private final String[] fieldNames;
+   private final TypeInformation[] fieldTypes;
+
+   public JDBCTableSink(CheckpointCommitter committer, TypeSerializer 
serializer,
+   JDBCOutputFormat outputFormat, String[] 
fieldNames,
+   TypeInformation[] fieldTypes) throws Exception {
+   super(committer, serializer, 
UUID.randomUUID().toString().replace("-", "_"));
+   this.outputFormat = outputFormat;
+   this.committer = committer;
+   this.fieldNames = fieldNames;
+   this.fieldTypes = fieldTypes;
+   }
+
+   @Override
+   public void emitDataStream(DataStream dataStream) {
+   dataStream.transform("JDBC Sink", getOutputType(), this);
+   }
+
+   @Override
+   public TypeInformation getOutputType() {
+   return new RowTypeInfo(fieldTypes, fieldNames);
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink configure(String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   try {
+   return new JDBCTableSink(committer, serializer, 
outputFormat, fieldNames, fieldTypes);
+   } catch (Exception e) {
+   LOG.warn("Failed to create a copy of the sink.", e);
+   return null;
+   }
+   }
+
+   @Override
+   protected boolean sendValues(Iterable value, long timestamp) 
throws Exception {
+   for (Row r : value) {
+   try {
+   outputFormat.writeRecord(r);
--- End diff --

This doesn't guarantee in any way that the values are actually being sent; 
you need some kind of flushing functionality for this to work properly.
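A rough sketch of what that could look like (the `flush()` call is hypothetical: `JDBCOutputFormat` batches statements internally, so it would need to expose a way to push the current batch before the checkpoint is acknowledged):

```java
@Override
protected boolean sendValues(Iterable<Row> values, long timestamp) throws Exception {
	for (Row row : values) {
		// writeRecord() may only add the row to the current JDBC batch
		outputFormat.writeRecord(row);
	}
	// Hypothetical: force the batched statements out to the database; fail the
	// checkpoint (return false or throw) if this does not succeed.
	outputFormat.flush();
	return true;
}
```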


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3780: [FLINK-6274] Replaces usages of org.codehaus.jacks...

2017-04-26 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3780

[FLINK-6274] Replaces usages of org.codehaus.jackson

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 6274_replace_codehaus

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3780.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3780


commit 84ca3a6297e50609211efef5ac22d1bae58a28c1
Author: zentol 
Date:   2017-04-06T12:33:47Z

[FLINK-6274] Replaces usages of org.codehaus.jackson




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4973) Flakey Yarn tests due to recently added latency marker

2017-04-26 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984659#comment-15984659
 ] 

Andrey commented on FLINK-4973:
---

Here are more logs. The job was executed for some time and then was cancelled from 
the CLI:
{code}
2017-04-26 11:26:44,209 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task Source: Custom Source (2/2) 
(62b7b08d204abbb64f03325032b6d7b5). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,209 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source (2/2) (62b7b08d204abbb64f03325032b6d7b5) 
switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,213 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code Source: Custom Source (2/2) 
(62b7b08d204abbb64f03325032b6d7b5). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,216 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task Map (2/2) 
(32731388b322028553940b6c65216398). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,216 INFO  org.apache.flink.runtime.taskmanager.Task 
- Map (2/2) (32731388b322028553940b6c65216398) switched from 
RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,218 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code Map (2/2) 
(32731388b322028553940b6c65216398). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,222 WARN  
org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while 
emitting latency marker. [Time Trigger for Source: Custom Source (2/2)]
java.lang.RuntimeException: Buffer pool is destroyed.
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:99)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:791)
at 
org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:142)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:149)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:106)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
... 10 more
2017-04-26 11:26:44,320 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source (2/2) (62b7b08d204abbb64f03325032b6d7b5) 
switched from CANCELING to CANCELED. [Source: Custom Source (2/2)]
2017-04-26 11:26:44,320 INFO  org.apache.flink.runtime.taskmanager.Task 
- Map (2/2) (32731388b322028553940b6c65216398) switched from 
CANCELING to CANCELED. [Map (2/2)]
{code}

> Flakey Yarn tests due to recently added latency marker
> --
>
> Key: FLINK-4973
> URL: https://issues.apache.org/jira/browse/FLINK-4973
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.2.0
>
>
> The newly introduced {{LatencyMarksEmitter}} emits latency marker on the 
> {{Output}}. This can still happen after the underlying {{BufferPool}} has 
> been destroyed. The occurring exception is then logged:
> {code}
> 

[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...

2017-04-26 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3769
  
Looks good, +1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6093) Implement and turn on retraction for table sink

2017-04-26 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-6093:
--

Assignee: Hequn Cheng  (was: Shaoxuan Wang)

> Implement and turn on retraction for table sink 
> 
>
> Key: FLINK-6093
> URL: https://issues.apache.org/jira/browse/FLINK-6093
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Add sink tableInsert and NeedRetract property, and consider table sink in 
> optimizer RetractionRule



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6248) Make the optional() available to all offered patterns.

2017-04-26 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-6248.
-
Resolution: Fixed

Merged at b3de48d0e2c1fec6ccab3a8dd0f02b711dc0ae0a

> Make the optional() available to all offered patterns.
> --
>
> Key: FLINK-6248
> URL: https://issues.apache.org/jira/browse/FLINK-6248
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the {{optional()}} quantifier is available as a separate pattern. 
> This issue proposes to make it available as a flag to all patterns.
> This implies that: 
> 1) a singleton pattern with {{optional=true}} will become the current 
> {{OPTIONAL}}, 
> 2) a {{oneToMany}} will become {{zeroToMany}}, 
> 3) the {{zeroToMany}} will not exist as a direct option in the {{Pattern}} 
> class, and 
> 4) the {{times()}} will require some changes in the {{NFACompiler}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6356) Make times() eager and enable allowing combinations.

2017-04-26 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-6356.
-
Resolution: Fixed

Merged at 0d856b34f90c0775b7c9d5e88f5ed6cdf6090851

> Make times() eager and enable allowing combinations.
> 
>
> Key: FLINK-6356
> URL: https://issues.apache.org/jira/browse/FLINK-6356
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This is the PR that addresses it https://github.com/apache/flink/pull/3761



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984700#comment-15984700
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113437414
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends 
AbstractDeserializationSchema {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = 
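As a point of reference while reviewing, a minimal usage sketch for the deserialization side (not part of the PR; the Avro-generated SpecificRecord class "User" is a made-up placeholder, and the Kafka 0.8 consumer is just one possible source):

// Sketch only: "User" is a hypothetical Avro-generated SpecificRecord class.
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.types.Row;

public class AvroRowConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "avro-rows");

        // Each Avro-encoded Kafka message is deserialized into a (nested) Flink Row.
        DataStream<Row> rows = env.addSource(new FlinkKafkaConsumer08<>(
                "users", new AvroRowDeserializationSchema(User.class), props));

        rows.print();
        env.execute("AvroRowDeserializationSchema sketch");
    }
}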

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984544#comment-15984544
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113416949
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List<Schema> types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+   }
+   else {
+  
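And the mirror-image sketch for the serialization side (again not part of the PR; "User" is a hypothetical Avro-generated class and the Kafka 0.8 producer is only one possible sink):

// Sketch only: assumes the incoming Rows have a field order matching the Avro
// schema of the hypothetical record class "User".
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
import org.apache.flink.types.Row;

public class AvroRowProducerSketch {
    public static void writeToKafka(DataStream<Row> rows) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Each Row is converted into an Avro record and written as Avro bytes.
        rows.addSink(new FlinkKafkaProducer08<Row>(
                "users", new AvroRowSerializationSchema(User.class), props));
    }
}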

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113416754
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List<Schema> types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Ah, OK. I see. Then let's keep it :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...

2017-04-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113416949
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List<Schema> types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+   schema = types.get(1);
+   }
+   else {
+   throw new RuntimeException("Currently 
we only support schemas of the following form: UNION[null, RECORD]. Given: " + 
schema);
+   }
+   } else if 

[jira] [Commented] (FLINK-6382) Support all numeric types for generated graphs in Gelly examples

2017-04-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984626#comment-15984626
 ] 

Greg Hogan commented on FLINK-6382:
---

Hi [~heytitle], it would be great if you could review the PR. I will use this 
PR to test FLINK-3722 in a local branch so NormalizedKeySorter changes can be 
merged and FLINK-5734 rebased.

> Support all numeric types for generated graphs in Gelly examples
> 
>
> Key: FLINK-6382
> URL: https://issues.apache.org/jira/browse/FLINK-6382
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.3.0
>
>
> The Gelly examples currently support {{IntValue}}, {{LongValue}}, and 
> {{StringValue}} for {{RMatGraph}}. Allow transformations and tests for all 
> generated graphs for {{ByteValue}}, {{Byte}}, {{ShortValue}}, {{Short}}, 
> {{CharValue}}, {{Character}}, {{Integer}}, {{Long}}, and {{String}}.
> This is additionally of interest for benchmarking and testing modifications 
> to Flink's internal sort.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984638#comment-15984638
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3733
  
Hi @hequn8128, I merged the PR to `table-retraction`. Can you close it?

Thanks, Fabian


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for 
> different aggregates. 
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain 
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is 
> working under unbounded and processing time mode. Hence, retraction is only 
> supported for unbounded and processing time aggregations so far. We can add 
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.
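As a purely hypothetical illustration of steps 1 and 2 above (none of these names are the PR's actual types), the delete/add property can be pictured as a small wrapper around Row:

// Hypothetical sketch only: illustrates the add/delete (retract) flag on records.
import org.apache.flink.types.Row;

public class ChangeRow {
    /** true = accumulate ("add") message, false = retract ("delete") message. */
    public final boolean isAccumulate;
    public final Row row;

    public ChangeRow(boolean isAccumulate, Row row) {
        this.isAccumulate = isAccumulate;
        this.row = row;
    }
}

With such a flag, an unbounded GROUP BY can emit a retract message for the previous aggregate value followed by an add message for the new one whenever a group's result changes.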



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6093) Implement and turn on retraction for table sink

2017-04-26 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-6093:
---
Description: Add needsUpdatesAsRetraction property to TableSink, and take 
TableSink into consideration during RetractionRule  (was: Add sink tableInsert 
and NeedRetract property, and consider table sink in optimizer RetractionRule)

> Implement and turn on retraction for table sink 
> 
>
> Key: FLINK-6093
> URL: https://issues.apache.org/jira/browse/FLINK-6093
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Hequn Cheng
>
> Add needsUpdatesAsRetraction property to TableSink, and take TableSink into 
> consideration during RetractionRule
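A hypothetical sketch of the property named in the description (the interface name and its placement are assumptions, not the actual Flink API):

// Hypothetical sketch only: a sink advertises whether updates must arrive as
// retract + add message pairs, so the optimizer's RetractionRule can plan accordingly.
public interface RetractionAwareTableSink {
    boolean needsUpdatesAsRetraction();
}

The RetractionRule would then inspect this flag when deciding whether upstream operators have to emit retraction messages.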



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984722#comment-15984722
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113439992
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ *
+ * Deserializes the byte[] messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+   /**
+* Schema for deterministic field order.
+*/
+   private final Schema schema;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private final DatumReader<GenericRecord> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private final MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data
+*/
+   private final Decoder decoder;
+
+   /**
+* Record to deserialize byte array to.
+*/
+   private GenericRecord record;
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumReader = new ReflectDatumReader<>(schema);
+   this.record = new GenericData.Record(schema);
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   // read record
+   try {
+   inputStream.setBuffer(message);
+   this.record = datumReader.read(record, decoder);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+
+   // convert to row
+   final Object row = convertToRow(schema, record);
+   return (Row) row;
+   }
+
+   /**
+* Converts a (nested) Avro {@link SpecificRecord} into Flink's Row 
type.
+* Avro's {@link Utf8} fields are converted into regular Java strings.
+*/
+   private static Object convertToRow(Schema schema, Object recordObj) {
+   if (recordObj instanceof GenericRecord) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List types = 

[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984542#comment-15984542
 ] 

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3663#discussion_r113416754
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+   /**
+* Avro serialization schema.
+*/
+   private final Schema schema;
+
+   /**
+* Writer to serialize Avro record into a byte array.
+*/
+   private final DatumWriter<GenericRecord> datumWriter;
+
+   /**
+* Output stream to serialize records into byte array.
+*/
+   private final ByteArrayOutputStream arrayOutputStream =  new 
ByteArrayOutputStream();
+
+   /**
+* Low-level class for serialization of Avro values.
+*/
+   private final Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+   /**
+* Creates a Avro serialization schema for the given schema.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+*/
+   @SuppressWarnings("unchecked")
+   public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.schema = SpecificData.get().getSchema(recordClazz);
+   this.datumWriter = new ReflectDatumWriter<>(schema);
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public byte[] serialize(Row row) {
+   // convert to record
+   final Object record = convertToRecord(schema, row);
+
+   // write
+   try {
+   arrayOutputStream.reset();
+   datumWriter.write((GenericRecord) record, encoder);
+   encoder.flush();
+   return arrayOutputStream.toByteArray();
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to serialize Row.", 
e);
+   }
+   }
+
+   /**
+* Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+* Strings are converted into Avro's {@link Utf8} fields.
+*/
+   private static Object convertToRecord(Schema schema, Object rowObj) {
+   if (rowObj instanceof Row) {
+   // records can be wrapped in a union
+   if (schema.getType() == Schema.Type.UNION) {
+   final List<Schema> types = schema.getTypes();
+   if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
--- End diff --

Ah, OK. I see. Then let's keep it :-)


> Add Kafka TableSource with Avro serialization
> 

[GitHub] flink pull request #3779: [FLINK-6382] [gelly] Support all numeric types for...

2017-04-26 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3779

[FLINK-6382] [gelly] Support all numeric types for generated graphs in 
Gelly examples

The Gelly examples currently support IntValue, LongValue, and StringValue for 
RMatGraph. Allow transformations and tests for all generated graphs for 
ByteValue, Byte, ShortValue, Short, CharValue, Character, Integer, Long, and 
String.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
6382_support_all_numeric_types_for_generated_graphs_in_gelly_examples

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3779.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3779


commit c1ef02c4df46b127788cee63d080b588b731e696
Author: Greg Hogan 
Date:   2017-04-25T15:36:08Z

[FLINK-6382] [gelly] Support all numeric types for generated graphs in 
Gelly examples

The Gelly examples current support IntValue, LongValue, and StringValue
for RMatGraph. Allow transformations and tests for all generated graphs
for ByteValue, Byte, ShortValue, Short, CharValue, Character, Integer,
Long, and String.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

