[GitHub] flink issue #6365: [FLINK-9849] [hbase] Hbase upgrade to 2.0.1
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/6365 That would be ideal: a new connector for hbase 2.x, considering that hbase 1.x is stable and hbase 2.y is newly released. ---
[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/6378 No particular reason - considering the date when JIRA was first logged :-) ---
[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/6378 lgtm Pending QA bot ---
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/6335 lgtm ---
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/6335

From https://travis-ci.org/apache/flink/jobs/404127448 :
```
Failed tests:
  BlobServerPutTest.testPutBufferFailsIncomingForJob
Expected: (an instance of java.io.IOException and exception with message a string containing " (Permission denied)")
     but: exception with message a string containing " (Permission denied)" message was "/tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001"
Stacktrace was: java.nio.file.AccessDeniedException: /tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
	at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
	at java.nio.file.Files.newOutputStream(Files.java:216)
	at org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:594)
	at org.apache.flink.runtime.blob.BlobServer.putTransient(BlobServer.java:542)
	at org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:799)
	at org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncoming(BlobServerPutTest.java:559)
	at org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncomingForJob(BlobServerPutTest.java:516)
```
Please check the test failure. Thanks
---
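A note on the failure above: the trace shows `java.nio.file.Files.newOutputStream` raising `AccessDeniedException`, whose message is only the file path, while the test expects the " (Permission denied)" suffix that `java.io.FileOutputStream` failures typically carry on Linux. Below is a minimal sketch of a check that accepts both styles. The class and method names are hypothetical and not part of Flink or of the PR.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;

public class PermissionDeniedCheck {

    /**
     * Returns true if the exception signals a permission problem, either as an
     * NIO AccessDeniedException or as a classic java.io exception whose message
     * contains "(Permission denied)".
     */
    static boolean isPermissionDenied(IOException e) {
        if (e instanceof AccessDeniedException) {
            return true;
        }
        String message = e.getMessage();
        return message != null && message.contains("(Permission denied)");
    }

    public static void main(String[] args) {
        // Hypothetical paths, used only for illustration.
        System.out.println(isPermissionDenied(new AccessDeniedException("/tmp/blobStore/incoming/temp-0001")));
        System.out.println(isPermissionDenied(new FileNotFoundException("/tmp/blobStore/incoming/temp-0001 (Permission denied)")));
        System.out.println(isPermissionDenied(new IOException("disk full")));
    }
}
```

A test could then assert on the exception type or on this predicate rather than on the message text alone.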
[GitHub] flink pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafk...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6181#discussion_r196368052

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka.partitioner;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Arrays;
    +
    +/**
    + * A partitioner that uses the hash of the provided key to distribute
    + * the values over the partitions as evenly as possible.
    + * This partitioner ensures that all records with the same key will be sent to
    + * the same Kafka partition.
    + *
    + * Note that this will cause a lot of network connections to be created between
    + * all the Flink instances and all the Kafka brokers.
    + */
    +@PublicEvolving
    +public class FlinkKeyHashPartitioner extends FlinkKafkaPartitioner {
    +
    +    private static final long serialVersionUID = -2006468063065010594L;
    +
    +    @Override
    +    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +        Preconditions.checkArgument(
    +            partitions != null && partitions.length > 0,
    +            "Partitions of the target topic is empty.");
    +
    +        return partitions[hash(key) % partitions.length];
    --- End diff --

    Should we guard against hash(key) % partitions.length < 0 (in case someone overrides hash()) ?
---
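To illustrate the concern: in Java the `%` operator keeps the sign of the dividend, so a negative hash yields a negative index. One possible guard is `Math.floorMod`, sketched below. The class name, the `hash` helper and `selectPartition` are hypothetical stand-ins; the PR's own `hash()` is not shown in the quoted diff.

```java
import java.util.Arrays;

public class KeyHashPartitionSketch {

    /** Hypothetical hash function; it may return negative values. */
    static int hash(byte[] key) {
        return Arrays.hashCode(key);
    }

    /** Picks a partition for the given hash; the index is never negative. */
    static int selectPartition(int hash, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic is empty.");
        }
        // Math.floorMod returns a result in [0, partitions.length) for a positive divisor,
        // whereas hash % partitions.length can be negative when hash is negative.
        return partitions[Math.floorMod(hash, partitions.length)];
    }

    public static void main(String[] args) {
        int[] partitions = {10, 11, 12};
        System.out.println(-7 % partitions.length);          // negative remainder
        System.out.println(selectPartition(-7, partitions)); // still a valid partition
        System.out.println(selectPartition(hash("some-key".getBytes()), partitions));
    }
}
```

`Math.abs(hash) % length` is a common alternative, but it still returns a negative value for `Integer.MIN_VALUE`, which is why `floorMod` is used in this sketch.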
[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189036753

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
                 SerializedValue<AssignerWithPunctuatedWatermarks> watermarksPunctuated,
                 ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

    -        List<KafkaTopicPartitionState> partitionStates = new LinkedList<>();
    +        List<KafkaTopicPartitionState> partitionStates = new CopyOnWriteArrayList<>();
    --- End diff --

    The explanation can be given in a code comment. There is no need to link to the issue comment.
---
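For context, a small sketch of the behavioural difference between the two list types in the diff above. The thread does not state the motivation for the change; the assumption here is that it avoids a `ConcurrentModificationException` when the list is modified while being iterated. The class and method names are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIterationSketch {

    /**
     * Adds an element while iterating over the list.
     * Returns true if the iteration completed, false if it failed
     * with a ConcurrentModificationException.
     */
    static boolean addWhileIterating(List<String> list) {
        list.add("partition-0");
        list.add("partition-1");
        try {
            for (String p : list) {
                if (p.equals("partition-0")) {
                    list.add("partition-2"); // structural modification during iteration
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // LinkedList iterators are fail-fast.
        System.out.println(addWhileIterating(new LinkedList<>()));
        // CopyOnWriteArrayList iterators work on a snapshot of the backing array.
        System.out.println(addWhileIterating(new CopyOnWriteArrayList<>()));
    }
}
```

The trade-off is that every write to a `CopyOnWriteArrayList` copies the backing array, so it suits lists that are read far more often than they are modified.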
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r187647275

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
                     getLong(
                         checkNotNull(props, "props"),
                         KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
    -                !getBoolean(props, KEY_DISABLE_METRICS, false));
    +                !getBoolean(props, KEY_DISABLE_METRICS, false),
    +                getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false));
    --- End diff --

    Should this be named KEY_CHECK_UNAVAILABLE_PARTITIONS ?
---
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r187648458

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -374,8 +384,8 @@ void setOffsetsToCommit(
          * This method is exposed for testing purposes.
          */
         @VisibleForTesting
    -    void reassignPartitions(List<KafkaTopicPartitionState> newPartitions) throws Exception {
    -        if (newPartitions.size() == 0) {
    +    void reassignPartitions(List<KafkaTopicPartitionState> newPartitions, List partitionsToBeRemoved) throws Exception {
    +        if (newPartitions.size() == 0 && partitionsToBeRemoved.isEmpty()) {
    --- End diff --

    newPartitions.size() == 0 -> newPartitions.isEmpty()
---
[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r187647828

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -80,6 +82,9 @@
         /** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
         private final ClosableBlockingQueue<KafkaTopicPartitionState> unassignedPartitionsQueue;

    +    /** The list of partitions to be removed from kafka consumer. */
    +    private final List partitionsToBeRemoved;
    --- End diff --

    Should this be a Set to facilitate fast lookup?
---
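A brief sketch of the lookup argument: `List.contains` scans the list, while `HashSet.contains` is a hash lookup, which matters when every assigned partition is checked against the removal collection. The names below are hypothetical, and plain strings stand in for the partition type used in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionRemovalSketch {

    /** Returns the assigned partitions that are not scheduled for removal, in their original order. */
    static List<String> retainAssigned(List<String> assigned, Collection<String> toBeRemoved) {
        // Copy into a HashSet once so that each membership test is O(1) on average.
        Set<String> removalSet = new HashSet<>(toBeRemoved);
        List<String> kept = new ArrayList<>();
        for (String partition : assigned) {
            if (!removalSet.contains(partition)) {
                kept.add(partition);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> assigned = Arrays.asList("topic-0", "topic-1", "topic-2");
        System.out.println(retainAssigned(assigned, Arrays.asList("topic-1")));
    }
}
```

A `Set` requires the element type to implement `equals` and `hashCode` consistently, which would need to hold for the real partition class.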
[GitHub] flink issue #5920: [FLINK-9231] [web] Enable SO_REUSEADDR on listen sockets ...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5920 lgtm ---
[GitHub] flink issue #5892: [FLINK-9214] YarnClient should be stopped in YARNSessionC...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5892 lgtm ---
[GitHub] flink issue #5894: [FLINK-9185] [runtime] Fix potential null dereference in ...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5894 lgtm Please check test failure. ---
[GitHub] flink issue #5848: Minor cleanup of Java example code for AsyncFunctions
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5848 lgtm ---
[GitHub] flink issue #5777: [FLINK-7897] Consider using nio.Files for file deletion i...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5777 @yuqi1129 What do you think ? ---
[GitHub] flink issue #5205: [FLINK-8037] Fix integer multiplication or shift implicit...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5205 Looks like the PR needs to be rebased. ---
[GitHub] flink issue #5488: [FLINK-8335] [hbase] Upgrade hbase connector dependency t...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5488 Should be good after 1.4.1 is filled in ---
[GitHub] flink issue #5205: [FLINK-8037] Fix integer multiplication or shift implicit...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/5205 +1 on @StephanEwen's suggestion. ---
[GitHub] flink issue #4356: [FLINK-5486] Fix lacking of synchronization in BucketingS...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4356 retest this please ---
[GitHub] flink issue #4316: [FLINK-6105] Use InterruptedIOException instead of IOExce...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4316 I think InterruptedException should be handled uniformly in HadoopInputFormatBase.java ---
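One way to handle `InterruptedException` uniformly is a single helper that converts it into an `InterruptedIOException` and restores the thread's interrupt flag. This is a sketch under that assumption. The helper name is hypothetical and it is not taken from `HadoopInputFormatBase`.

```java
import java.io.InterruptedIOException;

public class InterruptHandlingSketch {

    /**
     * Converts an InterruptedException into an InterruptedIOException,
     * preserving the cause and re-setting the interrupt flag so that
     * callers further up the stack can still observe the interruption.
     */
    static InterruptedIOException asInterruptedIO(InterruptedException e, String message) {
        Thread.currentThread().interrupt();
        InterruptedIOException ioe = new InterruptedIOException(message);
        ioe.initCause(e);
        return ioe;
    }

    public static void main(String[] args) {
        InterruptedIOException ioe =
            asInterruptedIO(new InterruptedException("sleep interrupted"), "Interrupted while opening split");
        System.out.println(ioe.getMessage());
        // Thread.interrupted() reports and clears the flag set by the helper.
        System.out.println(Thread.interrupted());
    }
}
```

Call sites would then use `throw asInterruptedIO(e, "...")` in each `catch (InterruptedException e)` block.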
[GitHub] flink issue #4562: [FLINK-7402] Fix ineffective null check in NettyMessage#w...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4562 lgtm --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4621: [FLINK-7495] Call to AbstractUdfStreamOperator#initialize...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4621 lgtm ---
[GitHub] flink issue #4570: [FLINK-7438][DataStream API]Remove useless import, avoid ...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4570 lgtm ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135367240

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
                     return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
                 }

    -            final IterativeCondition trueFunction = getTrueFunction();
    -
                 final State singletonState = createState(currentPattern.getName(), State.StateType.Normal);
                 // if event is accepted then all notPatterns previous to the optional states are no longer valid
                 final State sink = copyWithoutTransitiveNots(sinkState);
                 singletonState.addTake(sink, takeCondition);

    +            // if no element accepted the previous nots are still valid.
    +            final IterativeCondition proceedCondition = getTrueFunction();
    +
                 // for the first state of a group pattern, its PROCEED edge should point to the following state of
                 // that group pattern and the edge will be added at the end of creating the NFA for that group pattern
                 if (isOptional && !headOfGroup(currentPattern)) {
    -                // if no element accepted the previous nots are still valid.
    -                singletonState.addProceed(proceedState, trueFunction);
    +                if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +                    final IterativeCondition untilCondition =
    +                        (IterativeCondition) currentPattern.getUntilCondition();
    +                    if (untilCondition != null) {
    +                        singletonState.addProceed(
    +                            originalStateMap.get(proceedState.getName()),
    +                            new AndCondition<>(proceedCondition, untilCondition));
    --- End diff --

    Why is this not wrapped with NotCondition ?
---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135366609

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
                     return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
                 }

    -            final IterativeCondition trueFunction = getTrueFunction();
    -
                 final State singletonState = createState(currentPattern.getName(), State.StateType.Normal);
                 // if event is accepted then all notPatterns previous to the optional states are no longer valid
                 final State sink = copyWithoutTransitiveNots(sinkState);
                 singletonState.addTake(sink, takeCondition);

    +            // if no element accepted the previous nots are still valid.
    +            final IterativeCondition proceedCondition = getTrueFunction();
    +
                 // for the first state of a group pattern, its PROCEED edge should point to the following state of
                 // that group pattern and the edge will be added at the end of creating the NFA for that group pattern
                 if (isOptional && !headOfGroup(currentPattern)) {
    -                // if no element accepted the previous nots are still valid.
    -                singletonState.addProceed(proceedState, trueFunction);
    +                if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
    +                    final IterativeCondition untilCondition =
    +                        (IterativeCondition) currentPattern.getUntilCondition();
    +                    if (untilCondition != null) {
    +                        singletonState.addProceed(
    +                            originalStateMap.get(proceedState.getName()),
    +                            new AndCondition<>(proceedCondition, untilCondition));
    +                    }
    +                    singletonState.addProceed(proceedState,
    +                        untilCondition != null
    --- End diff --

    Redundant check - see line 568
---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r135366122

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -421,6 +437,15 @@ private void addStopStateToLooping(final State loopingState) {
                         untilCondition,
                         true);

    +                if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) &&
    +                    times.getFrom() != times.getTo()) {
    +                    if (untilCondition != null) {
    +                        State sinkStateCopy = copy(sinkState);
    +                        originalStateMap.put(sinkState.getName(), sinkStateCopy);
    --- End diff --

    When are the old entries cleared in this map? Shall we consider using a map that expires entries by TTL?
---
[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4320#discussion_r134692032

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -348,7 +353,18 @@ private void updateNFA(NFA nfa) throws IOException {
          * @param event The current event to be processed
          * @param timestamp The timestamp of the event
          */
    -    protected abstract void processEvent(NFA nfa, IN event, long timestamp);
    +    private void processEvent(NFA nfa, IN event, long timestamp) {
    +        Tuple2<Collection<Map<String, List>>, Collection<Tuple2<Map<String, List>, Long>>> patterns =
    +            nfa.process(event, timestamp);
    +
    +        try {
    +            processMatchedSequences(patterns.f0, timestamp);
    +            processTimeoutedSequence(patterns.f1, timestamp);
    --- End diff --

    processTimeoutedSequence -> processTimedoutSequence
---
[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4320#discussion_r134692117

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -348,7 +353,18 @@ private void updateNFA(NFA nfa) throws IOException {
          * @param event The current event to be processed
          * @param timestamp The timestamp of the event
          */
    -    protected abstract void processEvent(NFA nfa, IN event, long timestamp);
    +    private void processEvent(NFA nfa, IN event, long timestamp) {
    +        Tuple2<Collection<Map<String, List>>, Collection<Tuple2<Map<String, List>, Long>>> patterns =
    +            nfa.process(event, timestamp);
    +
    +        try {
    +            processMatchedSequences(patterns.f0, timestamp);
    --- End diff --

    processMatchedSequences -> processMatchingSequences
---
[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4320#discussion_r134687041

    --- Diff: docs/dev/libs/cep.md ---
    @@ -1279,63 +1279,75 @@ and `flatSelect` API calls allow a timeout handler to be specified. This timeout
     partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and the timestamp when the timeout was detected.

    +In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
    +parameters
    +
    + * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
    + * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the side output in which the timeouted matches will be returned
    --- End diff --

    timeouted -> timed out
---
[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4320#discussion_r134689181

    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
    @@ -84,37 +79,54 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
         * pattern sequence.
         * @tparam L Type of the resulting timeout event
         * @tparam R Type of the resulting event
    +    * @deprecated Use the version that returns timeouted events as a side-output
    --- End diff --

    timeouted -> timed out
---
[GitHub] flink pull request #4410: [FLINK-7268] [checkpoints] Scope SharedStateRegist...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4410#discussion_r134647310

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -1044,10 +1049,23 @@ public boolean restoreLatestCheckpointedState(
                     throw new IllegalStateException("CheckpointCoordinator is shut down");
                 }

    -            // Recover the checkpoints
    -            completedCheckpointStore.recover(sharedStateRegistry);
    +            // We create a new shared state registry object, so that all pending async disposal requests from previous
    +            // runs will go against the old object (were they can do no harm).
    +            // This must happen under the checkpoint lock.
    +            sharedStateRegistry.close();
    +            sharedStateRegistry = sharedStateRegistryFactory.create(executor);
    +
    +            // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
    --- End diff --

    If we use highAvailabilityServices.getJobManagerLeaderRetriever(), Job Id is required.
    Can Job Id be obtained from JobVertexID ?
---
[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4568 lgtm ---
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r134142721

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -108,11 +139,63 @@ public BlobCache(
                 this.numFetchRetries = 0;
             }

    +        // Initializing the clean up task
    +        this.cleanupTimer = new Timer(true);
    +
    +        cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +        this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
             // Add shutdown hook to delete storage directory
             shutdownHook = BlobUtils.addShutdownHook(this, LOG);
         }

         /**
    +     * Registers use of job-related BLOBs.
    +     *
    +     * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
    +     * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
    +     *
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     *
    +     * @see #releaseJob(JobID)
    +     */
    +    public void registerJob(JobID jobId) {
    +        synchronized (jobRefCounters) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +            if (ref == null) {
    +                ref = new RefCount();
    +                jobRefCounters.put(jobId, ref);
    +            }
    +            ++ref.references;
    --- End diff --

    Should keepUntil be modified (in case the code at line 193 runs) ?
---
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r134142624

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -108,11 +139,63 @@ public BlobCache(
                 this.numFetchRetries = 0;
             }

    +        // Initializing the clean up task
    +        this.cleanupTimer = new Timer(true);
    +
    +        cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +        this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
             // Add shutdown hook to delete storage directory
             shutdownHook = BlobUtils.addShutdownHook(this, LOG);
         }

         /**
    +     * Registers use of job-related BLOBs.
    +     *
    +     * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
    +     * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
    +     *
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     *
    +     * @see #releaseJob(JobID)
    +     */
    +    public void registerJob(JobID jobId) {
    +        synchronized (jobRefCounters) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +            if (ref == null) {
    +                ref = new RefCount();
    +                jobRefCounters.put(jobId, ref);
    +            }
    +            ++ref.references;
    +        }
    +    }
    +
    +    /**
    +     * Unregisters use of job-related BLOBs and allow them to be released.
    +     *
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     *
    +     * @see #registerJob(JobID)
    +     */
    +    public void releaseJob(JobID jobId) {
    +        synchronized (jobRefCounters) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +
    +            if (ref == null) {
    +                LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
    --- End diff --

    Including jobId would help troubleshooting.
---
[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4562#discussion_r133980441

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -220,6 +220,10 @@ void releaseBuffer() {

             @Override
             ByteBuf write(ByteBufAllocator allocator) throws IOException {
    +            if (null == buffer) {
    +                throw new NullPointerException();
    --- End diff --

    Add exception message to provide more information.
---
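A minimal sketch of the suggestion, using `java.util.Objects.requireNonNull` with a message. The class, the method and the message text are hypothetical, and a `byte[]` stands in for the buffer type used in `NettyMessage`.

```java
import java.util.Objects;

public class NullCheckSketch {

    /** Returns the buffer length, failing with a descriptive message if the buffer is null. */
    static int bufferLength(byte[] buffer) {
        // requireNonNull throws a NullPointerException carrying the given message.
        Objects.requireNonNull(buffer, "buffer must not be null when writing the message");
        return buffer.length;
    }

    public static void main(String[] args) {
        System.out.println(bufferLength(new byte[16]));
        try {
            bufferLength(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Within Flink itself, `Preconditions.checkNotNull` (already used elsewhere in the quoted diffs) would be the closer match to existing conventions.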
[GitHub] flink issue #4356: [FLINK-5486] Fix lacking of synchronization in BucketingS...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4356 Run QA again ? ---
[GitHub] flink issue #4482: [FLINK-4534] Fix synchronization issue in BucketingSink
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4482 lgtm ---
[GitHub] flink issue #4404: [FLINK-4565] [table] Support for SQL IN operator
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4404 @fhueske Can you take a look at #4493 ? ---
[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4404#discussion_r131823213 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions + +import com.google.common.collect.ImmutableList +import org.apache.calcite.rex.{RexNode, RexSubQuery} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +case class In(expression: Expression, elements: Seq[Expression]) extends Expression { + + override def toString = s"$expression.in(${elements.mkString(", ")})" + + override private[flink] def children: Seq[Expression] = expression +: elements.distinct + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode)) + + case _ => +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) +} + } + + override private[flink] def validateInput(): ValidationResult = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +if (elements.length != 1) { + return ValidationFailure("IN operator supports only one table reference.") +} +if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) { + return ValidationFailure( +"Sub-query IN operator on stream tables is currently not supported.") +} +val tableOutput = table.logicalPlan.output +if (tableOutput.length > 1) { + return ValidationFailure( +s"The sub-query table '$name' must not have more than one column.") +} +(expression.resultType, tableOutput.head.resultType) match { + case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess + case (lType, rType) if lType == rType => ValidationSuccess --- End diff -- @twalthr: Can you take a look at my PR ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4404#discussion_r131726341 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions + +import com.google.common.collect.ImmutableList +import org.apache.calcite.rex.{RexNode, RexSubQuery} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +case class In(expression: Expression, elements: Seq[Expression]) extends Expression { + + override def toString = s"$expression.in(${elements.mkString(", ")})" + + override private[flink] def children: Seq[Expression] = expression +: elements.distinct + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode)) + + case _ => +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) +} + } + + override private[flink] def validateInput(): ValidationResult = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +if (elements.length != 1) { + return ValidationFailure("IN operator supports only one table reference.") +} +if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) { + return ValidationFailure( +"Sub-query IN operator on stream tables is currently not supported.") +} +val tableOutput = table.logicalPlan.output +if (tableOutput.length > 1) { + return ValidationFailure( +s"The sub-query table '$name' must not have more than one column.") +} +(expression.resultType, tableOutput.head.resultType) match { + case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess + case (lType, rType) if lType == rType => ValidationSuccess --- End diff -- Created #4493 ---
[GitHub] flink pull request #4493: [FLINK-4565] [table] Support for SQL IN operator -...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/4493 [FLINK-4565] [table] Support for SQL IN operator - move case of equal check You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4493.patch
[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4404#discussion_r131654308 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions + +import com.google.common.collect.ImmutableList +import org.apache.calcite.rex.{RexNode, RexSubQuery} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +case class In(expression: Expression, elements: Seq[Expression]) extends Expression { + + override def toString = s"$expression.in(${elements.mkString(", ")})" + + override private[flink] def children: Seq[Expression] = expression +: elements.distinct + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode)) + + case _ => +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) +} + } + + override private[flink] def validateInput(): ValidationResult = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +if (elements.length != 1) { + return ValidationFailure("IN operator supports only one table reference.") +} +if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) { + return ValidationFailure( +"Sub-query IN operator on stream tables is currently not supported.") +} +val tableOutput = table.logicalPlan.output +if (tableOutput.length > 1) { + return ValidationFailure( +s"The sub-query table '$name' must not have more than one column.") +} +(expression.resultType, tableOutput.head.resultType) match { + case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess + case (lType, rType) if lType == rType => ValidationSuccess --- End diff -- Should I open a new JIRA ? ---
[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4404#discussion_r131519235 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.expressions + +import com.google.common.collect.ImmutableList +import org.apache.calcite.rex.{RexNode, RexSubQuery} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +case class In(expression: Expression, elements: Seq[Expression]) extends Expression { + + override def toString = s"$expression.in(${elements.mkString(", ")})" + + override private[flink] def children: Seq[Expression] = expression +: elements.distinct + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode)) + + case _ => +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) +} + } + + override private[flink] def validateInput(): ValidationResult = { +// check if this is a sub-query expression or an element list +elements.head match { + + case TableReference(name, table) => +if (elements.length != 1) { + return ValidationFailure("IN operator supports only one table reference.") +} +if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) { + return ValidationFailure( +"Sub-query IN operator on stream tables is currently not supported.") +} +val tableOutput = table.logicalPlan.output +if (tableOutput.length > 1) { + return ValidationFailure( +s"The sub-query table '$name' must not have more than one column.") +} +(expression.resultType, tableOutput.head.resultType) match { + case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess + case (lType, rType) if lType == rType => ValidationSuccess --- End diff -- Should this be moved ahead of the isNumeric() check since this check is cheaper ? ---
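A minimal sketch of the reordering asked about above, in plain Java rather than the Scala of the diff. `TypeCompat`, `compatible`, and this version of `isNumeric` are hypothetical stand-ins (the `isNumeric` in the diff comes from `TypeCheckUtils`); they are assumptions for illustration only. Both branches in the quoted code yield `ValidationSuccess`, so swapping them should change only the evaluation order, not the outcome.

```java
// Hypothetical stand-in for the type check in In.validateInput(); not Flink code.
final class TypeCompat {

    // Assumption: a type counts as numeric if it is a java.lang.Number subclass.
    static boolean isNumeric(Class<?> type) {
        return Number.class.isAssignableFrom(type);
    }

    // Equality is tested first because it is the cheaper check;
    // the numeric check runs only when the two types differ.
    static boolean compatible(Class<?> left, Class<?> right) {
        if (left.equals(right)) {
            return true;
        }
        return isNumeric(left) && isNumeric(right);
    }
}
```

With this ordering, `compatible(String.class, String.class)` returns without ever calling `isNumeric`.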
[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4404#discussion_r131511583 --- Diff: docs/dev/table/sql.md --- @@ -497,6 +497,23 @@ FROM ( {% endhighlight %} + + + +In +Batch + + + Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression. --- End diff -- Should the syntax be enhanced so that user can specify one column in the table with multiple columns ? ---
[GitHub] flink pull request #4328: [FLINK-6493] Fix ineffective null check in Registe...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r130235370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +175,21 @@ public boolean equals(Object obj) { return false; } + Snapshot snapshot; + + if (obj instanceof Snapshot) { + snapshot = (Snapshot)obj; + } else { --- End diff -- nit: if you place the condition for else first, declaration and assignment to snapshot can be done in one line. ---
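The nit above can be sketched as follows. `SnapshotLike` and its `name` field are hypothetical, since the real `Snapshot` fields are not visible in the quoted diff, and the sketch assumes the `else` branch simply returns `false`, which the truncated diff does not confirm.

```java
// Hypothetical class illustrating the nit; not the Snapshot class from the PR.
final class SnapshotLike {
    private final String name;

    SnapshotLike(String name) {
        this.name = name;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // Handle the "else" case first (assumed here to return false) ...
        if (!(obj instanceof SnapshotLike)) {
            return false;
        }
        // ... so that declaration and assignment fit on one line.
        SnapshotLike other = (SnapshotLike) obj;
        return name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}
```

Because `instanceof` is false for `null`, the early return also covers a null argument.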
[GitHub] flink issue #4356: [FLINK-5486] Fix lacking of synchronization in BucketingS...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4356 lgtm ---
[GitHub] flink issue #4345: [FLINK-7197] [gelly] Missing call to GraphAlgorithmWrappi...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4345 Fine by me. ---
[GitHub] flink pull request #4345: [FLINK-7197] [gelly] Missing call to GraphAlgorith...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4345#discussion_r127532176 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java --- @@ -56,7 +56,9 @@ public TranslateGraphIds(TranslateFunction<OLD, NEW> translator) { @Override protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) { - super.mergeConfiguration(other); + if (!super.canMergeConfigurationWith(other)) { --- End diff -- mergeConfiguration() wouldn't be called this way. Is that intended ? ---
[GitHub] flink issue #4315: [FLINK-5541] Missing null check for localJar in FlinkSubm...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4315 lgtm ---
[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4260#discussion_r127075409 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobService; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Base class for the Dispatcher component. The Dispatcher component is responsible + * for receiving job submissions, persisting them, spawning JobManagers to execute + * the jobs and to recover them in case of a master failure. Furthermore, it knows + * about the state of the Flink session cluster. 
+ */ +public abstract class Dispatcher extends RpcEndpoint { + + public static final String DISPATCHER_NAME = "dispatcher"; + + private final SubmittedJobGraphStore submittedJobGraphStore; + private final RunningJobsRegistry runningJobsRegistry; + + private final HighAvailabilityServices highAvailabilityServices; + private final BlobServer blobServer; + private final HeartbeatServices heartbeatServices; + private final MetricRegistry metricRegistry; + + private final FatalErrorHandler fatalErrorHandler; + + private final Map<JobID, JobManagerRunner> jobManagerRunners; + + protected Dispatcher( + RpcService rpcService, + String endpointId, + HighAvailabilityServices highAvailabilityServices, + BlobServer blobServer, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + FatalErrorHandler fatalErrorHandler) throws Exception { + super(rpcService, endpointId); + + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); + this.blobServer = Preconditions.checkNotNull(blobServer); + this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices); + this.metricRegistry = Preconditions.checkNotNull(metricRegistry); + this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); + + this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore(); + this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry(); + +
[GitHub] flink issue #4202: [FLINK-6422] [core] Unreachable code in FileInputFormat#c...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4202 Can you take a look at the error in CI ? ---
[GitHub] flink issue #4151: [FLINK-6389] [connector] Upgrade hbase dependency to 1.3....
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4151 lgtm ---
[GitHub] flink issue #4140: [FLINK-6943] Improve exceptions within TypeExtractionUtil...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4140 lgtm ---
[GitHub] flink pull request #4039: [FLINK-6783] Changed passing index of type argumen...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4039#discussion_r122570865 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java --- @@ -161,6 +164,77 @@ public static LambdaExecutable checkAndExtractLambda(Function function) throws T } /** +* Extracts type from given index from lambda. It supports nested types. +* +* @param exec lambda function to extract the type from +* @param lambdaTypeArgumentIndices position of type to extract in type hierarchy +* @param paramLen count of total parameters of the lambda (including closure parameters) +* @param baseParametersLen count of lambda interface parameters (without closure parameters) +* @return extracted type +*/ + public static Type extractTypeFromLambda( + LambdaExecutable exec, + int[] lambdaTypeArgumentIndices, + int paramLen, + int baseParametersLen) { + Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]]; + for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) { + output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]); + } + return output; + } + + /** +* * This method extracts the n-th type argument from the given type. An InvalidTypesException +* is thrown if the type does not have any type arguments or if the index exceeds the number +* of type arguments. +* +* @param t Type to extract the type arguments from +* @param index Index of the type argument to extract +* @return The extracted type argument +* @throws InvalidTypesException if the given type does not have any type arguments or if the +* index exceeds the number of type arguments. 
+*/ + public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException { + if (t instanceof ParameterizedType) { + Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments(); + + if (index < 0 || index >= actualTypeArguments.length) { + throw new InvalidTypesException("Cannot extract the type argument with index " + + index + " because the type has only " + actualTypeArguments.length + + " type arguments."); + } else { + return actualTypeArguments[index]; + } + } else { + throw new InvalidTypesException("The given type " + t + " is not a parameterized type."); + } + } + + /** +* Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object, +* 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class. +* +* @param baseClass +* @throws InvalidTypesException if the given class does not implement +* @return +*/ + public static Method getSingleAbstractMethod(Class baseClass) { + Method sam = null; + for (Method method : baseClass.getMethods()) { + if (Modifier.isAbstract(method.getModifiers())) { + if (sam == null) { + sam = method; + } else { + throw new InvalidTypesException( + "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM."); --- End diff -- This message seems to be inexact: if there is no SAM, sam would be null upon returning from the method. I suggest changing the message and adding a check (for null sam) prior to returning. ---
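One way the suggestion above might look, as a sketch only. `SamLookup` is a hypothetical class, `IllegalArgumentException` stands in for Flink's `InvalidTypesException` so the example has no Flink dependency, and the message wording is an assumption rather than what the PR ended up using.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical helper; IllegalArgumentException replaces InvalidTypesException here.
final class SamLookup {

    static Method getSingleAbstractMethod(Class<?> baseClass) {
        Method sam = null;
        for (Method method : baseClass.getMethods()) {
            if (Modifier.isAbstract(method.getModifiers())) {
                if (sam != null) {
                    // A second abstract method was found, so the message says exactly that.
                    throw new IllegalArgumentException(
                        "Given class: " + baseClass + " has more than one abstract method.");
                }
                sam = method;
            }
        }
        // The added null check: no abstract method was found at all.
        if (sam == null) {
            throw new IllegalArgumentException(
                "Given class: " + baseClass + " has no abstract method.");
        }
        return sam;
    }
}
```

Splitting the two failure cases keeps each message accurate: one for too many abstract methods, one for none.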
[GitHub] flink issue #3957: [FLINK-6332] [build] Upgrade Scala versions
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/3957 lgtm ---
[GitHub] flink pull request #3981: [FLINK-6646] [yarn] Let YarnJobManager delete Yarn...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3981#discussion_r118304278 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala --- @@ -89,5 +92,37 @@ class YarnJobManager( flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5), TimeUnit.SECONDS) + val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES)) + override val jobPollingInterval = YARN_HEARTBEAT_DELAY + + override def handleMessage: Receive = { +handleYarnShutdown orElse super.handleMessage + } + + def handleYarnShutdown: Receive = { +case msg:StopCluster => + super.handleMessage(msg) + + // do global cleanup if the yarn files path has been set + yarnFilesPath match { +case Some(filePath) => + log.info(s"Deleting yarn application files under $filePath.") + + val path = new Path(filePath) + + try { +val fs = path.getFileSystem +fs.delete(path, true) --- End diff -- Please check the return value from delete() call ---
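A small sketch of what checking the return value could look like. It uses `java.io.File.delete()`, which also reports failure through a boolean, as a stand-in for the `fs.delete(path, true)` call in the diff; `DeleteCheck` and `deleteAndReport` are hypothetical names, and the real code would presumably log through the actor's `log` rather than `System.err`.

```java
import java.io.File;

// Hypothetical helper; java.io.File stands in for the Flink FileSystem used in the diff.
final class DeleteCheck {

    // Returns true if the path was deleted; reports the failure otherwise
    // instead of silently ignoring the boolean result.
    static boolean deleteAndReport(File path) {
        boolean deleted = path.delete();
        if (!deleted) {
            System.err.println("Could not delete " + path + ".");
        }
        return deleted;
    }
}
```

The caller can then decide whether a failed cleanup should only be logged or should also affect shutdown handling.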
[GitHub] flink issue #3938: [FLINK-6624] [cep] Fix SharedBuffer#hashCode().
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/3938 Lgtm ---
[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3919#discussion_r117122323 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala --- @@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate( "non-windowed GroupBy aggregation.") } +val isCountWindow = window match { + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true + case _ => false +} + +if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( --- End diff -- Should this be error() ? ---
[GitHub] flink pull request #3919: [FLINK-6583][talbe]Enable QueryConfig in count bas...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3919#discussion_r117121382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala --- @@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size) => stream.countWindow(toLong(size), toLong(slide)) + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide))); --- End diff -- Should this be toLong(size) ? ---
[GitHub] flink pull request #3885: [FLINK-6552] Allow differing types for side output...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3885#discussion_r116611287 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -318,9 +318,10 @@ public void addVirtualSideOutputNode(Integer originalId, Integer virtualId, Outp continue; } - if (!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) { - throw new IllegalArgumentException("Trying to add a side input for the same id " + - "with a different type. This is not allowed."); + if (tag.f1.getId().equals(outputTag.getId()) && + !tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) { + throw new IllegalArgumentException("Trying to add a side output for the same" + --- End diff -- minor: missing space after "same" ---
[GitHub] flink issue #3892: [FLINK-6530] Close response in DatadogHttpClient
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/3892 lgtm ---
[GitHub] flink issue #2422: FLINK-4499: [WIP] Introduce findbugs maven plugin
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2422 Hadoop has switched to spotbugs ---
[GitHub] flink pull request #3839: FLINK-6474 Potential loss of precision in 32 bit i...
Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/3839 ---
[GitHub] flink issue #3839: FLINK-6474 Potential loss of precision in 32 bit integer ...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/3839 @StefanRRichter Can you take a look ? ---
[GitHub] flink pull request #3839: FLINK-6474 Potential loss of precision in 32 bit i...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/3839 FLINK-6474 Potential loss of precision in 32 bit integer multiplication Cast numNetworkBuffers to long before multiplication. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3839.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3839 commit 3f35aab20f4f0c3b11dbee3cfd5bbbe0249a9aa5 Author: tedyu <yuzhih...@gmail.com> Date: 2017-05-07T19:20:27Z FLINK-6474 Potential loss of precision in 32 bit integer multiplication ---
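The fix described above, casting to long before multiplying, can be sketched as follows. Only `numNetworkBuffers` comes from the PR description; the class name, the `segmentSize` parameter, and the sample values are assumptions for illustration.

```java
// Hypothetical helper, not the Flink code: contrasts the two multiplications.
class BufferMemory {
    /** Multiplies in 32-bit arithmetic; the result wraps before it is widened to long. */
    static long overflowing(int numNetworkBuffers, int segmentSize) {
        return numNetworkBuffers * segmentSize;
    }

    /** Widens one operand first, so the whole multiplication is done in 64 bits. */
    static long widened(int numNetworkBuffers, int segmentSize) {
        return (long) numNetworkBuffers * segmentSize;
    }
}
```

With 100000 buffers of 32768 bytes each, the true product (3276800000) exceeds `Integer.MAX_VALUE`, so `overflowing` wraps to a negative number while `widened` returns the correct value.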
[GitHub] flink issue #3837: [FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/3837 lgtm ---
[GitHub] flink pull request #3825: [FLINK-6445] [cep] Fix NPE in no-condition pattern...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3825#discussion_r114923660 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java --- @@ -32,8 +34,8 @@ private final IterativeCondition right; public AndCondition(final IterativeCondition left, final IterativeCondition right) { - this.left = left; - this.right = right; + this.left = Preconditions.checkNotNull(left, "The condition cannot be null."); --- End diff -- Better indicate whether left or right is null in the message. ---
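One way to act on the suggestion above is to give each argument its own message. The sketch below uses the JDK's `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`; the class name and the message wording are assumptions.

```java
import java.util.Objects;

// Hypothetical stand-in for a two-sided condition; not the Flink class.
class ConditionPair<T> {
    private final T left;
    private final T right;

    ConditionPair(T left, T right) {
        // Distinct messages show which argument was null when the check fails.
        this.left = Objects.requireNonNull(left, "The left condition cannot be null.");
        this.right = Objects.requireNonNull(right, "The right condition cannot be null.");
    }

    T left() { return left; }

    T right() { return right; }
}
```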
[GitHub] flink pull request #3541: [FLINK-6032] [cep] Clean-up operator state when no...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3541#discussion_r112071560 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java --- @@ -31,6 +31,18 @@ public double getVolume() { } @Override + public boolean equals(Object obj) { + return obj instanceof SubEvent && + super.equals(obj) && + ((SubEvent) obj).volume == volume; + } + + @Override + public int hashCode() { + return super.hashCode() + (int) volume; --- End diff -- Common practice is to multiply super.hashCode() by a prime (e.g. 37) ---
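The practice mentioned above can be sketched as below. `BaseEvent` and `VolumeEvent` are hypothetical stand-ins for the classes in the diff, and using `Double.hashCode` for the field is an assumption rather than what the PR does.

```java
// Hypothetical classes; only the hashCode pattern is the point of the sketch.
class BaseEvent {
    final int id;

    BaseEvent(int id) { this.id = id; }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof BaseEvent && ((BaseEvent) obj).id == id;
    }

    @Override
    public int hashCode() { return id; }
}

class VolumeEvent extends BaseEvent {
    final double volume;

    VolumeEvent(int id, double volume) {
        super(id);
        this.volume = volume;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof VolumeEvent
            && super.equals(obj)
            && Double.compare(((VolumeEvent) obj).volume, volume) == 0;
    }

    @Override
    public int hashCode() {
        // Scaling the parent's hash by a prime keeps (id=1, volume=2) and
        // (id=2, volume=1) apart; plain addition would map both to 3.
        return 37 * super.hashCode() + Double.hashCode(volume);
    }
}
```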
[GitHub] flink pull request #3541: [FLINK-6032] [cep] Clean-up operator state when no...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/3541#discussion_r112071274 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java --- @@ -385,4 +393,25 @@ public int hashCode() { return getClass().hashCode(); } } + + // Testing Methods // + + @VisibleForTesting + public boolean hasNonEmptyNFA(KEY key) throws IOException { + setCurrentKey(key); + return nfaOperatorState.value() != null; + } + + @VisibleForTesting + public boolean hasNonEmptyPQ(KEY key) throws IOException { --- End diff -- These 3 methods can be declared package private. ---
[GitHub] flink issue #3677: [FLINK-4848] [ssl] Throw meaningful exception when SSL is...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/3677 lgtm Thanks ---
[GitHub] flink pull request #3608: FLINK-6169 yarnClient should be stopped in Abstrac...
Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/3608 ---
[GitHub] flink pull request #3608: FLINK-6169 yarnClient should be stopped in Abstrac...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/3608 FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error Stop yarnClient before throwing exception You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3608.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3608 commit f0528de9cc03f603b77d6adcd222ff085967b614 Author: tedyu <yuzhih...@gmail.com> Date: 2017-03-24T16:30:31Z FLINK-6169 yarnClient should be stopped in AbstractYarnClusterDescriptor in case of error ---
[GitHub] flink pull request #3235: FLINK-5517 Upgrade hbase version to 1.3.0
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/3235 FLINK-5517 Upgrade hbase version to 1.3.0 In the thread 'Help using HBase with Flink 1.1.4', Giuliano reported seeing: ``` java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.hbase.zookeeper.MetaTableLocator ``` The above has been solved by HBASE-14963. Upgrading to the hbase 1.3.0 release would give a better user experience. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3235.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3235 commit b11e691e379760fec9bad6376ee277b5ed4c1912 Author: tedyu <yuzhih...@gmail.com> Date: 2017-01-30T00:44:04Z FLINK-5517 Upgrade hbase version to 1.3.0 ---
[GitHub] flink issue #2824: [FLINK-5050] JSON.org license is CatX
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2824 lgtm ---
[GitHub] flink issue #2421: FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2421 The test failure in flink-mesos is unrelated to the PR. ---
[GitHub] flink pull request #2421: FLINK-4482 numUnsuccessfulCheckpointsTriggers is a...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/2421 FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2421.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2421 commit 2021f4790715ecb762dbb23438bf0b2b2755845e Author: tedyu <yuzhih...@gmail.com> Date: 2016-08-25T18:11:44Z FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock ---
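The kind of fix named in the FLINK-4482 title can be sketched as below: every read and write of the counter happens while the same lock is held. The field and lock names are taken from the PR title; the class, its methods, and the threaded driver are hypothetical.

```java
// Hypothetical class, not the Flink coordinator: shows a field guarded by one lock.
class TriggerCounter {
    private final Object triggerLock = new Object();
    // Guarded by triggerLock: read and written only while the lock is held.
    private int numUnsuccessfulCheckpointsTriggers;

    void recordUnsuccessfulTrigger() {
        synchronized (triggerLock) {
            numUnsuccessfulCheckpointsTriggers++;
        }
    }

    int unsuccessfulTriggers() {
        synchronized (triggerLock) {
            return numUnsuccessfulCheckpointsTriggers;
        }
    }

    /** Increments the counter from several threads and returns the final value. */
    static int countFromThreads(int threads, int perThread) {
        TriggerCounter counter = new TriggerCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.recordUnsuccessfulTrigger();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.unsuccessfulTriggers();
    }
}
```

Because the increment is a read-modify-write, doing it outside the lock could lose updates when two threads interleave.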
[GitHub] flink pull request #2409: FLINK-4437 Lock evasion around lastTriggeredCheckp...
Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/2409 ---
[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2409 I ran the test suite with the patch, which failed here: ``` Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 201.106 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase The JobManager should handle gracefully failing task manager with slot sharing(org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase) Time elapsed: 200.43 sec <<< ERROR! java.util.concurrent.TimeoutException: Futures timed out after [20 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86) at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.ready(package.scala:86) at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:455) at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:439) at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:330) at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:269) at org.apache.flink.runtime.testingUtils.TestingUtils$.startTestingCluster(TestingUtils.scala:86) at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TaskManagerFailsWithSlotSharingITCase.scala:73) at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53) at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase.withFixture(TaskManagerFailsWithSlotSharingITCase.scala:38) ``` Doesn't seem to be related to the patch. ---
[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2409 ``` Executing Maven: -B -f /home/jenkins/jenkins-slave/workspace/flink-github-ci/pom.xml -Dmaven.repo.local=/home/jenkins/jenkins-slave/maven-repositories/1 clean install -Dflink.forkCount=1C ``` I don't see flink-github-ci in the source tree. ---
[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2409 What's the command line for running EventTimeWindowCheckpointingITCase alone ? I tried 'mvn test -Dtest=EventTimeWindowCheckpointingITCase' which didn't work. Thanks ---
[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/2409 Pardon. See if the current formation is the same as your reproduction. ---
[GitHub] flink pull request #2409: FLINK-4437 Lock evasion around lastTriggeredCheckp...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/2409 FLINK-4437 Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2409.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2409 commit 682bf10a70789c76317d460a9c994ebf00f5a516 Author: tedyu <yuzhih...@gmail.com> Date: 2016-08-23T19:10:03Z FLINK-4437 Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields ---
[GitHub] flink pull request #2402: [FLINK-4436] Unclosed DataOutputBuffer in Utils#se...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/2402#discussion_r75874655 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -164,6 +164,7 @@ public static void setTokensFor(ContainerLaunchContext amContainer, List p ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); amContainer.setTokens(securityTokens); + dob.close(); --- End diff -- Can you use try-with-resources here ? ---
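The try-with-resources form asked for above might look like the sketch below. It uses the JDK's `DataOutputStream` over a `ByteArrayOutputStream` as a stand-in for Hadoop's `DataOutputBuffer`, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical helper, not the Flink Utils class.
class TokenBytes {
    /** Serializes the token and closes the stream even if the write fails. */
    static ByteBuffer serialize(String token) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // The stream is closed automatically when the block exits, on success or failure.
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(token);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }
}
```

Compared with a trailing `close()` call, this form also releases the resource when an exception is thrown before the end of the block.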
[GitHub] flink pull request: FLINK-3301 Ineffective synchronization in Mess...
Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/1560 ---
[GitHub] flink pull request: FLINK-3301 Ineffective synchronization in Mess...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1560 FLINK-3301 Ineffective synchronization in MessageAcknowledgingSourceBase#restoreState This PR changes synchronization on pendingCheckpoints to this (MessageAcknowledgingSourceBase) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1560.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1560 commit 18338c7c51c3f1ada3541ee988c8028d3a87d38a Author: tedyu <yuzhih...@gmail.com> Date: 2016-01-29T04:47:24Z FLINK-3301 Ineffective synchronization in MessageAcknowledgingSourceBase#restoreState ---
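A minimal sketch of the change described above, under the assumption that the original problem was locking on a field that is itself reassigned inside the locked block, so two threads could end up holding different monitors. Only the `pendingCheckpoints` and `restoreState` names come from the PR; the class and the other methods are hypothetical.

```java
import java.util.ArrayDeque;

// Hypothetical class, not the Flink source: locks on `this`, which never changes.
class PendingTracker {
    private ArrayDeque<Long> pendingCheckpoints = new ArrayDeque<>();

    void add(long checkpointId) {
        synchronized (this) {
            pendingCheckpoints.add(checkpointId);
        }
    }

    void restoreState(ArrayDeque<Long> restored) {
        // Locking on pendingCheckpoints here would be ineffective, because the
        // assignment below swaps the object other threads would lock on.
        synchronized (this) {
            pendingCheckpoints = restored;
        }
    }

    int size() {
        synchronized (this) {
            return pendingCheckpoints.size();
        }
    }
}
```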
[GitHub] flink pull request: [FLINK-3103] Remove synchronization in FsState...
Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/1474 ---
[GitHub] flink pull request: FLINK-3103 Remove synchronization in FsStateBa...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1474 FLINK-3103 Remove synchronization in FsStateBackend#FsCheckpointStateOutputStream#close() Stephan made the following comment: I would actually suggest to not use a lock in the close() methods either. In a single threaded use, there is really no need for these locks. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1474.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1474 commit eb6dd48ee8a9a20889d7fcd994b8fc0d7df9ba1c Author: tedyu <yuzhih...@gmail.com> Date: 2015-12-21T17:29:16Z FLINK-3103 Remove synchronization in FsStateBackend#FsCheckpointStateOutputStream#close() ---
[GitHub] flink pull request: FLINK-3005 Commons-collections object deserial...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1381 FLINK-3005 Commons-collections object deserialization remote command execution vulnerability You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1381.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1381 commit 216c41f20aef19b515d94f276ceda39a232ab689 Author: tedyu <yuzhih...@gmail.com> Date: 2015-11-18T21:56:31Z FLINK-3005 Commons-collections object deserialization remote command execution vulnerability ---
[GitHub] flink pull request: [FLINK-2913] Close of ObjectOutputStream shoul...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1353 [FLINK-2913] Close of ObjectOutputStream should be enclosed in finally block in FsStateBackend You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1353.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1353 commit a00949b761efb0ceb6efb23a57095812c7ed2778 Author: tedyu <yuzhih...@gmail.com> Date: 2015-11-14T00:43:53Z [FLINK-2913] Close of ObjectOutputStream should be enclosed in finally block in FsStateBackend ---
[GitHub] flink pull request: FLINK-2826 transformed is modified in Broadcas...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1339 FLINK-2826 transformed is modified in BroadcastVariableMaterialization#decrementReferenceInternal without proper locking You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1339.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1339 commit 76fa1f85a5b4dd7ca7b47290b45da7bd7835fbda Author: tedyu <yuzhih...@gmail.com> Date: 2015-11-08T15:43:03Z FLINK-2826 transformed is modified in BroadcastVariableMaterialization#decrementReferenceInternal without proper locking ---
[GitHub] flink pull request: FLINK-2866 Potential resource leak due to uncl...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1282 FLINK-2866 Potential resource leak due to unclosed ObjectInputStream in FileSerializableStateHandle Use try-with-resources to close FSDataInputStream You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1282.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1282 commit 5e10e07d1649a672f79871b223b35fb5f86b6c31 Author: tedyu <yuzhih...@gmail.com> Date: 2015-10-22T02:40:21Z Use try-with-resources to close FSDataInputStream ---
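The commit message above mentions try-with-resources. A minimal sketch of that construct follows, assuming a plain java.io.InputStream in place of Flink's FSDataInputStream. The class `TryWithResourcesSketch`, the helpers `serialize` and `readState`, and `TrackingInput` are hypothetical names and are not taken from FileSerializableStateHandle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class TryWithResourcesSketch {

    /** Hypothetical stand-in for a file input stream; records whether close() was called. */
    static class TrackingInput extends ByteArrayInputStream {
        boolean closed = false;

        TrackingInput(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /**
     * Declaring the raw stream as its own resource means it is closed even when
     * the ObjectInputStream constructor fails while reading the stream header.
     */
    static Object readState(InputStream in) throws IOException, ClassNotFoundException {
        try (InputStream raw = in; ObjectInputStream ois = new ObjectInputStream(raw)) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Object restored = readState(new ByteArrayInputStream(serialize("checkpointed value")));
        System.out.println("restored: " + restored);

        // These bytes are not a valid serialization header, so opening fails.
        TrackingInput corrupt = new TrackingInput(new byte[] {1, 2, 3, 4});
        try {
            readState(corrupt);
        } catch (IOException e) {
            System.out.println("raw stream closed after failed open: " + corrupt.closed);
        }
    }
}
```

Resources are closed in reverse declaration order, so the ObjectInputStream (if it was created) is closed before the raw stream.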
[GitHub] flink pull request: FLINK-2595 Unclosed JarFile may leak resource ...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1137 FLINK-2595 Unclosed JarFile may leak resource in ClassLoaderUtilsTest You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1137.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1137 commit 705fa05121a6a2732e2604148653705a419f4d60 Author: Ted <yuzhih...@gmail.com> Date: 2015-09-16T16:29:54Z FLINK-2595 Unclosed JarFile may leak resource in ClassLoaderUtilsTest ---
[GitHub] flink pull request: FLINK-2605 Unclosed RandomAccessFile may leak ...
Github user tedyu commented on the pull request: https://github.com/apache/flink/pull/1089#issuecomment-137772269 Your reasoning w.r.t. the current scope of the try block was the same as mine. I think it is good to make the scope as narrow and accurate as possible. Generally speaking, we don't want to react to unrelated exceptions. Please keep the current form. ---
[GitHub] flink pull request: FLINK-2605 Unclosed RandomAccessFile may leak ...
GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/1089 FLINK-2605 Unclosed RandomAccessFile may leak resource in StaticFileServerHandler You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1089.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1089 commit d900be453f9ebac68bcf7e4c75b8d80d0fa8b2c5 Author: tedyu <yuzhih...@gmail.com> Date: 2015-09-03T14:11:32Z FLINK-2605 Unclosed RandomAccessFile may leak resource in StaticFileServerHandler ---
[GitHub] flink pull request: FLINK-2322 Unclosed stream may leak resource
Github user tedyu commented on the pull request: https://github.com/apache/flink/pull/928#issuecomment-127074789 I looked at https://travis-ci.org/apache/flink/jobs/73803261 but couldn't figure out which test failed. ---
[GitHub] flink pull request: FLINK-2322 Unclosed stream may leak resource
Github user tedyu commented on the pull request: https://github.com/apache/flink/pull/928#issuecomment-126021923 Is there anything I can do to move this forward? ---
[GitHub] flink pull request: FLINK-2322 Unclosed stream may leak resource
Github user tedyu commented on the pull request: https://github.com/apache/flink/pull/928#issuecomment-125010977 I used the following command: mvn clean package -DskipTests eclipse:eclipse However, under flink-java, I found only one .classpath file: /Users/tyu/flink/flink-java/target/.classpath How do people generate project files for Eclipse? Thanks ---
[GitHub] flink pull request: FLINK-2322 Unclosed stream may leak resource
Github user tedyu commented on the pull request: https://github.com/apache/flink/pull/928#issuecomment-123689320 I did use tabs. I have the following in .vimrc: set tabstop=2 " numbers of spaces of tab character ---