flink git commit: [FLINK-3075] Change Either creation method names and expose Right/Left classes
Repository: flink Updated Branches: refs/heads/master 31a2de86d -> 3e9d33ee5 [FLINK-3075] Change Either creation method names and expose Right/Left classes Closes #1402 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e9d33ee Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e9d33ee Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e9d33ee Branch: refs/heads/master Commit: 3e9d33ee5b15d20bf1d2ee757543806619997b49 Parents: 31a2de8 Author: Gyula ForaAuthored: Sat Nov 28 18:47:02 2015 +0100 Committer: Gyula Fora Committed: Sun Nov 29 22:28:40 2015 +0100 -- .../apache/flink/api/java/typeutils/Either.java | 58 .../typeutils/runtime/EitherSerializer.java | 25 + .../java/type/extractor/TypeExtractorTest.java | 2 +- .../api/java/typeutils/EitherTypeInfoTest.java | 12 ++-- .../typeutils/runtime/EitherSerializerTest.java | 22 5 files changed, 82 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3e9d33ee/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java index ba446a1..8382831 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java @@ -19,39 +19,45 @@ package org.apache.flink.api.java.typeutils; /** - * This type represents a value of one two possible types, Left or Right - * (a disjoint union), inspired by Scala's Either type. + * This type represents a value of one two possible types, Left or Right (a + * disjoint union), inspired by Scala's Either type. * - * @param the type of Left - * @param the type of Right + * @param + *the type of Left + * @param + *the type of Right */ public abstract class Either { /** * Create a Left value of Either */ - public static Either left(L value) { + public static Either Left(L value) { return new Left (value); } /** * Create a Right value of Either */ - public static Either right(R value) { + public static Either Right(R value) { return new Right (value); } /** * Retrieve the Left value of Either. +* * @return the Left value -* @throws IllegalStateException if called on a Right +* @throws IllegalStateException +* if called on a Right */ public abstract L left() throws IllegalStateException; /** * Retrieve the Right value of Either. +* * @return the Right value -* @throws IllegalStateException if called on a Left +* @throws IllegalStateException +* if called on a Left */ public abstract R right() throws IllegalStateException; @@ -71,7 +77,15 @@ public abstract class Either { return getClass() == Right.class; } - private static class Left extends Either { + /** +* A left value of {@link Either} +* +* @param +*the type of Left +* @param +*the type of Right +*/ + public static class Left extends Either { private final L value; public Left(L value) { @@ -106,9 +120,25 @@ public abstract class Either { public String toString() { return "Left(" + value.toString() + ")"; } + + /** +* Creates a left value of {@link Either} +* +*/ + public static Left of(L left) { + return new Left (left); + } } - private static class Right extends Either { + /** +* A right value of {@link Either} +* +* @param +*the type of Left +* @param +*the type of Right +*/ + public static class Right extends Either { private final R value; public Right(R value) { @@ -143,5 +173,13 @@ public abstract class Either { public String toString() { return "Right(" + value.toString() + ")"; } + +
flink git commit: [FLINK-3081] Properly stop periodic Kafka committer
Repository: flink Updated Branches: refs/heads/master e9a2bc9d0 -> a997dd615 [FLINK-3081] Properly stop periodic Kafka committer This closes #1410 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a997dd61 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a997dd61 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a997dd61 Branch: refs/heads/master Commit: a997dd615598650934f0b785cbe8a6468ea63481 Parents: e9a2bc9 Author: Robert MetzgerAuthored: Thu Nov 26 14:25:54 2015 +0100 Committer: Robert Metzger Committed: Sun Nov 29 15:56:23 2015 +0100 -- .../streaming/connectors/kafka/FlinkKafkaConsumer.java | 12 1 file changed, 8 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a997dd61/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index 446648f..e42faef 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -414,14 +414,18 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction // same here. long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "6")); offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this); + offsetCommitter.setDaemon(true); offsetCommitter.start(); LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval); } - fetcher.run(sourceContext, deserializer, lastOffsets); - - if (offsetCommitter != null) { - offsetCommitter.close(); + try { + fetcher.run(sourceContext, deserializer, lastOffsets); + } finally { + if (offsetCommitter != null) { + offsetCommitter.close(); + offsetCommitter.join(); + } } } else {
flink git commit: [FLINK-3081] Properly stop periodic Kafka committer
Repository: flink Updated Branches: refs/heads/release-0.10 d6e118b8e -> 961adea59 [FLINK-3081] Properly stop periodic Kafka committer Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/961adea5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/961adea5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/961adea5 Branch: refs/heads/release-0.10 Commit: 961adea591054ef6dae2e06f76cef7409eff8204 Parents: d6e118b Author: Robert MetzgerAuthored: Sun Nov 29 16:05:00 2015 +0100 Committer: Robert Metzger Committed: Sun Nov 29 16:05:00 2015 +0100 -- .../streaming/connectors/kafka/FlinkKafkaConsumer.java | 12 1 file changed, 8 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/961adea5/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index e701639..8791fc8 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -390,14 +390,18 @@ public class FlinkKafkaConsumer extends RichParallelSourceFunction // same here. long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "6")); offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this); + offsetCommitter.setDaemon(true); offsetCommitter.start(); LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval); } - fetcher.run(sourceContext, valueDeserializer, lastOffsets); - - if (offsetCommitter != null) { - offsetCommitter.close(); + try { + fetcher.run(sourceContext, valueDeserializer, lastOffsets); + } finally { + if (offsetCommitter != null) { + offsetCommitter.close(); + offsetCommitter.join(); + } } } else {
flink git commit: [FLINK-2961] [table] Add support for basic type Date in Table API
Repository: flink Updated Branches: refs/heads/master a997dd615 -> 31a2de86d [FLINK-2961] [table] Add support for basic type Date in Table API Fix nullCheck enabled Fix test TableConfig introduced Improvements and bug fixing This closes #1322. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a2de86 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a2de86 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a2de86 Branch: refs/heads/master Commit: 31a2de86d9034def9c58ef7f3d3dcae3d4dafd6f Parents: a997dd6 Author: twalthrAuthored: Tue Nov 3 15:18:19 2015 +0100 Committer: twalthr Committed: Sun Nov 29 16:48:27 2015 +0100 -- docs/libs/table.md | 6 +- .../apache/flink/api/table/TableConfig.scala| 66 + .../table/codegen/ExpressionCodeGenerator.scala | 246 --- .../api/table/codegen/GenerateFilter.scala | 27 +- .../flink/api/table/codegen/GenerateJoin.scala | 37 ++- .../table/codegen/GenerateResultAssembler.scala | 13 +- .../api/table/codegen/GenerateSelect.scala | 20 +- .../flink/api/table/expressions/literals.scala | 2 + .../api/table/parser/ExpressionParser.scala | 3 +- .../runtime/ExpressionFilterFunction.scala | 9 +- .../table/runtime/ExpressionJoinFunction.scala | 9 +- .../runtime/ExpressionSelectFunction.scala | 9 +- .../api/java/table/test/CastingITCase.java | 23 ++ .../api/scala/table/test/CastingITCase.scala| 22 +- .../scala/table/test/ExpressionsITCase.scala| 18 ++ 15 files changed, 440 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/docs/libs/table.md -- diff --git a/docs/libs/table.md b/docs/libs/table.md index be0b45e..d1d42cc 100644 --- a/docs/libs/table.md +++ b/docs/libs/table.md @@ -351,11 +351,11 @@ unary = [ "!" | "-" | "~" ] , suffix ; suffix = atom | aggregation | cast | as | substring ; -aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | "avg" ] ; +aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | ".avg" ] ; cast = atom , ".cast(" , data type , ")" ; -data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" ; +data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" | "DATE" ; as = atom , ".as(" , field reference , ")" ; @@ -372,3 +372,5 @@ atom = ( "(" , single expression , ")" ) | literal | field reference ; Here, `literal` is a valid Java literal and `field reference` specifies a column in the data. The column names follow Java identifier syntax. +Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A `LONG` casted to `DATE` must be a milliseconds timestamp. A `STRING` casted to `DATE` must have the format "`-MM-dd HH:mm:ss.SSS`", "`-MM-dd`", "`HH:mm:ss`", or a milliseconds timestamp. By default, all timestamps refer to the UTC timezone beginning from January 1, 1970, 00:00:00 in milliseconds. + http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala -- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala new file mode 100644 index 000..ffa2bec --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +import java.util.TimeZone + +/** + * A config to define the runtime behavior of the Table API. + */ +class TableConfig extends Serializable { + + /** + * Defines the timezone for date/time/timestamp conversions. +
buildbot success in ASF Buildbot on flink-docs-release-0.9
The Buildbot has detected a restored build on builder flink-docs-release-0.9 while building ASF Buildbot. Full details are available at: http://ci.apache.org/builders/flink-docs-release-0.9/builds/157 Buildbot URL: http://ci.apache.org/ Buildslave for this Build: lares_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' triggered this build Build Source Stamp: [branch release-0.9] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot