[5/5] flink git commit: [FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ
[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ This closes #1534 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b01a890 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b01a890 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b01a890 Branch: refs/heads/master Commit: 6b01a89020f2de3f7710cf72336291b1e8ca8562 Parents: d97fcda Author: Robert MetzgerAuthored: Thu Jan 21 12:22:21 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 14:43:03 2016 +0100 -- .../connectors/rabbitmq/RMQSource.java | 4 +- .../connectors/rabbitmq/RMQSourceTest.java | 79 .../source/MessageAcknowledgingSourceBase.java | 51 +++-- ...ltipleIdsMessageAcknowledgingSourceBase.java | 24 +++--- 4 files changed, 124 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java -- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 09bb07c..59bc057 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -196,7 +196,9 @@ public class RMQSource extends MultipleIdsMessageAcknowledgingSourceBasehttp://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java -- diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index aa19e5d..0a3de84 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -23,6 +23,7 @@ import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; @@ -31,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -103,6 +105,83 @@ public class RMQSourceTest { sourceThread.join(); } + /** +* Make sure concurrent access to snapshotState() and notifyCheckpointComplete() don't cause +* an issue. 
+* +* Without proper synchronization, the test will fail with a concurrent modification exception +* +*/ + @Test + public void testConcurrentAccess() throws Exception { + source.autoAck = false; + sourceThread.start(); + + final Tuple1 error = new Tuple1<>(null); + + Thread.sleep(5); + + Thread snapshotThread = new Thread(new Runnable() { + public long id = 0; + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + source.snapshotState(id++, 0); + } catch (Exception e) { + error.f0 = e; + break; // stop thread + } + } + } + }); + + Thread notifyThread =
[3/5] flink git commit: [FLINK-3242] Also Set User-specified StateBackend without Checkpointing
[FLINK-3242] Also Set User-specified StateBackend without Checkpointing Before, the user-specified StateBackend would not be set when generating the JobGraph if checkpointing was disabled. This closes #1516 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b88c2c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b88c2c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b88c2c Branch: refs/heads/master Commit: 83b88c2c606f0d36bc04a7250629eb00516af919 Parents: f6d2ce9 Author: Aljoscha KrettekAuthored: Mon Jan 18 11:53:31 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 14:30:28 2016 +0100 -- .../api/graph/StreamingJobGraphGenerator.java | 2 +- .../runtime/state/StateBackendITCase.java | 134 +++ 2 files changed, 135 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 50c6a15..56b16a4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -317,10 +317,10 @@ public class StreamingJobGraphGenerator { final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig(); + config.setStateBackend(streamGraph.getStateBackend()); config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled()); if (ceckpointCfg.isCheckpointingEnabled()) { config.setCheckpointMode(ceckpointCfg.getCheckpointingMode()); - config.setStateBackend(streamGraph.getStateBackend()); } else { // the "at-least-once" input handler is slightly cheaper (in the absence 
of checkpoints), http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java new file mode 100644 index 000..cdfef85 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.state; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertTrue; + + +public class StateBackendITCase extends StreamingMultipleProgramsTestBase { + + /** +* Verify that the user-specified state backend is used even if checkpointing is disabled. +* +* @throws Exception
[4/5] flink git commit: [FLINK-3292]Fix for Bug in flink-jdbc. Not all JDBC drivers supported
[FLINK-3292]Fix for Bug in flink-jdbc. Not all JDBC drivers supported This closes #1551 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d97fcda6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d97fcda6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d97fcda6 Branch: refs/heads/master Commit: d97fcda6635b6821ac3f61c39e0fa156bc7c7fd4 Parents: 83b88c2 Author: Subhobrata DeyAuthored: Wed Jan 27 17:00:37 2016 -0500 Committer: Stephan Ewen Committed: Thu Jan 28 14:37:53 2016 +0100 -- .../flink/api/java/io/jdbc/JDBCInputFormat.java | 16 +++- .../flink/api/java/io/jdbc/JDBCInputFormatTest.java | 3 +++ .../api/java/io/jdbc/JDBCOutputFormatTest.java | 3 +++ 3 files changed, 21 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d97fcda6/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java -- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index eb3ac31..84eb309 100644 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -58,6 +58,8 @@ public class JDBCInputFormat extends RichInputFormat
[1/5] flink git commit: [FLINK-3178] Don't Emit In-Flight Windows When Closing Window Operator
Repository: flink Updated Branches: refs/heads/master 2e2330737 -> 6b01a8902 [FLINK-3178] Don't Emit In-Flight Windows When Closing Window Operator This closes #1542 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4e5a55f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4e5a55f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4e5a55f Branch: refs/heads/master Commit: c4e5a55f027ed73ce557f10d5125a0b168832889 Parents: 2e23307 Author: Aljoscha KrettekAuthored: Mon Jan 18 13:25:03 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 14:19:17 2016 +0100 -- .../examples/windowing/SessionWindowing.java| 2 +- .../util/TopSpeedWindowingExampleData.java | 8 +- ...ractAlignedProcessingTimeWindowOperator.java | 5 - .../windowing/NonKeyedWindowOperator.java | 14 +-- .../operators/windowing/WindowOperator.java | 17 ++- .../api/complex/ComplexIntegrationTest.java | 3 +- ...AlignedProcessingTimeWindowOperatorTest.java | 110 ++- ...AlignedProcessingTimeWindowOperatorTest.java | 89 --- .../util/OneInputStreamOperatorTestHarness.java | 3 +- 9 files changed, 54 insertions(+), 197 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java -- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 035727a..baa4af8 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -116,7 +116,7 @@ public class SessionWindowing { // Update the last seen event 
time lastSeenState.update(timestamp); - ctx.registerEventTimeTimer(lastSeen + sessionTimeout); + ctx.registerEventTimeTimer(timestamp + sessionTimeout); if (timeSinceLastEvent > sessionTimeout) { return TriggerResult.FIRE_AND_PURGE; http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java -- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java index bf63695..4718b8b 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java @@ -192,9 +192,7 @@ public class TopSpeedWindowingExampleData { "(1,95,1973.61115,1424952007664)\n" + "(0,100,1709.72229,1424952006663)\n" + "(0,100,1737.50007,1424952007664)\n" + - "(1,95,1973.61115,1424952007664)\n" + - "(0,100,1791.66674,1424952009664)\n" + - "(1,95,2211.8,1424952017668)\n"; + "(1,95,1973.61115,1424952007664)\n"; public static final String TOP_CASE_CLASS_SPEEDS = "CarEvent(0,55,15.277,1424951918630)\n" + @@ -267,9 +265,7 @@ public class TopSpeedWindowingExampleData { "CarEvent(1,95,1973.61115,1424952007664)\n" + "CarEvent(0,100,1709.72229,1424952006663)\n" + "CarEvent(0,100,1737.50007,1424952007664)\n" + - "CarEvent(1,95,1973.61115,1424952007664)\n" + - "CarEvent(0,100,1791.66674,1424952009664)\n" + -
flink git commit: [FLINK-3242] Adjust StateBackendITCase for 0.10 signatures of state backends
Repository: flink Updated Branches: refs/heads/release-0.10 2aeb6fac3 -> bef39f881 [FLINK-3242] Adjust StateBackendITCase for 0.10 signatures of state backends Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bef39f88 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bef39f88 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bef39f88 Branch: refs/heads/release-0.10 Commit: bef39f881a3d25334d29711808757e8944642c7e Parents: 2aeb6fa Author: Stephan EwenAuthored: Thu Jan 28 16:58:12 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 17:12:38 2016 +0100 -- .../runtime/state/StateBackendITCase.java | 35 ++-- 1 file changed, 18 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/bef39f88/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java index cdfef85..12233ce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -18,22 +18,23 @@ package org.apache.flink.streaming.runtime.state; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateHandle; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + import org.junit.Test; import java.io.Serializable; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class StateBackendITCase extends StreamingMultipleProgramsTestBase { @@ -70,19 +71,22 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { } }) .print(); - - boolean caughtSuccess = false; + try { see.execute(); - } catch (JobExecutionException e) { - if (e.getCause() instanceof SuccessException) { - caughtSuccess = true; - } else { - throw e; + fail("This should throw a 'SuccessException'"); + } + catch (JobExecutionException e) { + Throwable cause = e.getCause(); + if (cause == null || !(cause.getCause() instanceof SuccessException)) { + e.printStackTrace(); + fail(e.getMessage()); } } - - assertTrue(caughtSuccess); + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @@ -90,7 +94,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { private static final long serialVersionUID = 1L; @Override - public void initializeForJob(Environment env) throws Exception { + public void initializeForJob(JobID id) throws Exception { throw new SuccessException(); } @@ -105,11 +109,8 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { } @Override - public KvState createKvState(String stateId, - String stateName, - TypeSerializer keySerializer, - TypeSerializer valueSerializer, - V defaultValue) throws Exception { + public KvState createKvState( + TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws Exception { return null;
flink git commit: [FLINK-3275] [py] Support for DataSet.setParallelism()
Repository: flink Updated Branches: refs/heads/master 440137cc3 -> 40422d505 [FLINK-3275] [py] Support for DataSet.setParallelism() -parallelism is stored Value object within the OperationInfo, so it can be passed as a reference to multiple operations (in cases where a set is internally executed as multiple operations) -setParallelism is called for every DataSet with either a user-set value or env.getParallelism -added a DataSink set, providing access to name() and setParallelism() for sinks Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40422d50 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40422d50 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40422d50 Branch: refs/heads/master Commit: 40422d5057e5c1d7b75aec48bacbd7518cd7c9e1 Parents: 440137c Author: zentolAuthored: Thu Jan 28 10:00:25 2016 +0100 Committer: zentol Committed: Thu Jan 28 11:50:51 2016 +0100 -- .../flink/python/api/PythonOperationInfo.java | 2 + .../flink/python/api/PythonPlanBinder.java | 117 ++- .../flink/python/api/flink/plan/DataSet.py | 40 ++- .../flink/python/api/flink/plan/Environment.py | 1 + .../python/api/flink/plan/OperationInfo.py | 6 + 5 files changed, 109 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java -- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java index 1e3005d..7f7a993 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java @@ -45,6 +45,7 @@ public class PythonOperationInfo { public boolean toError; public String name; public boolean usesUDF; 
+ public int parallelism; public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException { identifier = (String) streamer.getRecord(); @@ -90,6 +91,7 @@ public class PythonOperationInfo { for (int x = 0; x < valueCount; x++) { values[x] = streamer.getRecord(); } + parallelism = (Integer) streamer.getRecord(true); /* aggregates = new AggregationEntry[count]; http://git-wip-us.apache.org/repos/asf/flink/blob/40422d50/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java -- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 3877ef1..1534ebf 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -388,42 +388,53 @@ public class PythonPlanBinder { } } + private int getParallelism(PythonOperationInfo info) { + return info.parallelism == -1 ? env.getParallelism() : info.parallelism; + } + private void createCsvSource(PythonOperationInfo info) throws IOException { if (!(info.types instanceof TupleTypeInfo)) { throw new RuntimeException("The output type of a csv source has to be a tuple. 
The derived type is " + info); } - - sets.put(info.setID, env.createInput(new TupleCsvInputFormat(new Path(info.path), - info.lineDelimiter, info.fieldDelimiter, (TupleTypeInfo) info.types), info.types) - .name("CsvSource").map(new SerializerMap()).name("CsvSourcePostStep")); + Path path = new Path(info.path); + String lineD = info.lineDelimiter; + String fieldD = info.fieldDelimiter; + TupleTypeInfo types = (TupleTypeInfo) info.types; + sets.put(info.setID, env.createInput(new TupleCsvInputFormat(path, lineD, fieldD, types), info.types).setParallelism(getParallelism(info)).name("CsvSource") + .map(new SerializerMap()).setParallelism(getParallelism(info)).name("CsvSourcePostStep")); } private void createTextSource(PythonOperationInfo info) throws IOException { -
flink git commit: [FLINK-3140] [table] NULL value data layout in Row Serializer/Comparator
Repository: flink Updated Branches: refs/heads/master 499b60fed -> 440137cc3 [FLINK-3140] [table] NULL value data layout in Row Serializer/Comparator This closes #1465. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/440137cc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/440137cc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/440137cc Branch: refs/heads/master Commit: 440137cc3ad8ab47dff84212c81e960e0a254eaa Parents: 499b60f Author: twalthrAuthored: Fri Jan 22 15:28:03 2016 +0100 Committer: twalthr Committed: Thu Jan 28 11:44:32 2016 +0100 -- .../common/typeutils/ComparatorTestBase.java| 30 ++ .../runtime/TupleComparatorTTT1Test.java| 18 +- .../runtime/TupleComparatorTTT2Test.java| 8 - flink-libraries/flink-table/pom.xml | 8 + .../table/typeinfo/NullAwareComparator.scala| 218 ++ .../api/table/typeinfo/NullMaskUtils.scala | 98 + .../api/table/typeinfo/RowComparator.scala | 417 +++ .../api/table/typeinfo/RowSerializer.scala | 114 - .../flink/api/table/typeinfo/RowTypeInfo.scala | 61 ++- .../table/test/GroupedAggreagationsITCase.scala | 115 - .../table/test/GroupedAggregationsITCase.scala | 138 ++ .../api/table/typeinfo/RowComparatorTest.scala | 135 ++ .../api/table/typeinfo/RowSerializerTest.scala | 208 + .../api/scala/typeutils/CaseClassTypeInfo.scala | 5 +- 14 files changed, 1408 insertions(+), 165 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/440137cc/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java index a8ace92..793688d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java @@ -399,6 +399,32 
@@ public abstract class ComparatorTestBase extends TestLogger { } } + // Key extraction tests -- + + @Test + @SuppressWarnings("unchecked") + public void testKeyExtraction() { + TypeComparator comparator = getComparator(true); + T[] data = getSortedData(); + + for (T value : data) { + TypeComparator[] comparators = comparator.getFlatComparators(); + Object[] extractedKeys = new Object[comparators.length]; + int insertedKeys = comparator.extractKeys(value, extractedKeys, 0); + assertTrue(insertedKeys == comparators.length); + + for (int i = 0; i < insertedKeys; i++) { + // check if some keys are null, although this is not supported + if (!supportsNullKeys()) { + assertNotNull(extractedKeys[i]); + } + // compare the extracted key with itself as a basic check + // if the extracted key corresponds to the comparator + assertTrue(comparators[i].compare(extractedKeys[i], extractedKeys[i]) == 0); + } + } + } + // protected void deepEquals(String message, T should, T is) { @@ -450,6 +476,10 @@ public abstract class ComparatorTestBase extends TestLogger { } + protected boolean supportsNullKeys() { + return false; + } + // public static final class TestOutputView extends DataOutputStream implements DataOutputView { http://git-wip-us.apache.org/repos/asf/flink/blob/440137cc/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java index 6d4afdd..cf73be2 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
[09/11] flink git commit: [FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies
[FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies Several packages had been updated locally by users when compiling the templates without changing the version in bower.json. The following packages have been explicitly updated. jquery 2.1.4 → 2.2.0 angular 1.3.15 → 1.4.8 angular-moment 0.9.2 → 0.10.3 angular-ui-router 0.2.13 → 0.2.15 bootstrap 3.3.5 → 3.3.6 d3 3.5.5 → 3.5.12 dagre-d3 0.4.10 → 0.4.11 font-awesome 4.3.0 → 4.5.0 This closes #1525 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ea5e138 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ea5e138 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ea5e138 Branch: refs/heads/master Commit: 1ea5e138793e343010f1cee07d8a0d88e67b6e51 Parents: 40422d5 Author: Greg HoganAuthored: Tue Jan 19 10:21:25 2016 -0500 Committer: Stephan Ewen Committed: Thu Jan 28 13:41:37 2016 +0100 -- .gitignore | 1 + .../web-dashboard/assets/fonts/FontAwesome.otf | Bin 93888 -> 0 bytes .../assets/fonts/fontawesome-webfont.eot| Bin 60767 -> 0 bytes .../assets/fonts/fontawesome-webfont.svg| 565 - .../assets/fonts/fontawesome-webfont.ttf| Bin 122092 -> 0 bytes .../assets/fonts/fontawesome-webfont.woff | Bin 71508 -> 0 bytes .../assets/fonts/fontawesome-webfont.woff2 | Bin 56780 -> 0 bytes flink-runtime-web/web-dashboard/bower.json |20 +- flink-runtime-web/web-dashboard/gulpfile.js |13 +- .../web-dashboard/web/css/vendor.css| 13883 ++-- .../web-dashboard/web/fonts/FontAwesome.otf | Bin 93888 -> 109688 bytes .../web/fonts/fontawesome-webfont.eot | Bin 60767 -> 70807 bytes .../web/fonts/fontawesome-webfont.svg | 134 +- .../web/fonts/fontawesome-webfont.ttf | Bin 122092 -> 142072 bytes .../web/fonts/fontawesome-webfont.woff | Bin 71508 -> 83588 bytes .../web/fonts/fontawesome-webfont.woff2 | Bin 56780 -> 66624 bytes flink-runtime-web/web-dashboard/web/js/index.js |80 +- .../web-dashboard/web/js/vendor.js | 19166 ++--- 18 files changed, 
18309 insertions(+), 15553 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/.gitignore -- diff --git a/.gitignore b/.gitignore index a73a9d3..629d62c 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ _site docs/api build-target flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/ +flink-runtime-web/web-dashboard/assets/fonts/ flink-runtime-web/web-dashboard/node_modules/ flink-runtime-web/web-dashboard/bower_components/ atlassian-ide-plugin.xml http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/assets/fonts/FontAwesome.otf -- diff --git a/flink-runtime-web/web-dashboard/assets/fonts/FontAwesome.otf b/flink-runtime-web/web-dashboard/assets/fonts/FontAwesome.otf deleted file mode 100644 index f7936cc..000 Binary files a/flink-runtime-web/web-dashboard/assets/fonts/FontAwesome.otf and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.eot -- diff --git a/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.eot b/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.eot deleted file mode 100644 index 33b2bb8..000 Binary files a/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.eot and /dev/null differ
[08/11] flink git commit: [FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.svg -- diff --git a/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.svg b/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.svg deleted file mode 100644 index 1ee89d4..000 --- a/flink-runtime-web/web-dashboard/assets/fonts/fontawesome-webfont.svg +++ /dev/null @@ -1,565 +0,0 @@ - -http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd; > -http://www.w3.org/2000/svg; xmlns:xlink="http://www.w3.org/1999/xlink; version="1.1"> - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[03/11] flink git commit: [FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.ttf -- diff --git a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.ttf b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.ttf index ed9372f..26dea79 100644 Binary files a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.ttf and b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.ttf differ http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff -- diff --git a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff index 8b280b9..dc35ce3 100644 Binary files a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff and b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff differ http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff2 -- diff --git a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff2 b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff2 index 3311d58..500e517 100644 Binary files a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff2 and b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.woff2 differ
[05/11] flink git commit: [FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.eot -- diff --git a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.eot b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.eot index 33b2bb8..9b6afae 100644 Binary files a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.eot and b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.eot differ
[11/11] flink git commit: [hotfix] [streaming] Processing timer errors are not logged unless the task is actually running.
[hotfix] [streaming] Processing timer errors are not logged unless the task is actually running. This keeps the log cleaner in case of failed timers while canceling tasks. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e233073 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e233073 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e233073 Branch: refs/heads/master Commit: 2e23307372147287c8b49d2985d9564b2ed54eaa Parents: 9637ee7 Author: Stephan EwenAuthored: Thu Jan 28 13:37:09 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 13:41:38 2016 +0100 -- .../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2e233073/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 91f11fa..b91c570 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -688,7 +688,9 @@ public abstract class StreamTask try { target.trigger(timestamp); } catch (Throwable t) { - LOG.error("Caught exception while processing timer.", t); + if (task.isRunning) { + LOG.error("Caught exception while processing timer.", t); + } if (task.asyncException == null) { task.asyncException = new TimerException(t); }
[04/11] flink git commit: [FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.svg -- diff --git a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.svg b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.svg index 1ee89d4..d05688e 100644 --- a/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.svg +++ b/flink-runtime-web/web-dashboard/web/fonts/fontawesome-webfont.svg @@ -1,6 +1,6 @@ http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd; > -http://www.w3.org/2000/svg; xmlns:xlink="http://www.w3.org/1999/xlink; version="1.1"> +http://www.w3.org/2000/svg;> @@ -219,8 +219,8 @@ - - + + @@ -362,7 +362,7 @@ - + @@ -399,7 +399,7 @@ - + @@ -410,9 +410,9 @@ - - - + + + @@ -454,12 +454,12 @@ - + - + @@ -483,13 +483,13 @@ - + - + @@ -523,7 +523,7 @@ - + @@ -531,18 +531,18 @@ - + - + - + - + @@ -555,11 +555,101 @@ - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
[06/11] flink git commit: [FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/web/css/vendor.css -- diff --git a/flink-runtime-web/web-dashboard/web/css/vendor.css b/flink-runtime-web/web-dashboard/web/css/vendor.css index 672e07f..2a8d00f 100644 --- a/flink-runtime-web/web-dashboard/web/css/vendor.css +++ b/flink-runtime-web/web-dashboard/web/css/vendor.css @@ -1,8565 +1,8810 @@ -/*! - * Font Awesome 4.3.0 by @davegandy - http://fontawesome.io - @fontawesome - * License - http://fontawesome.io/license (Font: SIL OFL 1.1, CSS: MIT License) - */ -/* FONT PATH - * -- */ -@font-face { - font-family: 'FontAwesome'; - src: url('../fonts/fontawesome-webfont.eot?v=4.3.0'); - src: url('../fonts/fontawesome-webfont.eot?#iefix=4.3.0') format('embedded-opentype'), url('../fonts/fontawesome-webfont.woff2?v=4.3.0') format('woff2'), url('../fonts/fontawesome-webfont.woff?v=4.3.0') format('woff'), url('../fonts/fontawesome-webfont.ttf?v=4.3.0') format('truetype'), url('../fonts/fontawesome-webfont.svg?v=4.3.0#fontawesomeregular') format('svg'); - font-weight: normal; - font-style: normal; -} -.fa { - display: inline-block; - font: normal normal normal 14px/1 FontAwesome; - font-size: inherit; - text-rendering: auto; - -webkit-font-smoothing: antialiased; - -moz-osx-font-smoothing: grayscale; - transform: translate(0, 0); +/*! 
normalize.css v3.0.3 | MIT License | github.com/necolas/normalize.css */ +html { + font-family: sans-serif; + -ms-text-size-adjust: 100%; + -webkit-text-size-adjust: 100%; } -/* makes the font 33% larger relative to the icon container */ -.fa-lg { - font-size: 1.em; - line-height: 0.75em; - vertical-align: -15%; +body { + margin: 0; } -.fa-2x { - font-size: 2em; +article, +aside, +details, +figcaption, +figure, +footer, +header, +hgroup, +main, +menu, +nav, +section, +summary { + display: block; } -.fa-3x { - font-size: 3em; +audio, +canvas, +progress, +video { + display: inline-block; + vertical-align: baseline; } -.fa-4x { - font-size: 4em; +audio:not([controls]) { + display: none; + height: 0; } -.fa-5x { - font-size: 5em; +[hidden], +template { + display: none; } -.fa-fw { - width: 1.28571429em; - text-align: center; +a { + background-color: transparent; } -.fa-ul { - padding-left: 0; - margin-left: 2.14285714em; - list-style-type: none; +a:active, +a:hover { + outline: 0; } -.fa-ul > li { - position: relative; +abbr[title] { + border-bottom: 1px dotted; } -.fa-li { - position: absolute; - left: -2.14285714em; - width: 2.14285714em; - top: 0.14285714em; - text-align: center; +b, +strong { + font-weight: bold; } -.fa-li.fa-lg { - left: -1.85714286em; +dfn { + font-style: italic; } -.fa-border { - padding: .2em .25em .15em; - border: solid 0.08em #ee; - border-radius: .1em; +h1 { + font-size: 2em; + margin: 0.67em 0; } -.pull-right { - float: right; +mark { + background: #ff0; + color: #000; } -.pull-left { - float: left; +small { + font-size: 80%; } -.fa.pull-left { - margin-right: .3em; +sub, +sup { + font-size: 75%; + line-height: 0; + position: relative; + vertical-align: baseline; } -.fa.pull-right { - margin-left: .3em; +sup { + top: -0.5em; } -.fa-spin { - -webkit-animation: fa-spin 2s infinite linear; - animation: fa-spin 2s infinite linear; +sub { + bottom: -0.25em; } -.fa-pulse { - -webkit-animation: fa-spin 1s infinite steps(8); - animation: fa-spin 1s 
infinite steps(8); +img { + border: 0; } -@-webkit-keyframes fa-spin { - 0% { --webkit-transform: rotate(0deg); -transform: rotate(0deg); - } - 100% { --webkit-transform: rotate(359deg); -transform: rotate(359deg); - } +svg:not(:root) { + overflow: hidden; } -@keyframes fa-spin { - 0% { --webkit-transform: rotate(0deg); -transform: rotate(0deg); - } - 100% { --webkit-transform: rotate(359deg); -transform: rotate(359deg); - } +figure { + margin: 1em 40px; } -.fa-rotate-90 { - filter: progid:DXImageTransform.Microsoft.BasicImage(rotation=1); - -webkit-transform: rotate(90deg); - -ms-transform: rotate(90deg); - transform: rotate(90deg); +hr { + box-sizing: content-box; + height: 0; } -.fa-rotate-180 { - filter: progid:DXImageTransform.Microsoft.BasicImage(rotation=2); - -webkit-transform: rotate(180deg); - -ms-transform: rotate(180deg); - transform: rotate(180deg); +pre { + overflow: auto; } -.fa-rotate-270 { - filter: progid:DXImageTransform.Microsoft.BasicImage(rotation=3); - -webkit-transform: rotate(270deg); - -ms-transform: rotate(270deg); - transform: rotate(270deg); +code, +kbd, +pre, +samp { + font-family: monospace, monospace; + font-size: 1em; } -.fa-flip-horizontal { - filter: progid:DXImageTransform.Microsoft.BasicImage(rotation=0, mirror=1); - -webkit-transform: scale(-1, 1); - -ms-transform: scale(-1, 1); - transform: scale(-1, 1); +button, +input,
[01/11] flink git commit: [FLINK-3262] [web-dashboard] Remove fuzzy versioning from Bower dependencies
Repository: flink Updated Branches: refs/heads/master 40422d505 -> 2e2330737 http://git-wip-us.apache.org/repos/asf/flink/blob/1ea5e138/flink-runtime-web/web-dashboard/web/js/vendor.js -- diff --git a/flink-runtime-web/web-dashboard/web/js/vendor.js b/flink-runtime-web/web-dashboard/web/js/vendor.js index 89ea316..bebddae 100644 --- a/flink-runtime-web/web-dashboard/web/js/vendor.js +++ b/flink-runtime-web/web-dashboard/web/js/vendor.js @@ -1,15 +1,15 @@ /*! - * jQuery JavaScript Library v2.1.4 + * jQuery JavaScript Library v2.2.0 * http://jquery.com/ * * Includes Sizzle.js * http://sizzlejs.com/ * - * Copyright 2005, 2014 jQuery Foundation, Inc. and other contributors + * Copyright jQuery Foundation and other contributors * Released under the MIT license * http://jquery.org/license * - * Date: 2015-04-28T16:01Z + * Date: 2016-01-08T20:02Z */ (function( global, factory ) { @@ -41,10 +41,11 @@ // Can't be in strict mode, several libs including ASP.NET trace // the stack via arguments.caller.callee and Firefox dies if // you try to trace through "use strict" call chains. (#13335) -// - +//"use strict"; var arr = []; +var document = window.document; + var slice = arr.slice; var concat = arr.concat; @@ -64,13 +65,11 @@ var support = {}; var - // Use the correct document accordingly with window argument (sandbox) - document = window.document, - - version = "2.1.4", + version = "2.2.0", // Define a local copy of jQuery jQuery = function( selector, context ) { + // The jQuery object is actually just the init constructor 'enhanced' // Need init if jQuery is called (just allow error to be thrown if not included) return new jQuery.fn.init( selector, context ); @@ -90,6 +89,7 @@ var }; jQuery.fn = jQuery.prototype = { + // The current version of jQuery being used jquery: version, @@ -133,16 +133,14 @@ jQuery.fn = jQuery.prototype = { }, // Execute a callback for every element in the matched set. 
- // (You can seed the arguments with an array of args, but this is - // only used internally.) - each: function( callback, args ) { - return jQuery.each( this, callback, args ); + each: function( callback ) { + return jQuery.each( this, callback ); }, map: function( callback ) { - return this.pushStack( jQuery.map(this, function( elem, i ) { + return this.pushStack( jQuery.map( this, function( elem, i ) { return callback.call( elem, i, elem ); - })); + } ) ); }, slice: function() { @@ -160,11 +158,11 @@ jQuery.fn = jQuery.prototype = { eq: function( i ) { var len = this.length, j = +i + ( i < 0 ? len : 0 ); - return this.pushStack( j >= 0 && j < len ? [ this[j] ] : [] ); + return this.pushStack( j >= 0 && j < len ? [ this[ j ] ] : [] ); }, end: function() { - return this.prevObject || this.constructor(null); + return this.prevObject || this.constructor(); }, // For internal use only. @@ -176,7 +174,7 @@ jQuery.fn = jQuery.prototype = { jQuery.extend = jQuery.fn.extend = function() { var options, name, src, copy, copyIsArray, clone, - target = arguments[0] || {}, + target = arguments[ 0 ] || {}, i = 1, length = arguments.length, deep = false; @@ -191,7 +189,7 @@ jQuery.extend = jQuery.fn.extend = function() { } // Handle case when target is a string or something (possible in deep copy) - if ( typeof target !== "object" && !jQuery.isFunction(target) ) { + if ( typeof target !== "object" && !jQuery.isFunction( target ) ) { target = {}; } @@ -202,8 +200,10 @@ jQuery.extend = jQuery.fn.extend = function() { } for ( ; i < length; i++ ) { + // Only deal with non-null/undefined values - if ( (options = arguments[ i ]) != null ) { + if ( ( options = arguments[ i ] ) != null ) { + // Extend the base object for ( name in options ) { src = target[ name ]; @@ -215,13 +215,15 @@ jQuery.extend = jQuery.fn.extend = function() { } // Recurse if we're merging plain objects or arrays - if ( deep && copy && ( jQuery.isPlainObject(copy) || (copyIsArray = jQuery.isArray(copy)) ) ) { + 
if ( deep && copy && ( jQuery.isPlainObject( copy ) || + ( copyIsArray
[10/11] flink git commit: [hotfix] [kafka connector] Replace funky loop with simple division in FixedPartitioner
[hotfix] [kafka connector] Replace funky loop with simple division in FixedPartitioner Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9637ee78 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9637ee78 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9637ee78 Branch: refs/heads/master Commit: 9637ee78846e4df5ef328c620cc991d394056f61 Parents: 1ea5e13 Author: Stephan EwenAuthored: Wed Jan 27 12:20:59 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 13:41:38 2016 +0100 -- .../kafka/partitioner/FixedPartitioner.java | 20 1 file changed, 8 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9637ee78/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java index d9dcfc1..9b848e0 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java @@ -54,27 +54,23 @@ import java.io.Serializable; public class FixedPartitioner extends KafkaPartitioner implements Serializable { private static final long serialVersionUID = 1627268846962918126L; - int targetPartition = -1; + private int targetPartition = -1; @Override public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { - int p = 0; - for (int i = 0; i < parallelInstances; i++) { - if (i == parallelInstanceId) { - targetPartition = 
partitions[p]; - return; - } - if (++p == partitions.length) { - p = 0; - } + if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) { + throw new IllegalArgumentException(); } + + this.targetPartition = partitions[parallelInstanceId % partitions.length]; } @Override public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - if (targetPartition == -1) { + if (targetPartition >= 0) { + return targetPartition; + } else { throw new RuntimeException("The partitioner has not been initialized properly"); } - return targetPartition; } }
flink git commit: [FLINK-3242] Also Set User-specified StateBackend without Checkpointing
Repository: flink Updated Branches: refs/heads/release-0.10 dfeee2372 -> 2aeb6fac3 [FLINK-3242] Also Set User-specified StateBackend without Checkpointing Before, the user-specified StateBackedn would not be set when generating the JobGraph if checkpointing was disabled. This closes #1516 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2aeb6fac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2aeb6fac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2aeb6fac Branch: refs/heads/release-0.10 Commit: 2aeb6fac34f83b88ab888fa9d23ade784712e4b3 Parents: dfeee23 Author: Aljoscha KrettekAuthored: Mon Jan 18 11:53:31 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 14:35:22 2016 +0100 -- .../api/graph/StreamingJobGraphGenerator.java | 2 +- .../runtime/state/StateBackendITCase.java | 134 +++ 2 files changed, 135 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index b5f3cf4..d060078 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -274,10 +274,10 @@ public class StreamingJobGraphGenerator { config.setNonChainedOutputs(nonChainableOutputs); config.setChainedOutputs(chainableOutputs); + config.setStateBackend(streamGraph.getStateBackend()); config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled()); if (streamGraph.isCheckpointingEnabled()) { config.setCheckpointMode(streamGraph.getCheckpointingMode()); - 
config.setStateBackend(streamGraph.getStateBackend()); } else { // the at least once input handler is slightly cheaper (in the absence of checkpoints), // so we use that one if checkpointing is not enabled http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java new file mode 100644 index 000..cdfef85 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.state; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertTrue; + + +public class StateBackendITCase extends