[jira] [Commented] (FLINK-6386) Missing bracket in 'Compiler Limitation' section
[ https://issues.apache.org/jira/browse/FLINK-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984281#comment-15984281 ] ASF GitHub Bot commented on FLINK-6386: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3775#discussion_r113378988
--- Diff: docs/dev/java8.md --- @@ -104,7 +104,7 @@ input.filter(line -> !line.contains("not")) Currently, Flink only supports jobs containing Lambda Expressions completely if they are **compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above)**. Only the Eclipse JDT compiler preserves the generic type information necessary to use the entire Lambda Expressions feature type-safely. -Other compilers such as the OpenJDK's and Oracle JDK's `javac` throw away all generic parameters related to Lambda Expressions. This means that types such as `Tuple2<String,Integer>` or `Collector<String>` declared as a Lambda function input or output parameter will be pruned to `Tuple2` or `Collector` in the compiled `.class` files, which is too little information for the Flink Compiler. --- End diff -- could you also add a space after the comma `Tuple2<...>` and lower case "Compiler" at the very end?
> Missing bracket in 'Compiler Limitation' section > > > Key: FLINK-6386 > URL: https://issues.apache.org/jira/browse/FLINK-6386 > Project: Flink > Issue Type: Bug > Components: Documentation > Affects Versions: 1.2.0 > Reporter: Bowen Li > Assignee: Bowen Li > Priority: Trivial > Fix For: 1.2.2 > > > "This means that types such as `Tuple2<String,Integer` declared as..." > should be > "This means that types such as `Tuple2<String,Integer>` or > `Collector<String>` declared as..." -- This message was sent by Atlassian JIRA (v6.3.15#6346)
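The compiler difference this doc section describes can be observed with plain reflection. A minimal, self-contained sketch (class and helper names are mine, not from the Flink docs): an anonymous class records its generic interface in the class file's `Signature` attribute, while a `javac`-compiled lambda only keeps the erased interface type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {
    // Anonymous class: javac records Function<String, Integer> in the
    // class file, so reflection can recover the type arguments.
    public static final Function<String, Integer> ANON = new Function<String, Integer>() {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    };

    // Lambda: the synthetic class only implements the raw, erased
    // Function interface; the type arguments are gone.
    public static final Function<String, Integer> LAMBDA = s -> s.length();

    // Returns true if any implemented interface still carries type arguments.
    public static boolean hasGenericSignature(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps generics: " + hasGenericSignature(ANON));
        System.out.println("lambda keeps generics: " + hasGenericSignature(LAMBDA));
    }
}
```

This is why the docs recommend the Eclipse JDT compiler, which also emits generic signatures for lambda bodies.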
[jira] [Created] (FLINK-6387) Flink UI support access log
shijinkui created FLINK-6387: Summary: Flink UI support access log Key: FLINK-6387 URL: https://issues.apache.org/jira/browse/FLINK-6387 Project: Flink Issue Type: Improvement Components: Webfrontend Reporter: shijinkui Assignee: shijinkui Record user requests in an access log, appending each access to the log file. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.
[ https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984278#comment-15984278 ] ASF GitHub Bot commented on FLINK-6356: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3761#discussion_r113377068
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java --- @@ -18,51 +18,83 @@ package org.apache.flink.cep.pattern; import java.util.EnumSet; +import java.util.Objects; -public enum Quantifier { - ONE, - ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER), - ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING), - ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, QuantifierProperty.STRICT, QuantifierProperty.LOOPING), - ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, QuantifierProperty.LOOPING), - ONE_OR_MORE_EAGER( - QuantifierProperty.LOOPING, - QuantifierProperty.EAGER, - QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_EAGER_STRICT( - QuantifierProperty.STRICT, - QuantifierProperty.LOOPING, - QuantifierProperty.EAGER, - QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_COMBINATIONS_STRICT( - QuantifierProperty.STRICT, - QuantifierProperty.LOOPING, - QuantifierProperty.AT_LEAST_ONE), - TIMES(QuantifierProperty.TIMES), - TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT), - OPTIONAL; +public class Quantifier { private final EnumSet<QuantifierProperty> properties; - Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { + private Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { this.properties = EnumSet.of(first, rest); } - Quantifier() { - this.properties = EnumSet.noneOf(QuantifierProperty.class); + public static Quantifier ONE() { + return new Quantifier(QuantifierProperty.SINGLE); + } + + public static Quantifier ONE_OR_MORE() { + return new Quantifier(QuantifierProperty.LOOPING, QuantifierProperty.EAGER); + } + + public static Quantifier TIMES() { + return new Quantifier(QuantifierProperty.TIMES, QuantifierProperty.EAGER); } public boolean hasProperty(QuantifierProperty property) { return properties.contains(property); } + public void combinations() { --- End diff -- The first `if` will be satisfied by `ONE`, resulting in a misleading exception. Because of that, the `else` branch of the second `if` will also be unreachable.
> Make times() eager and enable allowing combinations. > > > Key: FLINK-6356 > URL: https://issues.apache.org/jira/browse/FLINK-6356 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.3.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > This is the PR that addresses it: https://github.com/apache/flink/pull/3761 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3761: [FLINK-6356] Make optional available to all patter...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3761#discussion_r113378455
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java --- @@ -162,130 +186,114 @@ public int getTimes() { } /** -* Appends a new pattern operator to the existing one. The new pattern operator enforces strict -* temporal contiguity. This means that the whole pattern only matches if an event which matches -* this operator directly follows the preceding matching event. Thus, there cannot be any -* events in between two matching events. +* Appends a new pattern to the existing one. The new pattern enforces strict +* temporal contiguity. This means that the whole pattern sequence matches only +* if an event which matches this pattern directly follows the preceding matching +* event. Thus, there cannot be any events in between two matching events. * -* @param name Name of the new pattern operator -* @return A new pattern operator which is appended to this pattern operator +* @param name Name of the new pattern +* @return A new pattern which is appended to this one */ public Pattern<T, T> next(final String name) { return new Pattern<T, T>(name, this); } /** -* Appends a new pattern operator to the existing one. The new pattern operator enforces -* non-strict temporal contiguity. This means that a matching event of this operator and the +* Appends a new pattern to the existing one. The new pattern enforces non-strict +* temporal contiguity. This means that a matching event of this pattern and the * preceding matching event might be interleaved with other events which are ignored. * -* @param name Name of the new pattern operator -* @return A new pattern operator which is appended to this pattern operator +* @param name Name of the new pattern +* @return A new pattern which is appended to this one */ public FollowedByPattern<T, T> followedBy(final String name) { return new FollowedByPattern<T, T>(name, this); } /** -* Starts a new pattern with the initial pattern operator whose name is provided. Furthermore, -* the base type of the event sequence is set. -* -* @param name Name of the new pattern operator -* @param <X> Base type of the event pattern -* @return The first pattern operator of a pattern -*/ - public static <X> Pattern<X, X> begin(final String name) { - return new Pattern<X, X>(name, null); - } - - /** -* Specifies that this pattern can occur zero or more times(kleene star). -* This means any number of events can be matched in this state. +* Specifies that this pattern is optional for a final match of the pattern +* sequence to happen. * -* @return The same pattern with applied Kleene star operator -* -* @throws MalformedPatternException if quantifier already applied +* @return The same pattern as optional. +* @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - public Pattern<T, F> zeroOrMore() { - return zeroOrMore(true); + public Pattern<T, F> optional() { + quantifier.optional(); + return this; } /** -* Specifies that this pattern can occur zero or more times(kleene star). -* This means any number of events can be matched in this state. -* -* If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns: -* B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B. +* Specifies that this pattern can occur {@code one or more} times. +* This means at least one and at most infinite number of events can +* be matched to this pattern. * -* @param eager if true the pattern always consumes earlier events -* @return The same pattern with applied Kleene star operator +* If this quantifier is enabled for a +* pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events +* {@code A1 A2 B} appears, this will generate patterns: +* {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}. * -* @throws MalformedPatternException if quantifier already applied +* @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} quantifier applied. +* @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - public Pattern<T, F> zeroOrMore(final boolean eager) { +
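The matching behavior described in the new javadoc can be made concrete with a small enumerator. This is an illustrative sketch only (names and structure are my assumptions, not Flink CEP code): for a run of events {A1, A2} before a B, an eager oneOrMore() keeps only the contiguous prefixes of the run, while allowCombinations() admits every non-empty subset.

```java
import java.util.ArrayList;
import java.util.List;

public class OneOrMoreSemantics {
    // Eager: only contiguous prefixes of the run, i.e. A1, then A1 A2, ...
    public static List<List<String>> eager(List<String> run) {
        List<List<String>> out = new ArrayList<>();
        for (int end = 1; end <= run.size(); end++) {
            out.add(new ArrayList<>(run.subList(0, end)));
        }
        return out;
    }

    // allowCombinations: every non-empty subsequence of the run.
    public static List<List<String>> combinations(List<String> run) {
        List<List<String>> out = new ArrayList<>();
        int n = run.size();
        for (int mask = 1; mask < (1 << n); mask++) {
            List<String> pick = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    pick.add(run.get(i));
                }
            }
            out.add(pick);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> run = new ArrayList<>();
        run.add("A1");
        run.add("A2");
        System.out.println("eager:        " + eager(run));
        System.out.println("combinations: " + combinations(run));
    }
}
```

For {A1, A2} the eager variant yields the two prefixes from the javadoc example (A1 B and A1 A2 B once the B arrives); the combinations variant additionally yields A2 B.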
[GitHub] flink pull request #3761: [FLINK-6356] Make optional available to all patter...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3761#discussion_r113377152
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java --- @@ -18,51 +18,83 @@ package org.apache.flink.cep.pattern; import java.util.EnumSet; +import java.util.Objects; -public enum Quantifier { - ONE, - ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER), - ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING), - ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, QuantifierProperty.STRICT, QuantifierProperty.LOOPING), - ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, QuantifierProperty.LOOPING), - ONE_OR_MORE_EAGER( - QuantifierProperty.LOOPING, - QuantifierProperty.EAGER, - QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_EAGER_STRICT( - QuantifierProperty.STRICT, - QuantifierProperty.LOOPING, - QuantifierProperty.EAGER, - QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_COMBINATIONS_STRICT( - QuantifierProperty.STRICT, - QuantifierProperty.LOOPING, - QuantifierProperty.AT_LEAST_ONE), - TIMES(QuantifierProperty.TIMES), - TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT), - OPTIONAL; +public class Quantifier { private final EnumSet<QuantifierProperty> properties; - Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { + private Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { this.properties = EnumSet.of(first, rest); } - Quantifier() { - this.properties = EnumSet.noneOf(QuantifierProperty.class); + public static Quantifier ONE() { + return new Quantifier(QuantifierProperty.SINGLE); + } + + public static Quantifier ONE_OR_MORE() { + return new Quantifier(QuantifierProperty.LOOPING, QuantifierProperty.EAGER); + } + + public static Quantifier TIMES() { + return new Quantifier(QuantifierProperty.TIMES, QuantifierProperty.EAGER); } public boolean hasProperty(QuantifierProperty property) { return properties.contains(property); } + public void combinations() { + if (!hasProperty(Quantifier.QuantifierProperty.EAGER)) { + throw new MalformedPatternException("Combinations already allowed!"); + } + + if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) { + properties.remove(Quantifier.QuantifierProperty.EAGER); + } else { + throw new MalformedPatternException("Combinations not applicable to " + this + "!"); + } + } + + public void consecutive() { + if (hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) { --- End diff -- Same here for the `ONE` quantifier. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
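The review point is about guard ordering: since `ONE` lacks the EAGER property, the first `if` in `combinations()` throws "Combinations already allowed!" for it, which is misleading, and the `else` branch of the second `if` then becomes unreachable. A stripped-down model of the suggested ordering (assumed names, not the actual Flink class), checking applicability before the EAGER flag:

```java
import java.util.EnumSet;

public class QuantifierSketch {
    public enum Property { SINGLE, LOOPING, TIMES, EAGER }

    public final EnumSet<Property> properties;

    public QuantifierSketch(Property first, Property... rest) {
        this.properties = EnumSet.of(first, rest);
    }

    public void combinations() {
        // Applicability first: ONE (SINGLE) is neither LOOPING nor TIMES,
        // so it gets the accurate "not applicable" error.
        if (!properties.contains(Property.LOOPING) && !properties.contains(Property.TIMES)) {
            throw new IllegalStateException("Combinations not applicable to " + properties + "!");
        }
        // Only then check whether combinations were already allowed.
        if (!properties.contains(Property.EAGER)) {
            throw new IllegalStateException("Combinations already allowed!");
        }
        // Allowing combinations means dropping the EAGER property.
        properties.remove(Property.EAGER);
    }
}
```

With this ordering, both exception messages are reachable and each fires for the case it actually describes.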
[GitHub] flink issue #3692: FLINK-5974 Added configurations to support mesos-dns host...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3692 Sounds good. I can do a final pass and merge this later this week... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3248: [FLINK-5695] [Table API & SQL] Optimize table type system...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3248 Hi @sunjincheng121, can you close this PR as well? Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5695) Optimize table type systems based on database semantics
[ https://issues.apache.org/jira/browse/FLINK-5695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984516#comment-15984516 ] ASF GitHub Bot commented on FLINK-5695: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3248 Hi @sunjincheng121, can you close this PR as well? Thanks!
> Optimize table type systems based on database semantics > --- > > Key: FLINK-5695 > URL: https://issues.apache.org/jira/browse/FLINK-5695 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > Optimize table type systems based on database semantics. As follows: > {code} > groupBy: Table -> GroupedTable, select: GroupedTable -> Table > select / where / agg / ...: Table -> Table > window: Table -> WindowedTable, select: WindowedTable -> Table > {code} > What do you think? [~fhueske] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984308#comment-15984308 ] ASF GitHub Bot commented on FLINK-6091: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/3733 hi @fhueske , thanks a lot for your review and help. I have addressed all your comments and updated the PR. All changes have been checked before my latest update. Thanks, Hequn > Implement and turn on the retraction for aggregates > --- > > Key: FLINK-6091 > URL: https://issues.apache.org/jira/browse/FLINK-6091 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > Implement functions for generating and consuming retract messages for > different aggregates. > 1. add delete/add property to Row > 2. implement functions for generating retract messages for unbounded groupBy > 3. implement functions for handling retract messages for different aggregates. > 4. handle retraction messages in CommonCorrelate and CommonCalc (retain > Delete property). > Note: Currently, only unbounded groupby generates retraction and it is > working under unbounded and processing time mode. Hence, retraction is only > supported for unbounded and processing time aggregations so far. We can add > more retraction support later. > supported now: unbounded groupby, unbounded and processing time over window > unsupported now: group window, event time or bounded over window. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
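The retraction scheme sketched in the issue (retract the old aggregate before emitting the new one, so downstream operators can update) can be illustrated for a simple count. The class and message encoding below are assumptions for illustration, not Flink's actual Row or message types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractingCount {
    // Stand-in for the add/delete property the issue adds to Row.
    public enum Kind { ADD, DELETE }

    // For each incoming key, retract (DELETE) the previous count for that
    // key, then emit (ADD) the updated count.
    public static List<String> process(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old != null) {
                out.add(Kind.DELETE + "(" + key + "," + old + ")");
            }
            long next = (old == null) ? 1L : old + 1L;
            counts.put(key, next);
            out.add(Kind.ADD + "(" + key + "," + next + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        keys.add("a");
        keys.add("a");
        System.out.println(process(keys));
    }
}
```

A consumer that sums these messages (adding on ADD, subtracting on DELETE) always converges to the current per-key count, which is the point of turning retraction on for unbounded group-by.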
[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3732 @fhueske @sunjincheng121 @shijinkui @hongyuhong I have created a PR with the latest master with the code generated distinct, #3771 please have a look. If it is fine, we can basically support distinct for all the window types --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984392#comment-15984392 ] ASF GitHub Bot commented on FLINK-6250: --- Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3732 @fhueske @sunjincheng121 @shijinkui @hongyuhong I have created a PR with the latest master with the code generated distinct, #3771 please have a look. If it is fine, we can basically support distinct for all the window types
> Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: radu > Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` > Q1.2. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
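What the first query computes per row can be shown without Flink: sum each distinct value of `b` at most once inside the window of the two preceding rows plus the current row. This is a sketch with assumed names, not the PR's generated code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DistinctOverWindow {
    // For each row i, sum the distinct values of b in rows [i-2, i].
    public static List<Integer> sumDistinct(List<Integer> b) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < b.size(); i++) {
            Set<Integer> window = new LinkedHashSet<>();
            for (int j = Math.max(0, i - 2); j <= i; j++) {
                window.add(b.get(j));  // duplicates collapse here
            }
            int sum = 0;
            for (int v : window) {
                sum += v;
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> b = new ArrayList<>();
        b.add(1);
        b.add(1);
        b.add(2);
        System.out.println(sumDistinct(b));
    }
}
```

For input b = 1, 1, 2 the per-row results are 1, 1, 3: the duplicate 1 in the second window is counted once, and the third window sums the distinct values {1, 2}.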
[jira] [Commented] (FLINK-5969) Add savepoint backwards compatibility tests from 1.2 to 1.3
[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984505#comment-15984505 ] ASF GitHub Bot commented on FLINK-5969: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3778 [FLINK-5969] Add savepoint backwards compatibility tests from 1.2 to 1.3 The binary savepoints and snapshots in the tests were created on the commit of the Flink 1.2.0 release, so we test backwards compatibility within the Flink 1.2.x line. Once this is approved I'll open another PR that transplants these commits on the master branch (with the binary snapshots/savepoints done on Flink 1.2.0) so that we test migration compatibility between 1.2.0 and what is going to be Flink 1.3.x. I changed the naming of some existing tests so we now have `*From11MigrationTest` and `*From12MigrationTest` (and one ITCase). Immediately after releasing Flink 1.3.0 we should do the same, i.e. introduce `*From13MigrationTest` and ITCase based on the existing tests. The unit tests are somewhat straightforward: we feed some data into an operator using an operator test harness, then we do a snapshot. (This is the part that has to be done on the "old" version to generate the binary snapshot that goes into the repo). The actual tests restore an operator from that snapshot and verify the output. The ITCase is a bit more involved. We have a complete Job of user-functions and custom operators that tries to cover as many state/timer combinations as possible. We start the job and, using accumulators, observe the number of received elements in the sink. Once we get all elements we perform a savepoint and cancel the job. Thus we have all state caused by the elements reflected in our savepoint. This has to be done on the "old" version and the savepoint goes into the repo. The restoring job is instrumented with code that verifies restored state and updates accumulators.
We listen on the accumulator changes and cancel the job once we have seen all required verifications. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-5969-backwards-compat-12-13-on-release12 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3778.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3778 commit ef9e73a1f8af8903b0689eada2a9d853034fab88 Author: Aljoscha Krettek Date: 2017-04-20T12:48:22Z [FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs commit 47143ba424355b7d25e9990bc308ea1744a0f33e Author: Aljoscha Krettek Date: 2017-04-20T15:09:00Z [FLINK-5969] Add savepoint IT case that checks restore from 1.2 The binary savepoints in this were created on the Flink 1.2.0 release commit. commit 3803dc04caae5e57f2cb23df0b6bc4663f8af08e Author: Aljoscha Krettek Date: 2017-04-21T09:43:53Z [FLINK-6353] Fix legacy user-state restore from 1.2 State that was checkpointed using Checkpointed (on a user function) could not be restored using CheckpointedRestoring when the savepoint was done on Flink 1.2. The reason was an overzealous check in AbstractUdfStreamOperator that only restores from "legacy" operator state using CheckpointedRestoring when the stream is a Migration stream. This removes that check, but we still need to make sure to read away the byte that indicates whether there is legacy state, which is written when we're restoring from a Flink 1.1 savepoint. 
After this fix, the procedure for a user to migrate a user function away from the Checkpointed interface is this: - Perform savepoint with user function still implementing Checkpointed, shutdown job - Change user function to implement CheckpointedRestoring - Restore from previous savepoint, user function has to somehow move the state that is restored using CheckpointedRestoring to another type of state, e.g. operator state, using the OperatorStateStore. - Perform another savepoint, shutdown job - Remove CheckpointedRestoring interface from user function - Restore from the second savepoint - Done. If the CheckpointedRestoring interface is not removed as prescribed in the last steps, then a future restore of a new savepoint will fail because Flink will try to read legacy operator state that is not there anymore. The above steps also apply to Flink 1.3, when a user wants to move away from the Checkpointed interface. commit f08661adcf3a64daf955ace70683ef2fe14cec2c Author: Aljoscha Krettek Date:
[GitHub] flink pull request #3778: [FLINK-5969] Add savepoint backwards compatibility...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3778 [FLINK-5969] Add savepoint backwards compatibility tests from 1.2 to 1.3 The binary savepoints and snapshots in the tests were created on the commit of the Flink 1.2.0 release, so we test backwards compatibility within the Flink 1.2.x line. Once this is approved I'll open another PR that transplants these commits onto the master branch (with the binary snapshots/savepoints done on Flink 1.2.0) so that we test migration compatibility between 1.2.0 and what is going to be Flink 1.3.x. I changed the naming of some existing tests so we now have `*From11MigrationTest` and `*From12MigrationTest` (and one ITCase). Immediately after releasing Flink 1.3.0 we should do the same, i.e. introduce `*From13MigrationTest` and ITCase based on the existing tests. The unit tests are somewhat straightforward: we feed some data into an operator using an operator test harness, then we do a snapshot. (This is the part that has to be done on the "old" version to generate the binary snapshot that goes into the repo.) The actual tests restore an operator from that snapshot and verify the output. The ITCase is a bit more involved. We have a complete job of user functions and custom operators that tries to cover as many state/timer combinations as possible. We start the job and, using accumulators, observe the number of received elements in the sink. Once we get all elements we perform a savepoint and cancel the job. Thus we have all state caused by the elements reflected in our savepoint. This has to be done on the "old" version and the savepoint goes into the repo. The restoring job is instrumented with code that verifies restored state and updates accumulators. We listen on the accumulator changes and cancel the job once we have seen all required verifications. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-5969-backwards-compat-12-13-on-release12 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3778.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3778 commit ef9e73a1f8af8903b0689eada2a9d853034fab88 Author: Aljoscha Krettek Date: 2017-04-20T12:48:22Z [FLINK-5969] Augment SavepointMigrationTestBase to catch failed jobs commit 47143ba424355b7d25e9990bc308ea1744a0f33e Author: Aljoscha Krettek Date: 2017-04-20T15:09:00Z [FLINK-5969] Add savepoint IT case that checks restore from 1.2 The binary savepoints in this were created on the Flink 1.2.0 release commit. commit 3803dc04caae5e57f2cb23df0b6bc4663f8af08e Author: Aljoscha Krettek Date: 2017-04-21T09:43:53Z [FLINK-6353] Fix legacy user-state restore from 1.2 State that was checkpointed using Checkpointed (on a user function) could not be restored using CheckpointedRestoring when the savepoint was done on Flink 1.2. The reason was an overzealous check in AbstractUdfStreamOperator that only restores from "legacy" operator state using CheckpointedRestoring when the stream is a Migration stream. This removes that check, but we still need to make sure to read away the byte that indicates whether there is legacy state, which is written when we're restoring from a Flink 1.1 savepoint. After this fix, the procedure for a user to migrate a user function away from the Checkpointed interface is this: - Perform savepoint with user function still implementing Checkpointed, shutdown job - Change user function to implement CheckpointedRestoring - Restore from previous savepoint, user function has to somehow move the state that is restored using CheckpointedRestoring to another type of state, e.g. operator state, using the OperatorStateStore. 
- Perform another savepoint, shutdown job - Remove CheckpointedRestoring interface from user function - Restore from the second savepoint - Done. If the CheckpointedRestoring interface is not removed as prescribed in the last steps, then a future restore of a new savepoint will fail because Flink will try to read legacy operator state that is not there anymore. The above steps also apply to Flink 1.3, when a user wants to move away from the Checkpointed interface. commit f08661adcf3a64daf955ace70683ef2fe14cec2c Author: Aljoscha Krettek Date: 2017-04-24T09:25:32Z [FLINK-5969] Add ContinuousFileProcessingFrom12MigrationTest The binary snapshots were created on the Flink 1.2 branch. commit e70424eb6c9861e89c78f12143f319ce6eea49c1 Author: Aljoscha Krettek
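The migration steps above can be sketched in code. The sketch below uses simplified stand-ins for Flink's `CheckpointedRestoring` and `OperatorStateStore` types (the real interfaces live in flink-streaming-java and have richer signatures), so treat it as an illustration of the state hand-over step, not a runnable Flink job.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: simplified stand-ins for Flink's CheckpointedRestoring
// and OperatorStateStore; the real types carry more methods.
class CheckpointedMigrationSketch {

    /** Stand-in for the restore-only half of the legacy Checkpointed contract. */
    interface CheckpointedRestoring<T> {
        void restoreState(T state);
    }

    /** Stand-in for the new-style, OperatorStateStore-backed state. */
    static class OperatorStateStore {
        final List<Long> operatorState = new ArrayList<>();
    }

    /**
     * Steps 2-3 of the migration: the user function no longer implements
     * Checkpointed, only CheckpointedRestoring, and moves the restored legacy
     * state into operator state so the next savepoint contains no legacy state.
     */
    static class MigratingFunction implements CheckpointedRestoring<Long> {
        final OperatorStateStore store = new OperatorStateStore();

        @Override
        public void restoreState(Long legacyState) {
            store.operatorState.add(legacyState);
        }
    }
}
```

After the second savepoint, the `CheckpointedRestoring` implementation must be removed, otherwise a later restore fails while looking for legacy state that no longer exists.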
[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.
[ https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984512#comment-15984512 ] ASF GitHub Bot commented on FLINK-6356: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/3761 @kl0u I don't have any more comments, so for me it LGTM. I've also rebased the skip-till-next on top of this one, so would be nice to merge it and start reviewing #3698 > Make times() eager and enable allowing combinations. > > > Key: FLINK-6356 > URL: https://issues.apache.org/jira/browse/FLINK-6356 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > This is the PR that addresses it https://github.com/apache/flink/pull/3761 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6107) Add custom checkstyle for flink-streaming-java
[ https://issues.apache.org/jira/browse/FLINK-6107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984523#comment-15984523 ] ASF GitHub Bot commented on FLINK-6107: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3567 > Add custom checkstyle for flink-streaming-java > -- > > Key: FLINK-6107 > URL: https://issues.apache.org/jira/browse/FLINK-6107 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > There was some consensus on the ML > (https://lists.apache.org/thread.html/94c8c5186b315c58c3f8aaf536501b99e8b92ee97b0034dee295ff6a@%3Cdev.flink.apache.org%3E) > that we want to have a more uniform code style. We should start > module-by-module and by introducing increasingly stricter rules. We have to > be aware of the PR situation and ensure that we have minimal breakage for > contributors. > This issue aims at adding a custom checkstyle.xml for > {{flink-streaming-java}} that is based on our current checkstyle.xml but adds > these checks for Javadocs: > {code} > ... > {code} > This checks: > - Every type has a type-level Javadoc > - Proper use of {{}} in Javadocs > - First sentence must end with a proper punctuation mark > - Proper use (including closing) of HTML tags
[jira] [Commented] (FLINK-6387) Flink UI support access log
[ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984522#comment-15984522 ] ASF GitHub Bot commented on FLINK-6387: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3777 @zentol @shijinkui Ah, forget what I said, I only read the description and thought it was something different ;-) I thought this was about forbidding UI users to access the logs (which seems valid since the logs may contain sensitive information). This seems to be about auditing, so completely different... > Flink UI support access log > --- > > Key: FLINK-6387 > URL: https://issues.apache.org/jira/browse/FLINK-6387 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: shijinkui >Assignee: shijinkui > > Record the user request in the access log. Append user accesses to the log file.
[GitHub] flink issue #3733: [FLINK-6091] [table] Implement and turn on retraction for...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3733 Thanks for the update @hequn8128. The PR looks really good, IMO. Will do a couple of minor refactorings and merge it to `table-retraction` branch. Best, Fabian
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984524#comment-15984524 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3733 Thanks for the update @hequn8128. The PR looks really good, IMO. Will do a couple of minor refactorings and merge it to `table-retraction` branch. Best, Fabian > Implement and turn on the retraction for aggregates > --- > > Key: FLINK-6091 > URL: https://issues.apache.org/jira/browse/FLINK-6091 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > Implement functions for generating and consuming retract messages for > different aggregates. > 1. add delete/add property to Row > 2. implement functions for generating retract messages for unbounded groupBy > 3. implement functions for handling retract messages for different aggregates. > 4. handle retraction messages in CommonCorrelate and CommonCalc (retain > Delete property). > Note: Currently, only unbounded groupby generates retraction and it is > working under unbounded and processing time mode. Hence, retraction is only > supported for unbounded and processing time aggregations so far. We can add > more retraction support later. > supported now: unbounded groupby, unbounded and processing time over window > unsupported now: group window, event time or bounded over window.
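The retraction mechanics listed in the issue description (a delete/add property on each record, aggregates that consume retract messages) can be shown with a self-contained sketch. The `Change` and `RetractableSum` names below are invented for illustration; they are not Flink's actual `table-retraction` types.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a record with a delete/add flag and an aggregate
// that handles retract messages, as sketched in FLINK-6091.
class RetractionSketch {

    /** Stand-in for a Row carrying the delete/add property. */
    static class Change {
        final String key;
        final long delta;   // value carried by the record
        final boolean add;  // true = accumulate message, false = retract message

        Change(String key, long delta, boolean add) {
            this.key = key;
            this.delta = delta;
            this.add = add;
        }
    }

    /** A sum aggregate for an unbounded group-by that consumes retract messages. */
    static class RetractableSum {
        final Map<String, Long> sums = new HashMap<>();

        void apply(Change c) {
            long current = sums.getOrDefault(c.key, 0L);
            // An add message accumulates; a retract message removes a previously
            // emitted contribution, so downstream results stay consistent.
            sums.put(c.key, c.add ? current + c.delta : current - c.delta);
        }
    }
}
```

A retract message for a value that was never accumulated simply drives the sum negative here; a real implementation would also have to decide how to emit updated results downstream.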
[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3736 +1 from my side @zentol any further comments/concerns?
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984387#comment-15984387 ] ASF GitHub Bot commented on FLINK-6013: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3736 +1 from my side @zentol any further comments/concerns? > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot of other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting the Datadog HTTP > endpoint. > I'll take this ticket myself.
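A reporter that contacts the Datadog HTTP endpoint directly would essentially POST a JSON "series" body. The sketch below only assembles such a body; the field layout is my reading of Datadog's public v1 series API, and the metric name and tag are invented, so treat this as an assumption about the payload shape rather than what the eventual Flink reporter emits.

```java
// Illustration only: assembles a minimal one-gauge JSON body in the assumed
// layout of Datadog's v1 "series" endpoint. Not the PR's implementation.
class DatadogPayloadSketch {

    /** Build {"series":[{"metric":...,"points":[[ts,value]],"type":"gauge","tags":[...]}]}. */
    static String gaugePayload(String metric, long epochSeconds, double value, String tag) {
        return "{\"series\":[{\"metric\":\"" + metric + "\","
                + "\"points\":[[" + epochSeconds + "," + value + "]],"
                + "\"type\":\"gauge\",\"tags\":[\"" + tag + "\"]}]}";
    }
}
```

A real reporter would of course also need the API key, an HTTP client, batching, and proper JSON escaping rather than string concatenation.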
[jira] [Commented] (FLINK-5869) ExecutionGraph use FailoverCoordinator to manage the failover of execution vertexes
[ https://issues.apache.org/jira/browse/FLINK-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984396#comment-15984396 ] ASF GitHub Bot commented on FLINK-5869: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3772#discussion_r113399653 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java --- @@ -72,6 +72,13 @@ .defaultValue(16) .withDeprecatedKeys("job-manager.max-attempts-history-size"); + /** +* The maximum number of prior execution attempts kept in history. +*/ + public static final ConfigOption EXECUTION_FAILOVER_STRATEGY = + key("jobmanager.execution.failover-strategy") + .defaultValue("full"); --- End diff -- I don't think we have done that yet, no... There is a PR to extend the config options to include a description (and generate docs), it could be an extension there. > ExecutionGraph use FailoverCoordinator to manage the failover of execution > vertexes > --- > > Key: FLINK-5869 > URL: https://issues.apache.org/jira/browse/FLINK-5869 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: shuai.xu >Assignee: shuai.xu > > Execution graph doesn't manage the failover of executions. It only cares about > the state of the whole job, which is CREATED, RUNNING, FAILED, FINISHED, or > SUSPEND. > For execution failure, it will notify the FailoverCoordinator to do failover. > It only records the finished job vertices and changes to FINISHED after all > vertices have finished. > It will change to finally failed if the restart strategy fails or an unrecoverable exception occurs.
[GitHub] flink pull request #3772: [FLINK-5869] [flip-1] Introduce abstraction for Fa...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3772#discussion_r113399653 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java --- @@ -72,6 +72,13 @@ .defaultValue(16) .withDeprecatedKeys("job-manager.max-attempts-history-size"); + /** +* The maximum number of prior execution attempts kept in history. +*/ + public static final ConfigOption EXECUTION_FAILOVER_STRATEGY = + key("jobmanager.execution.failover-strategy") + .defaultValue("full"); --- End diff -- I don't think we have done that yet, no... There is a PR to extend the config options to include a description (and generate docs), it could be an extension there.
[jira] [Commented] (FLINK-6387) Flink UI support access log
[ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984464#comment-15984464 ] ASF GitHub Bot commented on FLINK-6387: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3777#discussion_r113406502 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java --- @@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof HttpContent) { } } } + + /** + * Record the access log if enable configure of + * {@link org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}. + * record format: + * remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent" + */ + private void accesslog(ChannelHandlerContext ctx, HttpRequest req) { --- End diff -- this should be renamed to `logAccess`.
[jira] [Commented] (FLINK-6387) Flink UI support access log
[ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984465#comment-15984465 ] ASF GitHub Bot commented on FLINK-6387: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3777#discussion_r113406464 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -645,6 +645,10 @@ @Deprecated public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path"; + /** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */ --- End diff -- This doc isn't in sync with what the implementation actually does. It doesn't restrict anything but only logs all requests.
[jira] [Commented] (FLINK-6387) Flink UI support access log
[ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984463#comment-15984463 ] ASF GitHub Bot commented on FLINK-6387: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3777#discussion_r113406982 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -645,6 +645,10 @@ @Deprecated public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path"; + /** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */ + public static final ConfigOption JOB_MANAGER_WEB_ACCESSLOG_ENABLE = + key("jobmanager.web.accesslog.enable").defaultValue(false); --- End diff -- please move the `.defaultValue` call into a new line. It should also be moved to the JobManagerOptions class.
[jira] [Commented] (FLINK-6387) Flink UI support access log
[ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984466#comment-15984466 ] ASF GitHub Bot commented on FLINK-6387: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3777#discussion_r113406843 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java --- @@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof HttpContent) { } } } + + /** + * Record the access log if enable configure of + * {@link org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}. + * record format: + * remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent" + */ + private void accesslog(ChannelHandlerContext ctx, HttpRequest req) { + HttpHeaders headers = req.headers(); + if (headers != null) { + String line = ctx.channel().remoteAddress() + " - [" + new Date() + "] \"" --- End diff -- Please merge this string generation into the log statement.
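The record format quoted in the Javadoc above (`remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent"`) can be exercised with a small formatting helper. This is an illustrative sketch of the line layout only, not the PR's actual `logAccess` implementation.

```java
// Illustration of the access-log line layout discussed in the review;
// the helper and its parameters are hypothetical.
class AccessLogSketch {

    /** remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent" */
    static String logLine(String remoteAddr, String timeLocal, String method,
                          String uri, String protocol, String referer, String userAgent) {
        return remoteAddr + " - [" + timeLocal + "] \""
                + method + " " + uri + " " + protocol + "\" \""
                + referer + "\" \"" + userAgent + "\"";
    }
}
```

As the reviewer suggests, in the real handler this string building belongs inside the log statement so it is only evaluated when access logging is enabled.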
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984491#comment-15984491 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113409179 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A cassandra {@link StreamTableSink}. 
+ * + */ +class CassandraTableSink implements StreamTableSink { + private final ClusterBuilder builder; + private final String cql; + private String[] fieldNames; + private TypeInformation[] fieldTypes; + private final Properties properties; + + public CassandraTableSink(ClusterBuilder builder, String cql, Properties properties) { + this.builder = Preconditions.checkNotNull(builder, "builder"); --- End diff -- Please include a slightly longer exception message, i.e "ClusterBuilder must not be null." The same applies to other calls to `Preconditions#checkNotNull()`. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984500#comment-15984500 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410392 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; --- End diff -- missing indentation.
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984501#comment-15984501 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410447 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; --- End diff -- remove the space after `[]`.
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984499#comment-15984499 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410291 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; static { for (int i = 0; i < 20; i++) { collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); + rowCollection.add(org.apache.flink.types.Row.of(UUID.randomUUID().toString(), i, 0)); --- End diff -- same as above regarding the import
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984492#comment-15984492 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113408171 --- Diff: flink-connectors/flink-connector-cassandra/pom.xml --- @@ -176,5 +176,10 @@ under the License. + + org.apache.flink + flink-table_2.10 + 1.3-SNAPSHOT --- End diff -- should be set to provided.
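One of the review comments above asks for descriptive messages in `Preconditions#checkNotNull()`. The JDK's `java.util.Objects.requireNonNull` behaves the same way and shows why the message matters; the `init` method and `builder` field below are hypothetical, not `CassandraTableSink`'s actual code.

```java
import java.util.Objects;

// Illustration only: descriptive null-check messages, as requested in the review.
class NullCheckSketch {
    static Object builder;

    static void init(Object clusterBuilder) {
        // Equivalent in spirit to Flink's
        // Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.")
        builder = Objects.requireNonNull(clusterBuilder, "ClusterBuilder must not be null.");
    }
}
```

With a bare `checkNotNull(builder, "builder")` the resulting NullPointerException only says "builder", which tells the user little; the full sentence points directly at the misused constructor argument.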
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984528#comment-15984528 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766 The new API should be in a new class, such as FlinkKafkaPartitioner. The older KafkaPartitioner implementations would be delegated to by a FlinkKafkaAdapterPartitioner extends FlinkKafkaPartitioner, which contains the defaultTopicId/partitions and the KafkaPartitioner delegate, and maps the new API to the current one. Of course, as it is now, the default topic's partitions will be used for the different target topics in FlinkKafkaAdapterPartitioner. > FlinkKafkaProducer's custom Partitioner is always invoked with number of > partitions of default topic > > > Key: FLINK-6288 > URL: https://issues.apache.org/jira/browse/FLINK-6288 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Fang Yong > > The {{FlinkKafkaProducerBase}} supports routing records to topics besides the > default topic, but the custom {{Partitioner}} interface does not follow this > semantic. > The partitioner's {{partition}} method is always invoked with the number of > partitions in the default topic, and not the number of partitions of the > current {{targetTopic}}.
[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...
Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766 The new API should be in a new class, such as FlinkKafkaPartitioner. The older KafkaPartitioner implementations would be delegated to by a FlinkKafkaAdapterPartitioner extends FlinkKafkaPartitioner, which contains the defaultTopicId/partitions and the KafkaPartitioner delegate, and maps the new API to the current one for backwards compatibility. Of course, as it is now, the default topic's partitions will be used for the different target topics in FlinkKafkaAdapterPartitioner. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113415863 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + */ +public class AvroRowSerializationSchema implements SerializationSchema { + + /** +* Avro serialization schema. 
+*/ + private final Schema schema; + + /** +* Writer to serialize Avro record into a byte array. +*/ + private final DatumWriter datumWriter; + + /** +* Output stream to serialize records into byte array. +*/ + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + + /** +* Low-level class for serialization of Avro values. +*/ + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + + /** +* Creates a Avro serialization schema for the given schema. +* +* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row +*/ + @SuppressWarnings("unchecked") + public AvroRowSerializationSchema(Class recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new ReflectDatumWriter<>(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(Row row) { + // convert to record + final Object record = convertToRecord(schema, row); + + // write + try { + arrayOutputStream.reset(); + datumWriter.write((GenericRecord) record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize Row.", e); + } + } + + /** +* Converts a (nested) Flink Row into Avro's {@link GenericRecord}. +* Strings are converted into Avro's {@link Utf8} fields. +*/ + private static Object convertToRecord(Schema schema, Object rowObj) { + if (rowObj instanceof Row) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { --- End diff -- Thanks I added the case for the reverse order. This code is needed for nullable records in order to access the schema. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984537#comment-15984537 ] ASF GitHub Bot commented on FLINK-3871: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3663#discussion_r113415863 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + */ +public class AvroRowSerializationSchema implements SerializationSchema { + + /** +* Avro serialization schema. +*/ + private final Schema schema; + + /** +* Writer to serialize Avro record into a byte array. +*/ + private final DatumWriter datumWriter; + + /** +* Output stream to serialize records into byte array. +*/ + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + + /** +* Low-level class for serialization of Avro values. +*/ + private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + + /** +* Creates a Avro serialization schema for the given schema. 
+* +* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row +*/ + @SuppressWarnings("unchecked") + public AvroRowSerializationSchema(Class recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new ReflectDatumWriter<>(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(Row row) { + // convert to record + final Object record = convertToRecord(schema, row); + + // write + try { + arrayOutputStream.reset(); + datumWriter.write((GenericRecord) record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize Row.", e); + } + } + + /** +* Converts a (nested) Flink Row into Avro's {@link GenericRecord}. +* Strings are converted into Avro's {@link Utf8} fields. +*/ + private static Object convertToRecord(Schema schema, Object rowObj) { + if (rowObj instanceof Row) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { --- End diff -- Thanks I added the case for the reverse order. This code is needed for nullable records in order to access the schema.
[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3777 I think this is a good issue. I am wondering if we may want to approach this a bit broader even and define certain "access levels" to the UI: - view jobs only - view jobs and logs - view jobs, logs, cancel jobs - view jobs, logs, cancel jobs, submit new jobs What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6387) Flink UI support access log
[ https://issues.apache.org/jira/browse/FLINK-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984391#comment-15984391 ] ASF GitHub Bot commented on FLINK-6387: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3777 I think this is a good issue. I am wondering if we may want to approach this a bit broader even and define certain "access levels" to the UI: - view jobs only - view jobs and logs - view jobs, logs, cancel jobs - view jobs, logs, cancel jobs, submit new jobs What do you think? > Flink UI support access log > --- > > Key: FLINK-6387 > URL: https://issues.apache.org/jira/browse/FLINK-6387 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: shijinkui >Assignee: shijinkui > > Record the use request to the access log. Append use access to the log file. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
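The cumulative access levels proposed in the comment above could be modeled roughly as follows. This is a minimal sketch for discussion; the enum name, the `permits` helper, and the ordering are illustrative assumptions, not part of Flink's codebase or this PR.

```java
// Hypothetical sketch of the four UI access levels proposed above.
// Levels are ordered so that each level implies every level before it.
enum UiAccessLevel {
    VIEW_JOBS,    // view jobs only
    VIEW_LOGS,    // view jobs and logs
    CANCEL_JOBS,  // view jobs, logs, cancel jobs
    SUBMIT_JOBS;  // view jobs, logs, cancel jobs, submit new jobs

    /** Returns true if this level grants at least the required level. */
    boolean permits(UiAccessLevel required) {
        return this.ordinal() >= required.ordinal();
    }
}
```

A request handler would then check, e.g., `configuredLevel.permits(UiAccessLevel.CANCEL_JOBS)` before allowing a cancel call; the ordinal comparison keeps the "each level includes the previous one" semantics without per-permission bookkeeping.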
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984441#comment-15984441 ] ASF GitHub Bot commented on FLINK-6288: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3766 How would this new API map to the current one in terms of backwards compatibility? > FlinkKafkaProducer's custom Partitioner is always invoked with number of > partitions of default topic > > > Key: FLINK-6288 > URL: https://issues.apache.org/jira/browse/FLINK-6288 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Fang Yong > > The {{FlinkKafkaProducerBase}} supports routing records to topics besides the > default topic, but the custom {{Partitioner}} interface does not follow this > semantic. > The partitioner is always invoked the {{partition}} method with the number of > partitions in the default topic, and not the number of partitions of the > current {{targetTopic}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6281) Create TableSink for JDBC
[ https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984517#comment-15984517 ] ASF GitHub Bot commented on FLINK-6281: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113410993 --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.types.Row; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class JDBCTableSinkTest { + private static final String[] FIELD_NAMES = new String[] {"foo"}; --- End diff -- remove space after `[]`. > Create TableSink for JDBC > - > > Key: FLINK-6281 > URL: https://issues.apache.org/jira/browse/FLINK-6281 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > It would be nice to integrate the table APIs with the JDBC connectors so that > the rows in the tables can be directly pushed into JDBC. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3777 @zentol @shijinkui Ah, forget what I said, I only read the description and thought it was something different ;-) I thought this was about forbidding UI users to access the logs (which seems valid since the logs may contain sensitive information). This seems to be about auditing, so completely different... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3567: [FLINK-6107] Add custom checkstyle for flink-strea...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3567 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6281) Create TableSink for JDBC
[ https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984520#comment-15984520 ] ASF GitHub Bot commented on FLINK-6281: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113412006 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.UUID; + +public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink { + private final JDBCOutputFormat outputFormat; + private final CheckpointCommitter committer; + private final String[] fieldNames; + private final TypeInformation[] fieldTypes; + + public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer, + JDBCOutputFormat outputFormat, String[] fieldNames, + TypeInformation[] fieldTypes) throws Exception { + super(committer, serializer, UUID.randomUUID().toString().replace("-", "_")); + this.outputFormat = outputFormat; + this.committer = committer; + this.fieldNames = fieldNames; + this.fieldTypes = fieldTypes; + } + + @Override + public void emitDataStream(DataStream dataStream) { + dataStream.transform("JDBC Sink", getOutputType(), this); + } + + @Override + public TypeInformation getOutputType() { + return new RowTypeInfo(fieldTypes, fieldNames); + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + @Override + public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + try { + return new JDBCTableSink(committer, serializer, outputFormat, fieldNames, fieldTypes); + } catch (Exception e) { + LOG.warn("Failed to create a copy of the sink.", e); + return null; + } 
+ } + + @Override + protected boolean sendValues(Iterable value, long timestamp) throws Exception { + for (Row r : value) { + try { + outputFormat.writeRecord(r); --- End diff -- This doesn't guarantee in any way that the values are actually being sent; you need some kind of flushing functionality for this to work properly. > Create TableSink for JDBC > - > > Key: FLINK-6281 > URL: https://issues.apache.org/jira/browse/FLINK-6281 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > It would be nice to integrate the table APIs with the JDBC connectors so that > the rows in the tables can be directly pushed into JDBC. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
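The flushing guarantee zentol asks for could look like the sketch below: `sendValues` should only report success once the rows are actually durable. The `flush()` method on the format is a hypothetical addition for illustration; the `JDBCOutputFormat` in this PR does not expose one, which is exactly the gap the review comment points out.

```java
// Sketch only: sendValues() must not return true while rows may still sit in a
// local buffer. FlushableOutputFormat and its flush() are assumed, not real APIs.
class JdbcSendSketch {

    /** Hypothetical output format that can force buffered rows to the database. */
    interface FlushableOutputFormat<T> {
        void writeRecord(T record) throws java.io.IOException;
        void flush() throws java.io.IOException;
    }

    /** Returns true only after every record has been written and flushed. */
    static <T> boolean sendValues(Iterable<T> values, FlushableOutputFormat<T> format) {
        try {
            for (T value : values) {
                format.writeRecord(value);
            }
            // Without this flush, writeRecord() may only buffer the row locally,
            // so a checkpoint could be acknowledged before the data is durable.
            format.flush();
            return true;
        } catch (java.io.IOException e) {
            return false; // let the write-ahead sink retry the batch later
        }
    }
}
```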
[jira] [Commented] (FLINK-6281) Create TableSink for JDBC
[ https://issues.apache.org/jira/browse/FLINK-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984519#comment-15984519 ] ASF GitHub Bot commented on FLINK-6281: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113410894 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.UUID; + +public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink { + private final JDBCOutputFormat outputFormat; + private final CheckpointCommitter committer; + private final String[] fieldNames; + private final TypeInformation[] fieldTypes; + + public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer, + JDBCOutputFormat outputFormat, String[] fieldNames, + TypeInformation[] fieldTypes) throws Exception { --- End diff -- like the cassandra sink the `fieldNames/Types` should be removed to provide a clean API to the user. > Create TableSink for JDBC > - > > Key: FLINK-6281 > URL: https://issues.apache.org/jira/browse/FLINK-6281 > Project: Flink > Issue Type: Improvement >Reporter: Haohui Mai >Assignee: Haohui Mai > > It would be nice to integrate the table APIs with the JDBC connectors so that > the rows in the tables can be directly pushed into JDBC. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6356) Make times() eager and enable allowing combinations.
[ https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984364#comment-15984364 ] ASF GitHub Bot commented on FLINK-6356: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/3761 Thanks a lot for the review @dawidwys! I integrated your comments. Let me know when you are done so that I can merge this. > Make times() eager and enable allowing combinations. > > > Key: FLINK-6356 > URL: https://issues.apache.org/jira/browse/FLINK-6356 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > This is the PR that addresses it https://github.com/apache/flink/pull/3761 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3761: [FLINK-6356] Make optional available to all patterns/time...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/3761 Thanks a lot for the review @dawidwys! I integrated your comments. Let me know when you are done so that I can merge this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6336) Placement Constraints for Mesos
[ https://issues.apache.org/jira/browse/FLINK-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984362#comment-15984362 ] Stephen Gran commented on FLINK-6336: - Hi, Any chance this can make the 1.3.0 release? Thanks! > Placement Constraints for Mesos > --- > > Key: FLINK-6336 > URL: https://issues.apache.org/jira/browse/FLINK-6336 > Project: Flink > Issue Type: New Feature > Components: Mesos >Affects Versions: 1.2.0 >Reporter: Stephen Gran >Priority: Minor > > Fenzo supports placement constraints for tasks, and operators expose agent > attributes to frameworks in the form of attributes about the agent offer. > It would be extremely helpful in our multi-tenant cluster to be able to make > use of this facility. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6288) FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic
[ https://issues.apache.org/jira/browse/FLINK-6288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984402#comment-15984402 ] ASF GitHub Bot commented on FLINK-6288: --- Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766 @gyfora Thanks for your comment. And you're right, your question is a very good one. I originally thought that the user would be sure to know all the output topics when the job is submitted, but in real business scenarios the data may be output to dynamically generated topics. To support dynamically generated topics, I propose to adjust the open and partition APIs of KafkaPartitioner as follows: 1. The open method removes the int[] partitions parameter and is opened once per partitioner: public void open(int parallelInstanceId, int parallelInstances) 2. The partition method adds int[] partitions and target-topic parameters: public int partition(T next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) @gyfora @tzulitai What do you think of this? Please feel free to give any suggestions, thanks! > FlinkKafkaProducer's custom Partitioner is always invoked with number of > partitions of default topic > > > Key: FLINK-6288 > URL: https://issues.apache.org/jira/browse/FLINK-6288 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Fang Yong > > The {{FlinkKafkaProducerBase}} supports routing records to topics besides the > default topic, but the custom {{Partitioner}} interface does not follow this > semantic. > The partitioner is always invoked the {{partition}} method with the number of > partitions in the default topic, and not the number of partitions of the > current {{targetTopic}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3766: [FLINK-6288] fix FlinkKafkaProducer's custom Partitioner ...
Github user fanyon commented on the issue: https://github.com/apache/flink/pull/3766 @gyfora Thanks for your comment. And you're right, your question is a very good one. I originally thought that the user would be sure to know all the output topics when the job is submitted, but in real business scenarios the data may be output to dynamically generated topics. To support dynamically generated topics, I propose to adjust the open and partition APIs of KafkaPartitioner as follows: 1. The open method removes the int[] partitions parameter and is opened once per partitioner: public void open(int parallelInstanceId, int parallelInstances) 2. The partition method adds int[] partitions and target-topic parameters: public int partition(T next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) @gyfora @tzulitai What do you think of this? Please feel free to give any suggestions, thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
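The API change proposed above could be sketched as follows. The class names and signatures follow the mailing-list proposal (reading the typo'd "Adpter" as "Adapter"); they are illustrative assumptions, not the actual Flink interfaces, and the example subclass is hypothetical.

```java
// Sketch of the proposed FlinkKafkaPartitioner API: the target topic and its
// partition array travel with each record, so dynamically generated topics no
// longer fall back to the default topic's partition count.
abstract class FlinkKafkaPartitioner<T> implements java.io.Serializable {

    /** Opened once per partitioner instance; the old int[] partitions parameter is gone. */
    public void open(int parallelInstanceId, int parallelInstances) {
        // no-op by default
    }

    /**
     * Chooses a partition for the record. partitions holds the partition IDs of
     * the actual target topic, not of the producer's default topic.
     */
    public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue,
                                  String topic, int[] partitions);
}

/** Hypothetical example: hash the key against the target topic's own partitions. */
class KeyHashPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String next, byte[] serializedKey, byte[] serializedValue,
                         String topic, int[] partitions) {
        int hash = serializedKey == null ? 0 : java.util.Arrays.hashCode(serializedKey);
        return partitions[Math.abs(hash % partitions.length)];
    }
}
```

With this shape, a `FlinkKafkaAdapterPartitioner` wrapping an old `KafkaPartitioner` would simply ignore the per-record `topic`/`partitions` arguments and keep using the default topic's partitions, which preserves the current (admittedly limited) behavior for existing implementations.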
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984497#comment-15984497 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410371 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, --- End diff -- Move `STRING_TYPE_INFO` to the next line. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984496#comment-15984496 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410046 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -86,11 +83,15 @@ private static EmbeddedCassandraService cassandra; + private static String HOST = "127.0.0.1"; + + private static int PORT = 9042; --- End diff -- should be final > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984494#comment-15984494 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410687 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -438,6 +461,27 @@ public void cancel() { } @Test + public void testCassandraTableSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStreamSource source = env.fromCollection(rowCollection); + CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build(); + } + }, INSERT_DATA_QUERY, new Properties()); + cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES); --- End diff -- Shouldn't we assign the returned value to `cassandraTableSink`? > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984495#comment-15984495 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410244 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20); + private static final ArrayList<org.apache.flink.types.Row> rowCollection = new ArrayList<>(20); --- End diff -- Please add an import for `Row` instead of using the qualified name. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984493#comment-15984493 ] ASF GitHub Bot commented on FLINK-6225: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113409930 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A cassandra {@link StreamTableSink}. + * --- End diff -- remove this line. 
> Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410291 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; static { for (int i = 0; i < 20; i++) { collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); + rowCollection.add(org.apache.flink.types.Row.of(UUID.randomUUID().toString(), i, 0)); --- End diff -- same as above regarding the import --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410371 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, --- End diff -- Move `STRING_TYPE_INFO` to the next line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410392 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; --- End diff -- missing indentation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410447 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -108,10 +109,16 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; private static final ArrayList> collection = new ArrayList<>(20); + private static final ArrayList rowCollection = new ArrayList<>(20); + + private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"}; --- End diff -- remove the space after `[]`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113410023 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -86,11 +83,15 @@ private static EmbeddedCassandraService cassandra; + private static String HOST = "127.0.0.1"; --- End diff -- should be final --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113408171 --- Diff: flink-connectors/flink-connector-cassandra/pom.xml --- @@ -176,5 +176,10 @@ under the License. + + org.apache.flink + flink-table_2.10 + 1.3-SNAPSHOT --- End diff -- should be set to provided. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r113409179 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A cassandra {@link StreamTableSink}. 
+ * + */ +class CassandraTableSink implements StreamTableSink<Row> { + private final ClusterBuilder builder; + private final String cql; + private String[] fieldNames; + private TypeInformation[] fieldTypes; + private final Properties properties; + + public CassandraTableSink(ClusterBuilder builder, String cql, Properties properties) { + this.builder = Preconditions.checkNotNull(builder, "builder"); --- End diff -- Please include a slightly longer exception message, i.e. "ClusterBuilder must not be null." The same applies to other calls to `Preconditions#checkNotNull()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
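The suggestion to pass a descriptive message to `Preconditions#checkNotNull()` can be sketched with the JDK's stdlib equivalent, `java.util.Objects.requireNonNull`, which behaves the same way for this case:

```java
import java.util.Objects;

public class NullCheckDemo {
    private final Object builder;

    public NullCheckDemo(Object builder) {
        // A bare "builder" message tells the caller little; state what must not be null.
        this.builder = Objects.requireNonNull(builder, "ClusterBuilder must not be null.");
    }

    public static void main(String[] args) {
        try {
            new NullCheckDemo(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints: ClusterBuilder must not be null.
        }
    }
}
```

The full sentence ends up in the stack trace, so a failed precondition explains itself without a debugger.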
[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/3777 [FLINK-6387] [webfrontend]Flink UI support access log Record user requests in the access log. Append each user access to the log file. - [X] General - The pull request references the related JIRA issue ("[FLINK-6387] [webfrontend]Flink UI support access log") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/hwstreaming/flink access_log_support Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3777.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3777 commit 0d19fb95072c90125152513c9b2a07b518d16b27 Author: shijinkui Date: 2017-02-23T12:06:43Z [FLINK-6387] [webfrontend]Flink UI support access log --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113405541 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; --- End diff -- You could define these as a `ConfigOption` which would make it more obvious what the default value is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
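The `ConfigOption` suggestion amounts to pairing each config key with its default value in a single constant, instead of a bare `String` key. A simplified, hypothetical stand-in for that idea (the `Option` class and the `maxRetries` key below are illustrative, not Flink's actual `ConfigOption` API):

```java
import java.util.Properties;
import java.util.function.Function;

public class ConfigKeyDemo {

    /** Simplified stand-in for Flink's ConfigOption: key and default travel together. */
    static final class Option<T> {
        final String key;
        final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // The default value is now obvious at the declaration site.
    static final Option<String> API_KEY = new Option<>("apikey", "");
    static final Option<Integer> MAX_RETRIES = new Option<>("maxRetries", 3); // hypothetical key

    static <T> T get(Properties props, Option<T> option, Function<String, T> parse) {
        String raw = props.getProperty(option.key);
        return raw == null ? option.defaultValue : parse.apply(raw);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("apikey", "abc123");
        System.out.println(get(props, API_KEY, s -> s));                // prints: abc123
        System.out.println(get(props, MAX_RETRIES, Integer::parseInt)); // prints: 3 (the default)
    }
}
```

Reading code that consumes the option no longer requires hunting for where the fallback value is applied.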
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984448#comment-15984448 ] ASF GitHub Bot commented on FLINK-6013: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113405034 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,109 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + + flink-metrics-datadog + flink-metrics-datadog + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + com.squareup.okhttp3 + okhttp + 3.6.0 + + + + com.squareup.okio + okio + 1.11.0 + + + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${project.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + jar-with-dependencies --- End diff -- I would remove this and adjust the `opt.xml` again; this makes it more consistent with other shaded jars that we produce. > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. 
> We prefer to have a Datadog metrics reporter directly contacting Datadog http > endpoint. > I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984447#comment-15984447 ] ASF GitHub Bot commented on FLINK-6013: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r113405541 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; --- End diff -- You could define these as a `ConfigOption` which would make it more obvious what the default value is. > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. 
We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting Datadog http > endpoint. > I'll take this ticket myself. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3736#discussion_r113405034

--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,109 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-datadog</artifactId>
+	<name>flink-metrics-datadog</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>3.6.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okio</groupId>
+			<artifactId>okio</artifactId>
+			<version>1.11.0</version>
+		</dependency>
+
+		<!-- test dependencies -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						jar-with-dependencies
--- End diff --

I would remove this and adjust the `opt.xml` again; this makes it more consistent with other shaded jars that we produce.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3777#discussion_r113406843

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java ---
@@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof HttpContent) {
 			}
 		}
 	}
+
+	/**
+	 * Record the access log if enable configure of
+	 * {@link org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}.
+	 * record format:
+	 * remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent"
+	 */
+	private void accesslog(ChannelHandlerContext ctx, HttpRequest req) {
+		HttpHeaders headers = req.headers();
+		if (headers != null) {
+			String line = ctx.channel().remoteAddress() + " - [" + new Date() + "] \""
--- End diff --

Please merge this string generation into the log statement.
[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3777#discussion_r113406502

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java ---
@@ -183,4 +194,28 @@ else if (currentDecoder != null && msg instanceof HttpContent) {
 			}
 		}
 	}
+
+	/**
+	 * Record the access log if enable configure of
+	 * {@link org.apache.flink.configuration.ConfigConstants#JOB_MANAGER_WEB_ACCESSLOG_ENABLE}.
+	 * record format:
+	 * remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent"
+	 */
+	private void accesslog(ChannelHandlerContext ctx, HttpRequest req) {
--- End diff --

this should be renamed to `logAccess`.
[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3777#discussion_r113406464

--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -645,6 +645,10 @@
 	@Deprecated
 	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";
 
+	/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
--- End diff --

This doc isn't in sync with what the implementation actually does. It doesn't restrict anything but only logs all requests.
[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3777#discussion_r113406982

--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -645,6 +645,10 @@
 	@Deprecated
 	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";
 
+	/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
+	public static final ConfigOption JOB_MANAGER_WEB_ACCESSLOG_ENABLE =
+		key("jobmanager.web.accesslog.enable").defaultValue(false);
--- End diff --

please move the `.defaultValue` call into a new line. It should also be moved to the JobManagerOptions class.
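The access-log line format under discussion in this thread can be exercised in isolation. The helper below is a hypothetical sketch, not Flink's actual code; it only demonstrates the layout the Javadoc describes. (With slf4j, the pieces would instead be passed as `{}` parameters to the log call so the string is built only when the log level is enabled, which is what the reviewer is asking for.)

```java
public class AccessLogFormat {
	// Builds one access-log line in the format:
	// remote_addr - [time_local] "request_method URI protocolVersion" "http_referer" "http_user_agent"
	static String formatLine(String remoteAddr, String timeLocal, String method,
			String uri, String protocolVersion, String referer, String userAgent) {
		return remoteAddr + " - [" + timeLocal + "] \""
			+ method + " " + uri + " " + protocolVersion + "\" \""
			+ referer + "\" \"" + userAgent + "\"";
	}

	public static void main(String[] args) {
		// Hypothetical request, for illustration only.
		System.out.println(formatLine("10.0.0.1", "Wed Apr 26 11:26:44 2017",
			"GET", "/jobs", "HTTP/1.1", "-", "curl/7.50"));
	}
}
```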
[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113411053 --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+	private static final String[] FIELD_NAMES = new String[] {"foo"};
+	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
--- End diff --

remove space after `[]`. move `STRING_TYPE_INFO` to this line.
[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113412806 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink {
+	private final JDBCOutputFormat outputFormat;
+	private final CheckpointCommitter committer;
+	private final String[] fieldNames;
+	private final TypeInformation[] fieldTypes;
+
+	public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer,
--- End diff --

I would propose either adding a JDBCCheckpointCommitter that cooperates with the sink (as seen in this [prototype](https://github.com/zentol/flink/commit/92e878b59a7371ac9cad402d0b009c7439cd1900)) or omitting the `CheckpointCommitter` argument and providing a dummy to the `GenericWriteAheadSink`.
[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113410993 --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class JDBCTableSinkTest {
+	private static final String[] FIELD_NAMES = new String[] {"foo"};
--- End diff --

remove space after `[]`.
[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113410894 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink {
+	private final JDBCOutputFormat outputFormat;
+	private final CheckpointCommitter committer;
+	private final String[] fieldNames;
+	private final TypeInformation[] fieldTypes;
+
+	public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer,
+			JDBCOutputFormat outputFormat, String[] fieldNames,
+			TypeInformation[] fieldTypes) throws Exception {
--- End diff --

like the cassandra sink the `fieldNames/Types` should be removed to provide a clean API to the user.
[GitHub] flink pull request #3712: [FLINK-6281] Create TableSink for JDBC.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3712#discussion_r113412006 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class JDBCTableSink extends GenericWriteAheadSink implements StreamTableSink {
+	private final JDBCOutputFormat outputFormat;
+	private final CheckpointCommitter committer;
+	private final String[] fieldNames;
+	private final TypeInformation[] fieldTypes;
+
+	public JDBCTableSink(CheckpointCommitter committer, TypeSerializer serializer,
+			JDBCOutputFormat outputFormat, String[] fieldNames,
+			TypeInformation[] fieldTypes) throws Exception {
+		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+		this.outputFormat = outputFormat;
+		this.committer = committer;
+		this.fieldNames = fieldNames;
+		this.fieldTypes = fieldTypes;
+	}
+
+	@Override
+	public void emitDataStream(DataStream dataStream) {
+		dataStream.transform("JDBC Sink", getOutputType(), this);
+	}
+
+	@Override
+	public TypeInformation getOutputType() {
+		return new RowTypeInfo(fieldTypes, fieldNames);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+		try {
+			return new JDBCTableSink(committer, serializer, outputFormat, fieldNames, fieldTypes);
+		} catch (Exception e) {
+			LOG.warn("Failed to create a copy of the sink.", e);
+			return null;
+		}
+	}
+
+	@Override
+	protected boolean sendValues(Iterable value, long timestamp) throws Exception {
+		for (Row r : value) {
+			try {
+				outputFormat.writeRecord(r);
--- End diff --

This doesn't guarantee in any way that the values are actually being sent; you need some kind of flushing functionality for this to work properly.
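The flushing concern raised above can be modeled with a small, self-contained toy (the class and method names here are hypothetical stand-ins, not Flink's or JDBC's API): records are buffered on write and only become durable on an explicit flush, so a `sendValues`-style method must flush before reporting success.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedWriter {
	private final List<String> pending = new ArrayList<>();   // buffered, not yet durable
	private final List<String> committed = new ArrayList<>(); // stands in for rows visible in the database

	// Analogue of JDBCOutputFormat.writeRecord: may only buffer the row.
	void writeRecord(String row) {
		pending.add(row);
	}

	// Pushes buffered rows out; without this call the rows may never be sent.
	void flush() {
		committed.addAll(pending);
		pending.clear();
	}

	// Analogue of GenericWriteAheadSink.sendValues: returning true acknowledges
	// the checkpoint, so it must not return before the rows are actually out.
	boolean sendValues(Iterable<String> values) {
		for (String v : values) {
			writeRecord(v);
		}
		flush();
		return true;
	}

	List<String> committedRows() {
		return committed;
	}
}
```

Without the `flush()` call, `sendValues` would acknowledge a checkpoint while rows still sit in the buffer, which is exactly the gap the reviewer is pointing out.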
[GitHub] flink pull request #3780: [FLINK-6274] Replaces usages of org.codehaus.jacks...
GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3780

    [FLINK-6274] Replaces usages of org.codehaus.jackson

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 6274_replace_codehaus

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3780.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3780

commit 84ca3a6297e50609211efef5ac22d1bae58a28c1
Author: zentol
Date:   2017-04-06T12:33:47Z

    [FLINK-6274] Replaces usages of org.codehaus.jackson
[jira] [Commented] (FLINK-4973) Flakey Yarn tests due to recently added latency marker
[ https://issues.apache.org/jira/browse/FLINK-4973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984659#comment-15984659 ]

Andrey commented on FLINK-4973:
-------------------------------

Here are more logs. The job was executed for some time and then was cancelled from the CLI:

{code}
2017-04-26 11:26:44,209 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source (2/2) (62b7b08d204abbb64f03325032b6d7b5). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,209 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (2/2) (62b7b08d204abbb64f03325032b6d7b5) switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,213 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source (2/2) (62b7b08d204abbb64f03325032b6d7b5). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,216 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (2/2) (32731388b322028553940b6c65216398). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,216 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/2) (32731388b322028553940b6c65216398) switched from RUNNING to CANCELING. [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,218 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (2/2) (32731388b322028553940b6c65216398). [flink-akka.actor.default-dispatcher-14]
2017-04-26 11:26:44,222 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker. [Time Trigger for Source: Custom Source (2/2)]
java.lang.RuntimeException: Buffer pool is destroyed.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:99)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:791)
	at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:142)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:149)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:106)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
	... 10 more
2017-04-26 11:26:44,320 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (2/2) (62b7b08d204abbb64f03325032b6d7b5) switched from CANCELING to CANCELED. [Source: Custom Source (2/2)]
2017-04-26 11:26:44,320 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/2) (32731388b322028553940b6c65216398) switched from CANCELING to CANCELED. [Map (2/2)]
{code}

> Flakey Yarn tests due to recently added latency marker
> ------------------------------------------------------
>
>                 Key: FLINK-4973
>                 URL: https://issues.apache.org/jira/browse/FLINK-4973
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.2.0
>
> The newly introduced {{LatencyMarksEmitter}} emits latency marker on the
> {{Output}}. This can still happen after the underlying {{BufferPool}} has
> been destroyed. The occurring exception is then logged:
> {code}
>
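The stack trace above shows a timer-driven latency marker racing with task cancellation: the `LocalBufferPool` is destroyed first, and the emission that fires afterwards throws. The race can be illustrated with a hypothetical guard (toy types, not Flink's actual classes): check liveness and drop the marker instead of letting the exception escape the timer thread.

```java
public class LatencyMarkerGuard {
	// Toy stand-in for the network buffer pool's lifecycle flag.
	interface BufferPool {
		boolean isDestroyed();
	}

	// Returns true if the marker was emitted, false if it was dropped because
	// the task is already shutting down. Dropping a latency marker is harmless;
	// throwing from the timer thread during cancellation only produces log noise.
	static boolean tryEmitLatencyMarker(BufferPool pool, Runnable emit) {
		if (pool.isDestroyed()) {
			return false;
		}
		emit.run();
		return true;
	}

	public static void main(String[] args) {
		BufferPool alive = () -> false;
		BufferPool destroyed = () -> true;
		System.out.println(tryEmitLatencyMarker(alive, () -> {}));
		System.out.println(tryEmitLatencyMarker(destroyed, () -> {}));
	}
}
```

Note that the check alone still leaves a window between the test and the emission; a complete fix would also catch the `IllegalStateException` at the emission site or synchronize emission with cancellation.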
[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3769

    Looks good, +1.
[jira] [Assigned] (FLINK-6093) Implement and turn on retraction for table sink
[ https://issues.apache.org/jira/browse/FLINK-6093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-6093: -- Assignee: Hequn Cheng (was: Shaoxuan Wang) > Implement and turn on retraction for table sink > > > Key: FLINK-6093 > URL: https://issues.apache.org/jira/browse/FLINK-6093 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > Add sink tableInsert and NeedRetract property, and consider table sink in > optimizer RetractionRule -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6248) Make the optional() available to all offered patterns.
[ https://issues.apache.org/jira/browse/FLINK-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-6248. - Resolution: Fixed Merged at b3de48d0e2c1fec6ccab3a8dd0f02b711dc0ae0a > Make the optional() available to all offered patterns. > -- > > Key: FLINK-6248 > URL: https://issues.apache.org/jira/browse/FLINK-6248 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > Currently the {{optional()}} quantifier is available as a separate pattern. > This issue proposes to make it available as a flag to all patterns. > This implies that: > 1) a singleton pattern with {{optional=true}} will become the current > {{OPTIONAL}}, > 2) a {{oneToMany}} will become {{zeroToMany}}, > 3) the {{zeroToMany}} will not exist as a direct option in the {{Pattern}} > class, and > 4) the {{times()}} will require some changes in the {{NFACompiler}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6356) Make times() eager and enable allowing combinations.
[ https://issues.apache.org/jira/browse/FLINK-6356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-6356. - Resolution: Fixed Merged at 0d856b34f90c0775b7c9d5e88f5ed6cdf6090851 > Make times() eager and enable allowing combinations. > > > Key: FLINK-6356 > URL: https://issues.apache.org/jira/browse/FLINK-6356 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > This is the PR that addresses it https://github.com/apache/flink/pull/3761 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984700#comment-15984700 ]

ASF GitHub Bot commented on FLINK-3871:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3663#discussion_r113437414

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
    @@ -0,0 +1,157 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumReader;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificRecord;
    +import org.apache.avro.util.Utf8;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
    + *
    + * Deserializes the byte[] messages into (nested) Flink Rows.
    + *
    + * {@link Utf8} is converted to regular Java Strings.
    + */
    +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
    +
    +	/**
    +	 * Schema for deterministic field order.
    +	 */
    +	private final Schema schema;
    +
    +	/**
    +	 * Reader that deserializes a byte array into a record.
    +	 */
    +	private final DatumReader<GenericRecord> datumReader;
    +
    +	/**
    +	 * Input stream to read the message from.
    +	 */
    +	private final MutableByteArrayInputStream inputStream;
    +
    +	/**
    +	 * Avro decoder that decodes binary data.
    +	 */
    +	private final Decoder decoder;
    +
    +	/**
    +	 * Record to deserialize the byte array to.
    +	 */
    +	private GenericRecord record;
    +
    +	/**
    +	 * Creates an Avro deserialization schema for the given record.
    +	 *
    +	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.schema = SpecificData.get().getSchema(recordClazz);
    +		this.datumReader = new ReflectDatumReader<>(schema);
    +		this.record = new GenericData.Record(schema);
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +	}
    +
    +	@Override
    +	public Row deserialize(byte[] message) throws IOException {
    +		// read record
    +		try {
    +			inputStream.setBuffer(message);
    +			this.record = datumReader.read(record, decoder);
    +		} catch (IOException e) {
    +			throw new RuntimeException("Failed to deserialize Row.", e);
    +		}
    +
    +		// convert to row
    +		final Object row = convertToRow(schema, record);
    +		return (Row) row;
    +	}
    +
    +	/**
    +	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
    +	 * Avro's {@link Utf8} fields are converted into regular Java strings.
    +	 */
    +	private static Object convertToRow(Schema schema, Object recordObj) {
    +		if (recordObj instanceof GenericRecord) {
    +			// records can be wrapped in a union
    +			if (schema.getType() == Schema.Type.UNION) {
    +				final List<Schema> types =
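The union handling that `convertToRow` begins above (a record wrapped in a nullable union) can be illustrated with a small, self-contained sketch. The `MiniSchema`/`Type` stand-ins below are hypothetical simplifications for illustration only, not Avro's actual `Schema` API:

```java
import java.util.Arrays;
import java.util.List;

public class UnionUnwrapSketch {

	// Hypothetical stand-in for Avro's Schema.Type; not the real Avro API.
	enum Type { NULL, RECORD, UNION, STRING }

	// Minimal schema model: a type plus, for unions, the member types.
	static final class MiniSchema {
		final Type type;
		final List<MiniSchema> types;

		MiniSchema(Type type, MiniSchema... members) {
			this.type = type;
			this.types = Arrays.asList(members);
		}
	}

	// Mirrors the check discussed in the PR: a UNION is only accepted if it
	// has the shape [NULL, RECORD]; in that case the RECORD member is used.
	static MiniSchema unwrapNullableRecord(MiniSchema schema) {
		if (schema.type == Type.UNION) {
			List<MiniSchema> members = schema.types;
			if (members.size() == 2
					&& members.get(0).type == Type.NULL
					&& members.get(1).type == Type.RECORD) {
				return members.get(1);
			}
			throw new RuntimeException(
				"Currently we only support schemas of the following form: UNION[null, RECORD].");
		}
		return schema;
	}

	public static void main(String[] args) {
		MiniSchema record = new MiniSchema(Type.RECORD);
		MiniSchema nullableRecord = new MiniSchema(Type.UNION, new MiniSchema(Type.NULL), record);

		// a UNION[null, RECORD] is unwrapped to the RECORD member
		System.out.println(unwrapNullableRecord(nullableRecord).type);  // RECORD
		// a plain RECORD passes through unchanged
		System.out.println(unwrapNullableRecord(record).type);          // RECORD
	}
}
```

This is why Avro's "nullable record" idiom (`["null", {"type": "record", ...}]`) deserializes cleanly, while any other union shape is rejected early with a clear error.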
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984544#comment-15984544 ]

ASF GitHub Bot commented on FLINK-3871:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3663#discussion_r113416949

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.avro.io.Encoder;
    +import org.apache.avro.io.EncoderFactory;
    +import org.apache.avro.reflect.ReflectDatumWriter;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificRecord;
    +import org.apache.avro.util.Utf8;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into Avro bytes.
    + */
    +public class AvroRowSerializationSchema implements SerializationSchema<Row> {
    +
    +	/**
    +	 * Avro serialization schema.
    +	 */
    +	private final Schema schema;
    +
    +	/**
    +	 * Writer to serialize an Avro record into a byte array.
    +	 */
    +	private final DatumWriter<GenericRecord> datumWriter;
    +
    +	/**
    +	 * Output stream to serialize records into a byte array.
    +	 */
    +	private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
    +
    +	/**
    +	 * Low-level class for serialization of Avro values.
    +	 */
    +	private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
    +
    +	/**
    +	 * Creates an Avro serialization schema for the given record.
    +	 *
    +	 * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.schema = SpecificData.get().getSchema(recordClazz);
    +		this.datumWriter = new ReflectDatumWriter<>(schema);
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public byte[] serialize(Row row) {
    +		// convert to record
    +		final Object record = convertToRecord(schema, row);
    +
    +		// write
    +		try {
    +			arrayOutputStream.reset();
    +			datumWriter.write((GenericRecord) record, encoder);
    +			encoder.flush();
    +			return arrayOutputStream.toByteArray();
    +		} catch (IOException e) {
    +			throw new RuntimeException("Failed to serialize Row.", e);
    +		}
    +	}
    +
    +	/**
    +	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
    +	 * Strings are converted into Avro's {@link Utf8} fields.
    +	 */
    +	private static Object convertToRecord(Schema schema, Object rowObj) {
    +		if (rowObj instanceof Row) {
    +			// records can be wrapped in a union
    +			if (schema.getType() == Schema.Type.UNION) {
    +				final List<Schema> types = schema.getTypes();
    +				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
    +					schema = types.get(1);
    +				}
    +				else {
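The `serialize()` method above reuses one `ByteArrayOutputStream` for every record, calling `reset()` before each write instead of allocating a fresh buffer. The pattern can be shown with plain JDK classes; the `writeRecord` method here is a hypothetical placeholder for Avro's `DatumWriter.write(record, encoder)`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ReusedBufferSketch {

	// One buffer reused across records, as in AvroRowSerializationSchema.
	private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();

	// Hypothetical stand-in for DatumWriter.write(record, encoder).
	private void writeRecord(String record) throws IOException {
		arrayOutputStream.write(record.getBytes(StandardCharsets.UTF_8));
	}

	public byte[] serialize(String record) {
		try {
			// reset instead of allocating a new stream per record;
			// without this, each result would still contain all earlier records
			arrayOutputStream.reset();
			writeRecord(record);
			return arrayOutputStream.toByteArray();
		} catch (IOException e) {
			throw new RuntimeException("Failed to serialize record.", e);
		}
	}

	public static void main(String[] args) {
		ReusedBufferSketch schema = new ReusedBufferSketch();
		byte[] first = schema.serialize("row-1");
		byte[] second = schema.serialize("row-2");
		System.out.println(new String(first, StandardCharsets.UTF_8));   // row-1
		System.out.println(new String(second, StandardCharsets.UTF_8));  // row-2
	}
}
```

Reusing the buffer avoids a per-record allocation on the hot serialization path; the trade-off is that the schema instance is not thread-safe, which matches how Flink uses one serialization schema per sink subtask.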
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3663#discussion_r113416754

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---

    +				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
    --- End diff --

    Ah, OK. I see. Then let's keep it :-)

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3663#discussion_r113416949

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---

    +				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
    +					schema = types.get(1);
    +				}
    +				else {
    +					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
    +				}
    +			} else if
[jira] [Commented] (FLINK-6382) Support all numeric types for generated graphs in Gelly examples
[ https://issues.apache.org/jira/browse/FLINK-6382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984626#comment-15984626 ]

Greg Hogan commented on FLINK-6382:
---

Hi [~heytitle], it would be great if you could review the PR. I will use this PR to test FLINK-3722 in a local branch so NormalizedKeySorter changes can be merged and FLINK-5734 rebased.

> Support all numeric types for generated graphs in Gelly examples
>
>                 Key: FLINK-6382
>                 URL: https://issues.apache.org/jira/browse/FLINK-6382
>             Project: Flink
>          Issue Type: Improvement
>          Components: Gelly
>    Affects Versions: 1.3.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>            Priority: Minor
>             Fix For: 1.3.0
>
> The Gelly examples currently support {{IntValue}}, {{LongValue}}, and {{StringValue}} for {{RMatGraph}}. Allow transformations and tests for all generated graphs for {{ByteValue}}, {{Byte}}, {{ShortValue}}, {{Short}}, {{CharValue}}, {{Character}}, {{Integer}}, {{Long}}, and {{String}}.
> This is additionally of interest for benchmarking and testing modifications to Flink's internal sort.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
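Supporting more ID types for a generated graph amounts to translating every vertex ID with a conversion function (e.g. Long to String). The sketch below shows that idea with plain Java; the `Edge` class and `translate` helper are illustrative stand-ins, not Gelly's actual API (Gelly's `Edge<K, V>` also carries an edge value):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TranslateIdsSketch {

	// Bare-bones directed edge keyed by a generic vertex ID type.
	static final class Edge<K> {
		final K source;
		final K target;

		Edge(K source, K target) {
			this.source = source;
			this.target = target;
		}
	}

	// Translate every vertex ID of an edge list with the given function,
	// e.g. Long -> String, analogous to re-keying a generated RMatGraph.
	static <OLD, NEW> List<Edge<NEW>> translate(List<Edge<OLD>> edges, Function<OLD, NEW> fn) {
		return edges.stream()
			.map(e -> new Edge<>(fn.apply(e.source), fn.apply(e.target)))
			.collect(Collectors.toList());
	}

	public static void main(String[] args) {
		List<Edge<Long>> edges = List.of(new Edge<>(0L, 1L), new Edge<>(1L, 2L));
		List<Edge<String>> translated = translate(edges, String::valueOf);
		System.out.println(translated.get(0).source);  // 0
	}
}
```

Because the translation is a single generic function per ID type, the same generated graph can back tests and benchmarks for every supported key type without duplicating the generator.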
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984638#comment-15984638 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3733

    Hi @hequn8128, I merged the PR to `table-retraction`. Can you close it? Thanks, Fabian

> Implement and turn on the retraction for aggregates
>
>                 Key: FLINK-6091
>                 URL: https://issues.apache.org/jira/browse/FLINK-6091
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for different aggregates:
> 1. add a delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain the Delete property)
> Note: Currently, only unbounded groupBy generates retraction, and it works under unbounded and processing-time mode. Hence, retraction is only supported for unbounded and processing-time aggregations so far. We can add more retraction support later.
> Supported now: unbounded groupBy; unbounded and processing-time over windows.
> Unsupported now: group windows; event-time or bounded over windows.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
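The delete/add property from step 1 and the retract-then-add pattern from step 2 can be pictured with a small sketch. The `Change` record and the sum aggregate below are illustrative, not Flink's actual classes; the point is only that an unbounded groupBy retracts its previous result before emitting the updated one:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractionSketch {

	// Illustrative change-flagged record: true = add (accumulate), false = delete (retract).
	record Change(boolean isAdd, String key, long value) {}

	private final Map<String, Long> sums = new HashMap<>();

	// Unbounded groupBy sum: each update first retracts the old aggregate,
	// then emits the new one, so downstream operators can correct themselves.
	List<Change> update(String key, long delta) {
		List<Change> out = new ArrayList<>();
		Long old = sums.get(key);
		if (old != null) {
			out.add(new Change(false, key, old));  // retract previous result
		}
		long updated = (old == null ? 0 : old) + delta;
		sums.put(key, updated);
		out.add(new Change(true, key, updated));   // emit new result
		return out;
	}

	public static void main(String[] args) {
		RetractionSketch agg = new RetractionSketch();
		System.out.println(agg.update("a", 2));  // only an add: first result for key "a"
		System.out.println(agg.update("a", 3));  // retract 2, then add 5
	}
}
```

Operators like CommonCalc (step 4) must pass the delete flag through unchanged; dropping it would turn a correction into a duplicate result downstream.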
[jira] [Updated] (FLINK-6093) Implement and turn on retraction for table sink
[ https://issues.apache.org/jira/browse/FLINK-6093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hequn Cheng updated FLINK-6093:
---
    Description: Add a needsUpdatesAsRetraction property to TableSink, and take the TableSink into consideration during RetractionRule  (was: Add sink tableInsert and NeedRetract property, and consider table sink in optimizer RetractionRule)

> Implement and turn on retraction for table sink
>
>                 Key: FLINK-6093
>                 URL: https://issues.apache.org/jira/browse/FLINK-6093
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> Add a needsUpdatesAsRetraction property to TableSink, and take the TableSink into consideration during RetractionRule.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
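The property described above can be sketched as a flag the planner consults per sink. Only the name `needsUpdatesAsRetraction` comes from the issue; the interface and classes below are simplified illustrations, not Flink's actual TableSink API:

```java
public class RetractionSinkSketch {

	// Illustrative sink interface; "needsUpdatesAsRetraction" is the property
	// named in the issue, everything else here is a simplified sketch.
	interface TableSink {
		boolean needsUpdatesAsRetraction();
	}

	// A sink that wants every update delivered as a retract message for the
	// old result followed by an add message for the new one.
	static final class RetractingSink implements TableSink {
		@Override
		public boolean needsUpdatesAsRetraction() {
			return true;
		}
	}

	// A sink that can apply updates in place (keyed upserts) and therefore
	// does not need retraction messages.
	static final class UpsertSink implements TableSink {
		@Override
		public boolean needsUpdatesAsRetraction() {
			return false;
		}
	}

	// A planner rule like RetractionRule would consult the flag when deciding
	// which change representation the plan must produce for the sink.
	static String mode(TableSink sink) {
		return sink.needsUpdatesAsRetraction() ? "retract" : "upsert";
	}

	public static void main(String[] args) {
		System.out.println(mode(new RetractingSink()));  // retract
		System.out.println(mode(new UpsertSink()));      // upsert
	}
}
```

Putting the decision on the sink keeps the optimizer generic: the same query plan can feed either kind of sink, with the rule choosing the change encoding per sink instance.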
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984722#comment-15984722 ]

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3663#discussion_r113439992

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984542#comment-15984542 ]

ASF GitHub Bot commented on FLINK-3871:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3663#discussion_r113416754

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---

    +				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
    --- End diff --

    Ah, OK. I see. Then let's keep it :-)

> Add Kafka TableSource with Avro serialization
>
[GitHub] flink pull request #3779: [FLINK-6382] [gelly] Support all numeric types for...
GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/3779

    [FLINK-6382] [gelly] Support all numeric types for generated graphs in Gelly examples

    The Gelly examples currently support IntValue, LongValue, and StringValue for RMatGraph. Allow transformations and tests for all generated graphs for ByteValue, Byte, ShortValue, Short, CharValue, Character, Integer, Long, and String.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 6382_support_all_numeric_types_for_generated_graphs_in_gelly_examples

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3779.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3779

----
commit c1ef02c4df46b127788cee63d080b588b731e696
Author: Greg Hogan
Date:   2017-04-25T15:36:08Z

    [FLINK-6382] [gelly] Support all numeric types for generated graphs in Gelly examples

    The Gelly examples currently support IntValue, LongValue, and StringValue for RMatGraph. Allow transformations and tests for all generated graphs for ByteValue, Byte, ShortValue, Short, CharValue, Character, Integer, Long, and String.