[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r134003593
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -960,12 +965,26 @@ public void handleError(final Exception exception) {
 
@Override
public void releaseResource(InstanceID instanceId) {
-   stopWorker(instanceId);
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   for (Map.Entry entry : taskExecutors.entrySet()) {
+   if (entry.getValue().getInstanceID().equals(instanceId)) {
+   stopWorker(entry.getKey());
--- End diff --

I was thinking along those same lines - the slot manager deals mostly with 
`InstanceID`, and its log lines are tough to correlate with the lower-level 
resource manager information that is `ResourceID`-based.   It would be a nice 
improvement to make `InstanceID` a composite key that includes `ResourceID`.

Regardless, I think the `stopWorker` method should take a `ResourceID`, 
because `InstanceID` isn't a concept exposed to the RM subclasses.
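
To make the suggestion concrete, a minimal sketch of that direction, assuming plain String-typed IDs in place of the real `InstanceID`/`ResourceID` classes (hypothetical code, not the actual ResourceManager):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: resolve the InstanceID to its worker once, and let
// stopWorker operate on the resource ID only, since InstanceID is not a
// concept exposed to the RM subclasses.
class WorkerRegistrySketch {

    // resourceId -> instanceId, mirroring the taskExecutors map in the diff
    private final Map<String, String> instanceIdsByResourceId = new ConcurrentHashMap<>();

    void releaseResource(String instanceId) {
        instanceIdsByResourceId.entrySet().stream()
                .filter(entry -> entry.getValue().equals(instanceId))
                .map(Map.Entry::getKey)
                .findFirst()
                .ifPresent(this::stopWorker);
    }

    // Subclass-facing API keyed by the resource ID.
    void stopWorker(String resourceId) {
        System.out.println("stopping worker " + resourceId);
    }
}
```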





[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-18 Thread rangadi
Github user rangadi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134044031
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular
+ * {@link 

[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r134004745
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
 ---
@@ -68,7 +68,9 @@ public boolean isIdle() {
}
 
public void markIdle() {
-   idleSince = System.currentTimeMillis();
+   if (!isIdle()) {
+   idleSince = System.currentTimeMillis();
+   }
--- End diff --

The reason the SM idleness unit tests didn't catch this is that `markIdle` is 
called whenever a slot report is received, which in practice happens 
continuously but is tough to simulate.  Just making clear that there are idle 
tests, but I didn't see an obvious way to improve them.
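
For context, a small self-contained sketch of why the guard matters, assuming a hypothetical standalone class rather than the real `TaskManagerRegistration`:

```java
// Hypothetical standalone sketch (not the real TaskManagerRegistration):
// markIdle must keep the original timestamp, otherwise each slot report
// restarts the idleness clock.
class TaskManagerRegistrationSketch {

    private long idleSince = Long.MAX_VALUE;   // MAX_VALUE means "not idle"

    boolean isIdle() {
        return idleSince != Long.MAX_VALUE;
    }

    void markIdle() {
        if (!isIdle()) {                       // the guard from the diff above
            idleSince = System.currentTimeMillis();
        }
    }

    void markUsed() {
        idleSince = Long.MAX_VALUE;
    }

    boolean idleLongerThan(long timeoutMillis) {
        return isIdle() && System.currentTimeMillis() - idleSince >= timeoutMillis;
    }
}
```

Without the `isIdle()` guard, every slot report would reset `idleSince`, so a continuously reporting but otherwise idle TaskManager would never exceed the release timeout.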




[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-18 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133888#comment-16133888
 ] 

Dian Fu commented on FLINK-7479:


Hi [~kkl0u], 
{quote}
Before opening a PR for a feature, we first have to discuss whether we want to 
include that feature in the library. 
If we do not go about it like this, the only thing that will happen is that we 
pile up PRs that we do not know how to handle in the end.
{quote}
Agreed. Next time I will create the PR only after we have agreed on the 
feature. Thanks for the reminder. :)

{quote}
I have not looked at the code itself, but from the topic, this seems to just 
be syntactic sugar on top of the existing APIs.
{quote}
I don't think so. The tests in the PR show that this functionality cannot be 
achieved on top of the existing APIs.

{quote}
Until users start using the library and ask for features, and given that this 
is a first iteration of the CEP/SQL integration, 
I would suggest keeping it minimal and avoiding stuff that only contributes 
"facilitation methods" for already existing functionality.
{quote}
Totally agree. I will try to keep the CEP/SQL integration minimal and only 
add features when required.  For the {{PREV}} clause, I think it should be 
included in the minimal feature set, as it is an important building block for 
pattern definition.

> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it's already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}, but there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.





[jira] [Comment Edited] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-18 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133888#comment-16133888
 ] 

Dian Fu edited comment on FLINK-7479 at 8/19/17 2:27 AM:
-

Hi [~kkl0u], 
Thanks a lot for your reply.
{quote}
Before opening a PR for a feature, we first have to discuss whether we want to 
include that feature in the library. 
If we do not go about it like this, the only thing that will happen is that we 
pile up PRs that we do not know how to handle in the end.
{quote}
Agreed. Next time I will create the PR only after we have agreed on the 
feature. Thanks for the reminder. :)

{quote}
I have not looked at the code itself, but from the topic, this seems to just 
be syntactic sugar on top of the existing APIs.
{quote}
I don't think so. The tests in the PR show that this functionality cannot be 
achieved on top of the existing APIs.

{quote}
Until users start using the library and ask for features, and given that this 
is a first iteration of the CEP/SQL integration, 
I would suggest keeping it minimal and avoiding stuff that only contributes 
"facilitation methods" for already existing functionality.
{quote}
Totally agree. I will try to keep the CEP/SQL integration minimal and only 
add features when required.  For the {{PREV}} clause, I think it should be 
included in the minimal feature set, as it is an important building block for 
pattern definition.


was (Author: dian.fu):
Hi [~kkl0u], 
{quote}
Before opening a PR for a feature, we first have to discuss whether we want to 
include that feature in the library. 
If we do not go about it like this, the only thing that will happen is that we 
pile up PRs that we do not know how to handle in the end.
{quote}
Agreed. Next time I will create the PR only after we have agreed on the 
feature. Thanks for the reminder. :)

{quote}
I have not looked at the code itself, but from the topic, this seems to just 
be syntactic sugar on top of the existing APIs.
{quote}
I don't think so. The tests in the PR show that this functionality cannot be 
achieved on top of the existing APIs.

{quote}
Until users start using the library and ask for features, and given that this 
is a first iteration of the CEP/SQL integration, 
I would suggest keeping it minimal and avoiding stuff that only contributes 
"facilitation methods" for already existing functionality.
{quote}
Totally agree. I will try to keep the CEP/SQL integration minimal and only 
add features when required.  For the {{PREV}} clause, I think it should be 
included in the minimal feature set, as it is an important building block for 
pattern definition.

> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it's already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}, but there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.





[GitHub] flink issue #4560: Flink 7077

2017-08-18 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4560
  
@tillrohrmann fixed the test issue and the other noted issues.  Once #4555 is 
in, this is good to go.

The test issue was a race between the RM acquiring leadership and the RPC 
call.   Added some code to `TestLeaderElectionService` to wait for the leader 
acknowledgement, and updated the base RM and Mesos RM to use it. 
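
A hedged sketch of what "wait for the leader acknowledgement" can look like in a test election service (hypothetical class and method names, not the actual `TestLeaderElectionService` change):

```java
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the test blocks until the contender (the RM) has
// confirmed leadership, so RPC calls cannot race with the election.
class TestLeaderElectionServiceSketch {

    private final CountDownLatch leaderAcknowledged = new CountDownLatch(1);
    private volatile UUID confirmedSessionId;

    /** Called by the contender once it accepts leadership. */
    void confirmLeaderSessionID(UUID sessionId) {
        confirmedSessionId = sessionId;
        leaderAcknowledged.countDown();
    }

    /** Test code blocks here before issuing RPC calls, avoiding the race. */
    UUID awaitLeaderAcknowledgement(long timeout, TimeUnit unit) throws InterruptedException {
        if (!leaderAcknowledged.await(timeout, unit)) {
            throw new IllegalStateException("leader was not acknowledged in time");
        }
        return confirmedSessionId;
    }
}
```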




[jira] [Comment Edited] (FLINK-7129) Dynamically changing patterns

2017-08-18 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131782#comment-16131782
 ] 

Dian Fu edited comment on FLINK-7129 at 8/18/17 6:22 AM:
-

Hi [~dawidwys],
Thanks a lot for your work here. I'm very interested in this feature; would you 
mind sharing the status of the ticket? Besides, I'm also trying to figure out 
the purpose of this ticket. Do we want to support multiple patterns in the same 
stream? If yes, which of the following options does that mean:
1) events are separated by some strategy and one event will try to match only 
one of the NFAs?
2) each event will try to match all the NFAs?


was (Author: dian.fu):
Hi [~dawidwys],
Thanks a lot for your work here. I'm very interested in this feature; would you 
mind sharing the status of the ticket? Besides, does this ticket aim to 
support multiple patterns in the same stream? That is, will different events 
try to match different NFAs?

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> a coStream





[jira] [Created] (FLINK-7475) ListState support update

2017-08-18 Thread yf (JIRA)
yf created FLINK-7475:
-

 Summary: ListState support update
 Key: FLINK-7475
 URL: https://issues.apache.org/jira/browse/FLINK-7475
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: yf


If I want to update the list, I have to do it in two steps: 
{code}
listState.clear();
for (Element e : myList) {
    listState.add(e);
}
{code}
Why can't I update the state with: 
{code}
listState.update(myList);
{code}
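
A hedged sketch of the requested semantics as a user-side helper (hypothetical; {{ListState}} itself offers no such method at this point):
{code:java}
import java.util.List;

import org.apache.flink.api.common.state.ListState;

// Hypothetical helper expressing the requested semantics: replace the whole
// list in one call instead of clear() followed by N add() calls.
final class ListStates {

    private ListStates() {
    }

    static <T> void update(ListState<T> state, List<T> newList) throws Exception {
        state.clear();
        for (T element : newList) {
            state.add(element);
        }
    }
}
{code}
A built-in {{update}} could additionally perform the replacement as one backend operation, which a clear-then-add loop cannot guarantee.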





[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131779#comment-16131779
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
This is the exception I got with the class loader from 
https://github.com/apache/flink/blob/fa11845b926f8371e9cee47775ca0e48176b686e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java:
```
org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:462)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.tests.ClassLoaderTestProgram.main(ClassLoaderTestProgram.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:538)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:430)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Could not start the ActorSystem lazily.
    at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:235)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:460)
    ... 22 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
    at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:206)
    at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:169)
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:505)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
    at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
    at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:106)
    at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:94)
    at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
    at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:231)
    ... 23 more
```

This is on the client, but just seeing it makes me believe that this class 
loader could cause problems. 


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>







[jira] [Commented] (FLINK-7129) Dynamically changing patterns

2017-08-18 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131782#comment-16131782
 ] 

Dian Fu commented on FLINK-7129:


Hi [~dawidwys],
Thanks a lot for your work here. I'm very interested in this feature; would you 
mind sharing the status of the ticket? Besides, does this ticket aim to 
support multiple patterns in the same stream? That is, will different events 
try to match different NFAs?

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> a coStream





[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-18 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133628#comment-16133628
 ] 

Kostas Kloudas commented on FLINK-7479:
---

Hi [~dian.fu], 

Before opening a PR for a feature, we first have to discuss whether we want to 
include that feature in the library. 
If we do not go about it like this, the only thing that will happen is that we 
pile up PRs that we do not know how to handle in the end. 

I have not looked at the code itself, but from the topic, this seems to just 
be syntactic sugar on top of the existing APIs.
It may be necessary for completeness, as it is in the SQL specification, but a 
specification is just a specification until users start using it
or asking for it. Until users start using the library and ask for features, and 
given that this is a first iteration of the CEP/SQL integration, 
I would suggest keeping it minimal and avoiding stuff that only contributes 
"facilitation methods" for already existing functionality.


> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it's already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}, but there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131812#comment-16131812
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4530
  
I see, I didn't consider the private timeServiceManager. Maybe this is the 
best approach then. 


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>     timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.
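
A hedged, self-contained sketch of the intended "holding back" behavior (hypothetical class, not the actual change):
{code:java}
import java.util.function.LongConsumer;

// Hypothetical sketch of "holding back" a watermark until the results it
// triggered have been emitted (not the actual FLINK-7245 patch).
class HoldingBackWatermarkGate {

    private long heldBackWatermark = Long.MIN_VALUE;
    private final LongConsumer downstream;

    HoldingBackWatermarkGate(LongConsumer downstream) {
        this.downstream = downstream;
    }

    /** Where processWatermark would normally forward the watermark directly. */
    void onWatermark(long watermark) {
        // Advance timers / trigger computations here, but do not forward yet.
        heldBackWatermark = Math.max(heldBackWatermark, watermark);
    }

    /** Called once all results triggered by the held-back watermark are out. */
    void onResultsEmitted() {
        if (heldBackWatermark != Long.MIN_VALUE) {
            downstream.accept(heldBackWatermark);
        }
    }
}
{code}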







[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131858#comment-16131858
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4238


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.
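
A hedged sketch of per-job ref-counting (hypothetical and greatly simplified; not the actual {{BlobCache}} change):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: count references per job, and clean up all of a job's
// BLOBs at once instead of tracking individual BlobKeys.
class JobBlobRefCounter {

    private final Map<String, Integer> refCountsByJobId = new HashMap<>();

    synchronized void register(String jobId) {
        refCountsByJobId.merge(jobId, 1, Integer::sum);
    }

    synchronized void release(String jobId) {
        Integer remaining = refCountsByJobId.computeIfPresent(jobId, (id, c) -> c - 1);
        if (remaining != null && remaining <= 0) {
            refCountsByJobId.remove(jobId);
            deleteJobBlobs(jobId);   // cleanup happens per job, not per BlobKey
        }
    }

    private void deleteJobBlobs(String jobId) {
        System.out.println("deleting all BLOBs of job " + jobId);
    }
}
{code}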





[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131859#comment-16131859
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4358
  
I've merged #4238. Could you please rebase onto the latest master? 
Moreover, I've noticed that 
`JobManagerCleanupITCase.testBlobServerCleanupCancelledJob` failed in one of 
the Travis runs. Not sure if this is related.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should represent use cases for BLOBs that are 
> stored permanently for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should represent BLOB offloading for logs, RPC, etc., 
> which does not even have to be backed by files.
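
A hedged sketch of the proposed split (hypothetical interfaces; the final API may look different):
{code:java}
import java.io.File;
import java.io.IOException;

// Hypothetical interfaces illustrating the proposed split.
interface PermanentBlobServiceSketch {
    // Job-scoped BLOBs that live for the whole job lifetime (HA and non-HA).
    File getFile(String jobId, String blobKey) throws IOException;
}

interface TransientBlobServiceSketch {
    // One-shot offloading (logs, RPC, ...) that need not be permanently
    // backed by files; entries may be deleted after use.
    File getFile(String blobKey) throws IOException;

    boolean delete(String blobKey);
}
{code}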









[jira] [Commented] (FLINK-7056) add API to allow job-related BLOBs to be stored

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131857#comment-16131857
 ] 

ASF GitHub Bot commented on FLINK-7056:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4558


> add API to allow job-related BLOBs to be stored
> ---
>
> Key: FLINK-7056
> URL: https://issues.apache.org/jira/browse/FLINK-7056
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> To ease cleanup, job-related BLOBs will be reflected in the BLOB storage so 
> that they may be removed along with the job. This adds the jobId to many 
> methods, similar to the previous code from the {{NAME_ADDRESSABLE}} 
> mode.







[jira] [Updated] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7347:

Fix Version/s: 1.4.0

> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.
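
A hedged illustration of the cost difference (illustrative, standalone code; not the actual {{MessageAcknowledgingSourceBase}} fields):
{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical benchmark of the pitfall described above (illustrative names).
public class RemoveAllCost {
    public static void main(String[] args) {
        final int n = 20_000;  // the report saw ~10 million IDs, taking hours

        Set<Long> idsProcessedButNotAcknowledged = new HashSet<>();
        List<Long> checkpointedIds = new ArrayList<>();
        for (long i = 0; i < n; i++) {
            idsProcessedButNotAcknowledged.add(i);
            checkpointedIds.add(i);
        }
        checkpointedIds.add((long) n);  // keep the set no larger than the list

        // The set is not larger than the collection, so AbstractSet#removeAll
        // iterates the set and calls ArrayList#contains (O(m)) per element.
        long t0 = System.nanoTime();
        new HashSet<>(idsProcessedButNotAcknowledged).removeAll(checkpointedIds);
        long listArg = System.nanoTime() - t0;

        // With a HashSet argument, contains() is O(1), so the call is ~O(n).
        t0 = System.nanoTime();
        new HashSet<>(idsProcessedButNotAcknowledged).removeAll(new HashSet<>(checkpointedIds));
        long setArg = System.nanoTime() - t0;

        System.out.printf("ArrayList argument: %d ms, HashSet argument: %d ms%n",
                listArg / 1_000_000, setArg / 1_000_000);
    }
}
{code}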





[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131846#comment-16131846
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4238
  
Thanks for addressing the review comments @NicoK. Travis passed. Will merge 
this PR.


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.







[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2017-08-18 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131864#comment-16131864
 ] 

Sihua Zhou commented on FLINK-3089:
---

[~richtesn] I'm not sure I understand the problem you described correctly. 
Could you please explain it in more detail? Thanks.

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>
> In some use cases (web analytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes of no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and discards the state automatically.
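
A hedged sketch of the requested behavior (hypothetical, plain Java; not a Flink API):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key state with a TTL that is refreshed on every update,
// e.g. discarding visitor sessions after 30 minutes without events.
class TtlState<K, V> {

    private static final class Entry<V> {
        V value;
        long expiresAt;
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void update(K key, V value) {
        Entry<V> e = entries.computeIfAbsent(key, k -> new Entry<>());
        e.value = value;
        e.expiresAt = System.currentTimeMillis() + ttlMillis;  // refresh TTL
    }

    V get(K key) {
        Entry<V> e = entries.get(key);
        return (e == null || e.expiresAt < System.currentTimeMillis()) ? null : e.value;
    }

    /** Discards expired entries, e.g. visitors considered 'gone'. */
    void expire() {
        long now = System.currentTimeMillis();
        entries.values().removeIf(e -> e.expiresAt < now);
    }
}
{code}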







[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131926#comment-16131926
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
Probably, yes. I also don't know what other stuff this could break, though. 
For example, I don't know whether queryable state, which starts Netty (and 
Akka) components, will still work with this. Unfortunately we don't have any 
end-to-end tests for that.


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>






[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-08-18 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4562

[FLINK-7402] Fix ineffective null check in NettyMessage#write()

## What is the purpose of the change

Fix ineffective null check in NettyMessage#write()

## Brief change log

  - Add a null check and remove an unnecessary null check
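
For illustration, a minimal sketch of the general "ineffective null check" pattern being fixed (hypothetical code, not the actual `NettyMessage` source):

```java
// Hypothetical illustration: the reference is dereferenced before the check,
// so the guard can never catch the null case.
class MessageSketch {

    private final byte[] payload;

    MessageSketch(byte[] payload) {
        this.payload = payload;
    }

    // Before: the null check is ineffective because the NPE is thrown first.
    int lengthBefore() {
        int length = payload.length;   // NullPointerException if payload == null
        if (payload != null) {         // never reached with payload == null
            return length;
        }
        return 0;
    }

    // After: check first, dereference afterwards.
    int lengthAfter() {
        return payload == null ? 0 : payload.length;
    }
}
```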


## Verifying this change

No test case

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7402

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4562


commit e289b37cf8adce56876b47ee33ed161058bf64e6
Author: zjureel 
Date:   2017-08-18T06:43:34Z

[FLINK-7402] Fix ineffective null check in NettyMessage#write()






[jira] [Assigned] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-7347:
---

Assignee: Yonatan Most

> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
>Assignee: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131805#comment-16131805
 ] 

Fabian Hueske commented on FLINK-7446:
--

That's in fact a good question. In principle we could avoid timestamp 
extraction. However, we need to generate watermarks, and both things go 
together in the DataStream API. I see two options: generate timestamps and 
watermarks and validate that the field and the extracted timestamp are the 
same, or create new interfaces/operators that generate watermarks based on a 
field. 
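
A hedged sketch of the first option (hypothetical, plain Java; not a Flink interface):
{code:java}
// Hypothetical sketch: assign timestamps, track the max for watermarking, and
// validate that the extracted timestamp equals the existing rowtime field.
class RowtimeFieldValidator {

    private long maxSeenTimestamp = Long.MIN_VALUE;

    long extractTimestamp(long rowtimeField, long extractedTimestamp) {
        if (rowtimeField != extractedTimestamp) {
            throw new IllegalStateException(
                "rowtime field (" + rowtimeField + ") and extracted timestamp ("
                    + extractedTimestamp + ") diverge");
        }
        maxSeenTimestamp = Math.max(maxSeenTimestamp, extractedTimestamp);
        return extractedTimestamp;
    }

    /** Watermark derived from the field values, minus a bounded delay. */
    long currentWatermark(long maxOutOfOrdernessMillis) {
        return maxSeenTimestamp - maxOutOfOrdernessMillis;
    }
}
{code}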

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining an 
> existing field as the rowtime field. Just like when registering a DataStream, 
> the rowtime field could be appended but could also replace an existing field.





[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...

2017-08-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
Ah, I see - that is probably related to reading resources from a jar file 
(like the config) via methods like `getResourceAsStream()`.
I can see that the "delegate to parent after child" lookup logic might not 
quite work for that, yes.

I think that particular bug you encountered is almost an argument not to 
change the logic on the client yet. Since we would run the entire Flink 
code through that classloader as well, the implications are trickier than in 
the runtime task, where we only instantiate the user code functions.
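
For reference, a minimal sketch of the child-first lookup being discussed, following the canonical `URLClassLoader` pattern (not Flink's actual implementation):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Minimal child-first class loader sketch.
class ChildFirstClassLoader extends URLClassLoader {

    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // Try the child (user-code jars) first...
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // ...then fall back to the usual parent-first lookup.
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

Note that `getResource`/`getResourceAsStream` need the same inversion to behave consistently, which is presumably where the 'akka.version'/`reference.conf` lookup above went wrong.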






[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131913#comment-16131913
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Implemented a fixed-size pool of producers; please check the last commit.

If we run out of producers in the pool, an exception is thrown, aborting the 
ongoing snapshot.


> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Kafka 0.11 (to be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting 
> "exactly-once" semantics. The API changes and the overall transaction support 
> are described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would (see the sketch below):
> * upon creation, begin a transaction, store the transaction identifiers in the 
> state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in the state that the 
> current transaction is pending to be committed
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 
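
A hedged skeleton of that life cycle (greatly simplified, hypothetical types; not the actual FlinkKafkaProducer011):
{code:java}
// Hypothetical skeleton of the transactional sink life cycle sketched above.
interface Transaction {
    void send(byte[] record);
    void flush();
    void commit();
    void abort();
}

interface TransactionFactory {
    Transaction begin();
}

class TwoPhaseKafkaSinkSketch {

    private final TransactionFactory factory;
    private Transaction current;   // open transaction receiving records
    private Transaction pending;   // flushed, stored in state, awaiting commit

    TwoPhaseKafkaSinkSketch(TransactionFactory factory) {
        this.factory = factory;
        this.current = factory.begin();   // begin a transaction upon creation
    }

    void invoke(byte[] record) {
        current.send(record);             // write through the open transaction
    }

    void snapshotState() {
        current.flush();                  // pre-commit: make the data durable
        pending = current;                // record "pending commit" in state
        current = factory.begin();        // new transaction for new records
    }

    void notifyCheckpointComplete() {
        if (pending != null) {
            pending.commit();             // second phase: commit
            pending = null;
        }
    }

    void restore(Transaction recoveredPending, boolean checkpointCompleted) {
        // Crash between snapshotState and notifyCheckpointComplete:
        // commit if every participant saved the snapshot, otherwise abort.
        if (checkpointCompleted) {
            recoveredPending.commit();
        } else {
            recoveredPending.abort();
        }
        this.current = factory.begin();
    }
}
{code}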





[GitHub] flink issue #4560: Flink 7077

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4560
  
Thanks for your contribution @EronWright. `MesosResourceManagerTest` seems 
to fail on Travis.




[jira] [Reopened] (FLINK-7300) End-to-end tests are instable on Travis

2017-08-18 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-7300:
--

The issue appeared again: https://travis-ci.org/apache/flink/jobs/265362414

I think the reason for the sporadic failures needs a bit more investigation.

> End-to-end tests are instable on Travis
> ---
>
> Key: FLINK-7300
> URL: https://issues.apache.org/jira/browse/FLINK-7300
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> It seems like the end-to-end tests are unstable, causing the {{misc}} build 
> profile to sporadically fail.
> Incorrect matched output:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8
> Another failure example of a different cause then the above, also on the 
> end-to-end tests:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2017-08-18 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131845#comment-16131845
 ] 

Sihua Zhou commented on FLINK-3089:
---

[~stefanrichte...@gmail.com] I think TTL is also fine for 
checkpoints/savepoints. After performing recovery/resume, everything (including 
the sources' offsets, e.g. Kafka) reverts to the point where the 
checkpoint/savepoint was taken.

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>
> In some use cases (web analytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid)).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "after 30 minutes of no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information, but that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state, which I 
> can update afterwards.
> That would make it possible to create a map function that assigns the right 
> value and discards the state automatically.
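
Until such a TTL exists in the state API, one workaround is a processing-time
timer per key. A minimal sketch ({{Event}} and the 30-minute timeout are
assumptions for illustration, not an official recommendation):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emulates state TTL on a keyed stream: every event re-arms a timer; when the
// timer fires and nothing newer was seen, the per-visitor state is discarded.
public class VisitorTimeoutFunction extends ProcessFunction<Event, Event> {

    private static final long TIMEOUT_MS = 30 * 60 * 1000; // visitor 'gone' after 30 min

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        ctx.timerService().registerProcessingTimeTimer(now + TIMEOUT_MS);
        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Long seen = lastSeen.value();
        // Discard only if no event arrived since the timer that just fired was set.
        if (seen != null && timestamp >= seen + TIMEOUT_MS) {
            lastSeen.clear();
        }
    }
}
{code}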



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-18 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7057.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 7b23624066c46d58c7b7181e5576a9834af9ac7a

> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7056) add API to allow job-related BLOBs to be stored

2017-08-18 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131862#comment-16131862
 ] 

Till Rohrmann commented on FLINK-7056:
--

Hotfix applied for test stability via d0a150609b46cabfe7f5f0d760c465dcee5588fb

> add API to allow job-related BLOBs to be stored
> ---
>
> Key: FLINK-7056
> URL: https://issues.apache.org/jira/browse/FLINK-7056
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> To ease cleanup, we will have job-related BLOBs reflected in the blob 
> storage so that they may be removed along with the job. This adds the jobId 
> to many methods, similar to the previous code from the {{NAME_ADDRESSABLE}} 
> mode.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-7476:
-

 Summary: Try to recover from failure in 
TwoPhaseCommitSinkFunction.beginTransaction
 Key: FLINK-7476
 URL: https://issues.apache.org/jira/browse/FLINK-7476
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


Currently, when using TwoPhaseCommitSinkFunction, if there is some intermittent 
failure in "beginTransaction", not only does the snapshot that triggered this 
call fail, but any subsequent write requests will fail as well, rendering the 
sink unusable until application restart.

This issue is in code that hasn't been released yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131889#comment-16131889
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
Ah, I see - that is probably related to reading resources from a jar file 
(like the config), via methods like `getResourceAsStream()`.
I can see that the "delegate to parent after child" lookup logic might not 
quite work for that, yes.

I think the particular bug you encountered there is almost an argument for 
not changing the logic on the client yet. Since we would run the entire Flink 
code through that classloader as well, the implications are trickier than in 
the runtime task, where we only instantiate the user-code functions.
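
For reference, the child-first delegation being discussed looks roughly like
this (a minimal sketch, not Flink's actual classloader; real implementations
also keep an "always parent-first" list for `java.*` and the Flink API
packages, and need the same treatment for resource lookups):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Child-first classloader sketch: consult the user-code jars before the parent.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name);                 // child (user jar) first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, resolve);  // then delegate to parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```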


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7476:

Fix Version/s: 1.4.0

> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently, when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until application restart.
> This issue is in code that hasn't been released yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7476:

Affects Version/s: 1.4.0

> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently, when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until application restart.
> This issue is in code that hasn't been released yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

2017-08-18 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4561

 [FLINK-7476][streaming] Continue using previous transaction on failures

First commit comes from #4557.

Previously, when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure in "beginTransaction", not only did the snapshot that 
triggered this call fail, but any subsequent write requests would fail as 
well. This rendered the sink unusable until application restart.

This PR changes the order of execution of methods of a `PublicEvolving` 
class that has not yet been released.

The PR is covered by existing tests in `TwoPhaseCommitSinkFunctionTest`, as well 
as two additional test cases (`testContinueWorkOnBeginTransactionFailure` would 
fail before this PR).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink 2phase-recover

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4561


commit 813c8121b406b60ea478f4765b33b7d75c221d1e
Author: Piotr Nowojski 
Date:   2017-08-14T14:40:45Z

[hotifx][streaming] Simplify state of TwoPhaseCommitSinkFunction

commit 249b6419af3d15414dc411838aba624d0ee2f3a1
Author: Piotr Nowojski 
Date:   2017-08-14T13:09:39Z

[hotfix][tests] Implement AutoCloseable in TestHarness

commit a0ae6324dcaded581a3352c8ff4bae6e86e01fde
Author: Piotr Nowojski 
Date:   2017-08-17T13:46:47Z

[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest

commit e3c7dc83ccbce2505be5769fa0827b09dfa54875
Author: Piotr Nowojski 
Date:   2017-08-17T10:29:16Z

[FLINK-7476][streaming] Continue using previous transaction on failures

Previuosly when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure
in "beginTransaction", not only the snapshot that triggered this call 
failed, but also
any subsequent write requests would fail also. This caused such sink 
unusable until
application restart.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131907#comment-16131907
 ] 

ASF GitHub Bot commented on FLINK-7476:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4561

 [FLINK-7476][streaming] Continue using previous transaction on failures

First commit comes from #4557.

Previously, when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure in "beginTransaction", not only did the snapshot that 
triggered this call fail, but any subsequent write requests would fail as 
well. This rendered the sink unusable until application restart.

This PR changes the order of execution of methods of a `PublicEvolving` 
class that has not yet been released.

The PR is covered by existing tests in `TwoPhaseCommitSinkFunctionTest`, as well 
as two additional test cases (`testContinueWorkOnBeginTransactionFailure` would 
fail before this PR).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink 2phase-recover

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4561


commit 813c8121b406b60ea478f4765b33b7d75c221d1e
Author: Piotr Nowojski 
Date:   2017-08-14T14:40:45Z

[hotifx][streaming] Simplify state of TwoPhaseCommitSinkFunction

commit 249b6419af3d15414dc411838aba624d0ee2f3a1
Author: Piotr Nowojski 
Date:   2017-08-14T13:09:39Z

[hotfix][tests] Implement AutoCloseable in TestHarness

commit a0ae6324dcaded581a3352c8ff4bae6e86e01fde
Author: Piotr Nowojski 
Date:   2017-08-17T13:46:47Z

[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest

commit e3c7dc83ccbce2505be5769fa0827b09dfa54875
Author: Piotr Nowojski 
Date:   2017-08-17T10:29:16Z

[FLINK-7476][streaming] Continue using previous transaction on failures

Previuosly when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure
in "beginTransaction", not only the snapshot that triggered this call 
failed, but also
any subsequent write requests would fail also. This caused such sink 
unusable until
application restart.




> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently, when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until application restart.
> This issue is in code that hasn't been released yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-08-18 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131948#comment-16131948
 ] 

Jark Wu commented on FLINK-7465:


I think the problem is how to design the state. Even if it is a bit array or 
bitmap, it is expensive to de/serialize the state every time {{accumulate}} is 
called.

> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA we use a BloomFilter to implement counting functions.
> BloomFilter algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> References:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr] I would appreciate it if you could give me some advice. :-)
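
A compact sketch of the add/query logic described above (simplified double
hashing for the k positions; an illustration, not the Hive implementation
referenced):

{code:java}
import java.util.BitSet;

// Bloom filter sketch: m bits, k hash positions derived via double hashing.
public class SimpleBloomFilter {
    private final BitSet bits;
    private final int m;
    private final int k;

    public SimpleBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9; // illustrative second hash
        return Math.floorMod(h1 + i * h2, m);
    }

    public void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));     // set the k positions to 1
        }
    }

    public boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;                   // definitely not in the set
            }
        }
        return true;  // in the set, or a false positive
    }
}
{code}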



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131944#comment-16131944
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4562

[FLINK-7402] Fix ineffective null check in NettyMessage#write()

## What is the purpose of the change

Fix ineffective null check in NettyMessage#write()

## Brief change log

  - *Add null check and remove unnecessary null check*


## Verifying this change

No test case

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7402

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4562


commit e289b37cf8adce56876b47ee33ed161058bf64e6
Author: zjureel 
Date:   2017-08-18T06:43:34Z

[FLINK-7402] Fix ineffective null check in NettyMessage#write()




> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.
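
The fix amounts to guarding the dereference inside the try block, so the null
check in the finally block is meaningful. A self-contained pattern sketch
({{Buffer}} below is a stand-in interface, not Flink's actual buffer type):

{code:java}
public class NullCheckPattern {

    // Stand-in for the real buffer type used in NettyMessage#write().
    interface Buffer {
        void write(int value);
        void recycle();
    }

    static Buffer allocate() {
        return null; // allocation may legitimately fail in the real code
    }

    static void writeSafely() {
        Buffer buffer = null;
        try {
            buffer = allocate();
            if (buffer == null) {              // guard before any dereference
                throw new IllegalStateException("buffer allocation failed");
            }
            buffer.write(42);
        } finally {
            if (buffer != null) {              // now consistent with the try block
                buffer.recycle();
            }
        }
    }
}
{code}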



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131967#comment-16131967
 ] 

ASF GitHub Bot commented on FLINK-7347:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4468
  
Thanks a lot for contributing this fix, @ymost!  

I merged it into master; could you please close this PR?


> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
>Assignee: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.
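
A small, self-contained demonstration of why the argument type matters (sizes
scaled down here; with millions of ids, as in the report above, the list
variant runs for hours):

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// AbstractSet#removeAll iterates the *set* and calls argument.contains(...)
// per element whenever the set is smaller than the argument collection.
// contains() is O(n) on an ArrayList and ~O(1) on a HashSet.
public class RemoveAllCost {
    public static void main(String[] args) {
        Set<Long> processed = new HashSet<>();
        for (long i = 0; i < 20_000; i++) {
            processed.add(i);
        }
        List<Long> ackedList = new ArrayList<>();  // old: ids kept in a list
        Set<Long> ackedSet = new HashSet<>();      // fix: ids kept in a set
        for (long i = 0; i < 40_000; i++) {
            ackedList.add(i);
            ackedSet.add(i);
        }

        long t0 = System.nanoTime();
        new HashSet<>(processed).removeAll(ackedSet);   // fast path
        long t1 = System.nanoTime();
        new HashSet<>(processed).removeAll(ackedList);  // slow path
        long t2 = System.nanoTime();

        System.out.printf("HashSet arg: %d ms, ArrayList arg: %d ms%n",
            (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
    }
}
{code}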



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7477:
---

 Summary: Use "hadoop classpath" to augment classpath when available
 Key: FLINK-7477
 URL: https://issues.apache.org/jira/browse/FLINK-7477
 Project: Flink
  Issue Type: Bug
  Components: Startup Shell Scripts
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


Currently, some cloud environments don't properly put the Hadoop jars into 
{{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
check in {{config.sh}} if the {{hadoop}} binary is on the path and augment our 
{{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in our 
scripts.

This will improve the out-of-box experience of users that otherwise have to 
manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131993#comment-16131993
 ] 

ASF GitHub Bot commented on FLINK-7423:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4525
  
The logic is currently not consistent with the contract of the input formats: 
a return value of null is not an "end of split" indicator.

Also, the description mentions that this adds a test, which I cannot find 
in the diff...


> Always reuse an instance  to get elements from the inputFormat 
> ---
>
> Key: FLINK-7423
> URL: https://issues.apache.org/jira/browse/FLINK-7423
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
>   while (isRunning) {
>   format.open(splitIterator.next());
>   // for each element we also check if cancel
>   // was called by checking the isRunning flag
>   while (isRunning && !format.reachedEnd()) {
>   nextElement = 
> format.nextRecord(nextElement);
>   if (nextElement != null) {
>   ctx.collect(nextElement);
>   } else {
>   break;
>   }
>   }
>   format.close();
>   completedSplitsCounter.inc();
>   if (isRunning) {
>   isRunning = splitIterator.hasNext();
>   }
>   }
> {code}
> the format may return a different element or null from nextRecord, which may 
> cause an exception.
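
Under that contract, a corrected inner loop would treat {{reachedEnd()}} as the
only end-of-split signal and simply skip a null record. A sketch reusing the
variable names from the snippet above:

{code:java}
// Inner loop sketch: null from nextRecord() is not "end of split".
OUT nextElement = serializer.createInstance();
while (isRunning && !format.reachedEnd()) {
    OUT record = format.nextRecord(nextElement);
    if (record != null) {
        ctx.collect(record);
        nextElement = record;   // keep reusing the instance the format returned
    }
    // on null: fall through and re-check reachedEnd()
}
{code}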



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...

2017-08-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4525
  
The logic is currently not consistent with the contract of the input formats: 
a return value of null is not an "end of split" indicator.

Also, the description mentions that this adds a test, which I cannot find 
in the diff...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132010#comment-16132010
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133924725
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
--- End diff --

Yes, CountDistinct is only used for the test case here.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133924725
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
--- End diff --

Yes, CountDistinct is only used for the test case here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132041#comment-16132041
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4358
  
Rebased onto `master`, but had to drag in #4402 early to fix the end-to-end 
tests failing due to spurious warnings. The test failure you observed was 
actually a test instability introduced with #4238 for which I added a hotfix to 
this PR.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are 
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., 
> which does not even have to be backed by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132042#comment-16132042
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
1. Use HeapMapView/HeapListView as the default implementation
2. Add initialize and cleanUp interfaces to GeneratedAggregations


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4534: [FLINK-7358][table]Add implicitly converts support for Us...

2017-08-18 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4534
  
Hi @fhueske, thank you for reminding me :-). I updated the description; feel 
free to tell me if any of the description is inappropriate.

Thanks, Jincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132056#comment-16132056
 ] 

ASF GitHub Bot commented on FLINK-7358:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4534
  
Hi @fhueske, thank you for reminding me :-). I updated the description; feel 
free to tell me if any of the description is inappropriate.

Thanks, Jincheng


> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently if user defined a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF as `Func('a, 'b)`. So
> I want to add implicit conversion when we call a UDF. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> In this JIRA. only for TableAPI, And SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...

2017-08-18 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4525
  
@StephanEwen why are `null` values permitted if not in the contract of the 
input formats?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132062#comment-16132062
 ] 

ASF GitHub Bot commented on FLINK-7423:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4525
  
@StephanEwen why are `null` values permitted if not in the contract of the 
input formats?


> Always reuse an instance  to get elements from the inputFormat 
> ---
>
> Key: FLINK-7423
> URL: https://issues.apache.org/jira/browse/FLINK-7423
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
>   while (isRunning) {
>   format.open(splitIterator.next());
>   // for each element we also check if cancel
>   // was called by checking the isRunning flag
>   while (isRunning && !format.reachedEnd()) {
>   nextElement = 
> format.nextRecord(nextElement);
>   if (nextElement != null) {
>   ctx.collect(nextElement);
>   } else {
>   break;
>   }
>   }
>   format.close();
>   completedSplitsCounter.inc();
>   if (isRunning) {
>   isRunning = splitIterator.hasNext();
>   }
>   }
> {code}
> the format may return other element or null when nextRecord, that will may 
> cause exception.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131996#comment-16131996
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133920847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
--- End diff --

There is more than one place that needs to get the default QueryConfig from 
the table environment.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the TableAPI there's only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the TableAPI does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table, because 
> its 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend Flink TableAPI to support such feature.  see design doc: 
> https://goo.gl/n3phK5



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133920847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
--- End diff --

There is more than one place that needs to get the default QueryConfig from 
the table environment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133921697
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

SqlParser.parseStmt() actually calls SqlParser.parseQuery(), so they're 
the same and cannot help us distinguish the SQL statement type; hence SqlKind is 
used here. SqlKind.QUERY consists of: SELECT, EXCEPT, INTERSECT, UNION, VALUES, 
ORDER_BY, EXPLICIT_TABLE.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -405,6 +481,17 @@ class AggregationCodeGenerator(
   }
 }
 
+val aggFuncCode = Seq(
+  genSetAggregationResults,
+  genAccumulate,
+  genRetract,
+  genCreateAccumulators,
+  genSetForwardedFields,
+  genSetConstantFlags,
+  genCreateOutputRow,
+  genMergeAccumulatorsPair,
+  genResetAccumulator).mkString("\n")
--- End diff --

That makes sense. I have looked at ProcessFunctionWithCleanupState; the 
cleanUp method should be called whenever cleanupState is called.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132017#comment-16132017
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925176
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -179,13 +214,19 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = if (accConfig.isDefined && 
accConfig.get.isUseState) {
--- End diff --

Yes, that makes sense.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...

2017-08-18 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4454
  
@twalthr, kind reminder :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4468: [FLINK-7347] [streaming] Keep ids for current checkpoint ...

2017-08-18 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4468
  
Thanks a lot for contributing this fix, @ymost! 👍 

I merged it into master; could you please close this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7347.
---
Resolution: Fixed

Implemented in 76f1022884fe7b291fe81028a29896fb5b5ca5c9 on master.

> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
>Assignee: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7475) ListState support update

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7475:

Component/s: DataStream API

> ListState support update
> 
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Reporter: yf
>
> If I want to update the list, 
> I have to do two steps: 
> listState.clear() 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why not let me update the state with: 
> listState.update(myList)?
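
Until such a method exists, the two-step replacement can at least be wrapped in
a small helper (a sketch against the existing ListState API; atomicity is only
whatever the state backend already guarantees):

{code:java}
import org.apache.flink.api.common.state.ListState;

// Helper sketch emulating the proposed listState.update(myList).
public final class ListStates {

    private ListStates() {}

    public static <T> void update(ListState<T> state, Iterable<T> newElements) throws Exception {
        state.clear();
        for (T element : newElements) {
            state.add(element);
        }
    }
}
{code}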



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131995#comment-16131995
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133921697
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

SqlParser.parseStmt() actually calls SqlParser.parseQuery(), so they're 
the same and cannot help us distinguish the SQL statement type; hence SqlKind is 
used here. SqlKind.QUERY consists of: SELECT, EXCEPT, INTERSECT, UNION, VALUES, 
ORDER_BY, EXPLICIT_TABLE.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the TableAPI there's only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the TableAPI does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table, because 
> its 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> 

[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...

2017-08-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
I think what you mentioned is one more reason to not use this in too many 
places for now, but only inside the TaskManager / Tasks. Let's introduce that 
as a tool that users can use to resolve conflicts and gather some feedback 
before we pull that into client / queryableStateClient / etc...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131997#comment-16131997
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
I think what you mentioned is one more reason to not use this in too many 
places for now, but only inside the TaskManager / Tasks. Let's introduce that 
as a tool that users can use to resolve conflicts and gather some feedback 
before we pull that into client / queryableStateClient / etc...


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7478) Update documentation for sql insert and api change in TableAPI & SQL

2017-08-18 Thread lincoln.lee (JIRA)
lincoln.lee created FLINK-7478:
--

 Summary: Update documentation for sql insert and api change in 
TableAPI & SQL
 Key: FLINK-7478
 URL: https://issues.apache.org/jira/browse/FLINK-7478
 Project: Flink
  Issue Type: New Feature
Reporter: lincoln.lee
Assignee: lincoln.lee
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925176
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -179,13 +214,19 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = if (accConfig.isDefined && 
accConfig.get.isUseState) {
--- End diff --

Yes, that makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132016#comment-16132016
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -405,6 +481,17 @@ class AggregationCodeGenerator(
   }
 }
 
+val aggFuncCode = Seq(
+  genSetAggregationResults,
+  genAccumulate,
+  genRetract,
+  genCreateAccumulators,
+  genSetForwardedFields,
+  genSetConstantFlags,
+  genCreateOutputRow,
+  genMergeAccumulatorsPair,
+  genResetAccumulator).mkString("\n")
--- End diff --

That makes sense. I have looked at ProcessFunctionWithCleanupState; the 
cleanUp method should be called whenever cleanupState is called.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

2017-08-18 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4358
  
Rebased onto `master`, but had to drag in #4402 early to fix the end-to-end 
tests failing due to spurious warnings. The test failure you observed was 
actually a test instability introduced with #4238 for which I added a hotfix to 
this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
1. Use HeapMapView/HeapListView as the default implementation
2. Add initialize and cleanUp interfaces to GeneratedAggregations


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7479) Support to retrieve the past event by an offset

2017-08-18 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133007#comment-16133007
 ] 

Dian Fu commented on FLINK-7479:


This is to support the {{PREV}} clause in {{MATCH_RECOGNIZE}}. As described in 
the [doc|https://docs.oracle.com/database/121/DWHSG/pattern.htm#DWHSG8996], 
{{PREV}} defines an expression using a previous row by physical offset. This 
means that in the {{filter}} method of {{IterativeCondition}}, we may also need 
to access past events by physical offset, and the accessed events may not be 
matched by any pattern.
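
As a rough, self-contained sketch of the proposed access pattern (the 
offset-based context interface below is hypothetical; the existing 
{{IterativeCondition.Context}} does not offer it yet):

{code:java}
// Hypothetical sketch only: offset-based access is what this issue proposes,
// it is not part of the existing IterativeCondition.Context API.
interface OffsetContext<T> {
	/** The event {@code offset} physical positions before the current one, or null. */
	T getEventByOffset(int offset);
}

class PrevPriceCondition {
	/** Mimics PREV(price, 1): accept only if the price exceeds the previous row's. */
	boolean filter(double currentPrice, OffsetContext<Double> ctx) {
		Double prev = ctx.getEventByOffset(1);
		return prev == null || currentPrice > prev;
	}
}
{code}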

> Support to retrieve the past event by an offset 
> 
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it's already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-18 Thread Dian Fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-7479:
---
Summary: Support to retrieve the past event by physical offset   (was: 
Support to retrieve the past event by an offset )

> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it's already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG

2017-08-18 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133031#comment-16133031
 ] 

Jark Wu commented on FLINK-7471:


Hi [~fhueske], I think [~sunjincheng121] means that retraction for over 
aggregates is an optimization in most cases, but not all. For example, min/max 
with retraction needs to store all records of the group in the min/max's 
accumulator. It's hard to say that the overhead of retraction mode (storing all 
records) is certainly less than that of non-retraction mode (recomputing over 
all records of a group). That's why the over window can support non-retract 
AGGs. The over window mode (retract/non-retract) depends on whether all the 
aggregates implement the retract methods, so the retract method is no longer 
mandatory for over windows.
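
To illustrate why retraction is costly for min/max, here is a minimal 
standalone sketch (not Flink's actual accumulator) of a retractable MIN that 
has to keep every record in a multiset so the next minimum can be found after 
a retraction:

{code:java}
import java.util.TreeMap;

// A TreeMap<value, count> acts as a multiset of all accumulated records.
class RetractableMinAccumulator {
	private final TreeMap<Long, Integer> values = new TreeMap<>();

	void accumulate(long v) {
		values.merge(v, 1, Integer::sum);
	}

	void retract(long v) {
		// Drop one occurrence; remove the entry entirely when the count hits zero.
		values.computeIfPresent(v, (k, c) -> c == 1 ? null : c - 1);
	}

	Long getMin() {
		// Without the full multiset, the new minimum after retracting the
		// old one could not be recovered.
		return values.isEmpty() ? null : values.firstKey();
	}
}
{code}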

> Improve bounded OVER support non-retract method AGG
> ---
>
> Key: FLINK-7471
> URL: https://issues.apache.org/jira/browse/FLINK-7471
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently BOUNDED OVER WINDOW only support have {{retract}} method AGG. In 
> this JIRA. will add non-retract method support.
> What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133044#comment-16133044
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4566

[FLINK-7477] [FLINK-7480] Various improvements to Flink scripts



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hadoop-env-improvements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4566


commit 6b4d7e5e09dcd913fbb9c84c59fc8a10e6c662cc
Author: Aljoscha Krettek 
Date:   2017-08-18T14:39:41Z

[FLINK-7477] Use "hadoop classpath" to augment classpath when available

This improves the out-of-box experience on GCE and AWS, both of which
don't set a HADOOP_CLASSPATH but have "hadoop" available on the $PATH.

commit f63e2d03d739014f0cd94634d731e552a02c76d9
Author: Aljoscha Krettek 
Date:   2017-08-18T14:40:55Z

[FLINK-7480] Set HADOOP_CONF_DIR to sane default if not set

This improves the out-of-box experience on GCE and AWS, both of which
don't set HADOOP_CONF_DIR by default but use /etc/hadoop/conf




> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...

2017-08-18 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4566

[FLINK-7477] [FLINK-7480] Various improvements to Flink scripts



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hadoop-env-improvements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4566


commit 6b4d7e5e09dcd913fbb9c84c59fc8a10e6c662cc
Author: Aljoscha Krettek 
Date:   2017-08-18T14:39:41Z

[FLINK-7477] Use "hadoop classpath" to augment classpath when available

This improves the out-of-box experience on GCE and AWS, both of which
don't set a HADOOP_CLASSPATH but have "hadoop" available on the $PATH.

commit f63e2d03d739014f0cd94634d731e552a02c76d9
Author: Aljoscha Krettek 
Date:   2017-08-18T14:40:55Z

[FLINK-7480] Set HADOOP_CONF_DIR to sane default if not set

This improves the out-of-box experience on GCE and AWS, both of which
don't set HADOOP_CONF_DIR by default but use /etc/hadoop/conf




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133056#comment-16133056
 ] 

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133974899
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+   // 

+   //  Command-line options
+   // 

+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   
.addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   private MesosConfiguration schedulerConfiguration;
+
+   private MesosServices mesosServices;
+
+   private MesosTaskManagerParameters taskManagerParameters;
+
+   private ContainerSpecification taskManagerContainerSpec;
+
+   public MesosJobClusterEntrypoint(Configuration config) {
+   super(config);
+   }
+
+   @Override
+   protected void initializeServices(Configuration config) throws 
Exception {
+   super.initializeServices(config);
+
+   final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
+
+   // Mesos configuration
+   schedulerConfiguration = 

[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133974899
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+   // 

+   //  Command-line options
+   // 

+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   
.addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   private MesosConfiguration schedulerConfiguration;
+
+   private MesosServices mesosServices;
+
+   private MesosTaskManagerParameters taskManagerParameters;
+
+   private ContainerSpecification taskManagerContainerSpec;
+
+   public MesosJobClusterEntrypoint(Configuration config) {
+   super(config);
+   }
+
+   @Override
+   protected void initializeServices(Configuration config) throws 
Exception {
+   super.initializeServices(config);
+
+   final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
+
+   // Mesos configuration
+   schedulerConfiguration = 
MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+   // services
+   mesosServices = MesosServicesUtils.createMesosServices(config, 
hostname);
+
+   // TM configuration
+   

[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133058#comment-16133058
 ] 

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975100
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+   // 

+   //  Command-line options
+   // 

+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   
.addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   private MesosConfiguration schedulerConfiguration;
+
+   private MesosServices mesosServices;
+
+   private MesosTaskManagerParameters taskManagerParameters;
+
+   private ContainerSpecification taskManagerContainerSpec;
+
+   public MesosJobClusterEntrypoint(Configuration config) {
+   super(config);
+   }
+
+   @Override
+   protected void initializeServices(Configuration config) throws 
Exception {
+   super.initializeServices(config);
+
+   final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
+
+   // Mesos configuration
+   schedulerConfiguration = 

[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133057#comment-16133057
 ] 

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975661
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskExecutorRunner {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskExecutorRunner.class);
+
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   
.addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   /** The process environment variables. */
+   private static final Map ENV = System.getenv();
+
+   public static void main(String[] args) throws Exception {
+   EnvironmentInformation.logEnvironmentInfo(LOG, 
MesosTaskExecutorRunner.class.getSimpleName(), args);
+   SignalHandler.register(LOG);
+   JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+   // try to parse the command line arguments
+   CommandLineParser parser = new PosixParser();
+   CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
+   final Configuration configuration;
+   try {
+   final Configuration dynamicProperties = 
BootstrapTools.parseDynamicProperties(cmd);
+   
GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+
+   configuration = GlobalConfiguration.loadConfiguration();
--- End diff --

Here I think we could use `MesosEntrypointUtils#loadConfiguration(cmd)`.


> Implement FLIP-6 MesosAppMasterRunner
> -
>
> Key: FLINK-6630
> URL: https://issues.apache.org/jira/browse/FLINK-6630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> A new runner must be developed for the FLIP-6 RM.  Target the "single job" 
> scenario.
> Take some time to consider a general solution or a base implementation that 
> is shared with the old 

[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133081#comment-16133081
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r133980441
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -220,6 +220,10 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
+   if (null == buffer) {
+   throw new NullPointerException();
--- End diff --

Add exception message to provide more information.
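
For example (the message text below is just an illustrative suggestion):

```java
if (buffer == null) {
	throw new NullPointerException("Buffer to be written must not be null.");
}
```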


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-08-18 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r133980441
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -220,6 +220,10 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
+   if (null == buffer) {
+   throw new NullPointerException();
--- End diff --

Add exception message to provide more information.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r133980216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -960,12 +965,26 @@ public void handleError(final Exception exception) {
 
@Override
public void releaseResource(InstanceID instanceId) {
-   stopWorker(instanceId);
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   for (Map.Entry entry : taskExecutors.entrySet()) {
+   if 
(entry.getValue().getInstanceID().equals(instanceId)) {
+   
stopWorker(entry.getKey());
--- End diff --

In the future we should make these IDs composed of each other. Then we 
could easily obtain the `ResourceID` from the `InstanceID`.
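
A hypothetical sketch of such a composite ID (not the current implementation; 
only `ResourceID` is an existing runtime type here):

```java
import org.apache.flink.runtime.clusterframework.types.ResourceID;

import java.io.Serializable;
import java.util.UUID;

// Hypothetical: an InstanceID that embeds the worker's ResourceID, so the
// slot manager's InstanceID-based logs can be correlated with the resource
// manager's ResourceID-based state without a reverse lookup.
public class CompositeInstanceID implements Serializable {

	private final ResourceID resourceId;  // identifies the worker/container
	private final UUID registrationId;    // distinguishes re-registrations of the same worker

	public CompositeInstanceID(ResourceID resourceId) {
		this.resourceId = resourceId;
		this.registrationId = UUID.randomUUID();
	}

	public ResourceID getResourceID() {
		return resourceId;
	}
}
```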


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133027#comment-16133027
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comment, @aljoscha. IMO, making the `timeServiceManager` 
protected would indeed minimise the impact on `AbstractStreamOperator`, but it 
may introduce duplicated code in the subclasses. We make some trade-offs 
here.


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding new generated results are 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-18 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comment, @aljoscha. IMO, making the `timeServiceManager` 
protected would indeed minimise the impact on `AbstractStreamOperator`, but it 
may introduce duplicated code in the subclasses. We make some trade-offs 
here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7480) Set HADOOP_CONF_DIR to sane default if not set

2017-08-18 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7480:
---

 Summary: Set HADOOP_CONF_DIR to sane default if not set
 Key: FLINK-7480
 URL: https://issues.apache.org/jira/browse/FLINK-7480
 Project: Flink
  Issue Type: Improvement
  Components: Startup Shell Scripts
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.4.0


Currently, both AWS and GCE don't have a {{HADOOP_CONF_DIR}} set by default. 
This makes the out-of-box experience on these cloud environments bad because 
not setting it results in errors whose cause is not obvious.

In case {{HADOOP_CONF_DIR}} is not set we should check whether {{/etc/hadoop/conf}} 
exists and, if so, set {{HADOOP_CONF_DIR}} to that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7477:

Fix Version/s: 1.4.0

> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-18 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133050#comment-16133050
 ] 

Xingcan Cui commented on FLINK-7446:


Thanks for the response, [~fhueske]. From my perspective, I prefer the latter 
option (create new interfaces/operators...). Maybe we can provide one or more 
default watermark generators. Users could directly configure them by providing 
some parameters (e.g., the watermark interval and the expected delay relative 
to the latest rowtime). Moreover, if the provided watermark generators cannot 
meet the requirements, users can implement their own. What do you think, [~jark]?
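
As a standalone sketch of the kind of parameterized default generator 
described here (the class and its methods are hypothetical, not an existing 
Flink API):

{code:java}
// Hypothetical bounded-delay generator: emits watermarks that trail the
// maximum observed rowtime by a configured expected delay.
class BoundedDelayWatermarkGenerator {

	private final long expectedDelayMs;
	private long maxRowtime = Long.MIN_VALUE;

	BoundedDelayWatermarkGenerator(long expectedDelayMs) {
		this.expectedDelayMs = expectedDelayMs;
	}

	void onEvent(long rowtime) {
		maxRowtime = Math.max(maxRowtime, rowtime);
	}

	/** Called once per configured watermark interval. */
	long currentWatermark() {
		return maxRowtime == Long.MIN_VALUE ? Long.MIN_VALUE : maxRowtime - expectedDelayMs;
	}
}
{code}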

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could also support 
> defining an existing field as the rowtime field. Just like registering a 
> DataStream, the rowtime field can be appended but can also replace an 
> existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133055#comment-16133055
 ] 

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133974223
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import 
org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utils for Mesos entrypoints.
+ */
+public class MesosEntrypointUtils {
+
+   @Deprecated
+   public static Configuration loadConfiguration(CommandLine cmd) {
+
+   // merge the dynamic properties from the command-line
+   Configuration dynamicProperties = 
BootstrapTools.parseDynamicProperties(cmd);
+   GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   Configuration config = GlobalConfiguration.loadConfiguration();
+
+   return config;
+   }
+
+   /**
+* Loads and validates the Mesos scheduler configuration.
+* @param flinkConfig the global configuration.
+* @param hostname the hostname to advertise to the Mesos master.
+*/
+   public static MesosConfiguration 
createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {
+
+   Protos.FrameworkInfo.Builder frameworkInfo = 
Protos.FrameworkInfo.newBuilder()
+   .setHostname(hostname);
+   Protos.Credential.Builder credential = null;
+
+   if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+   throw new 
IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be 
configured.");
+   }
+   String masterUrl = 
flinkConfig.getString(MesosOptions.MASTER_URL);
+
+   Duration failoverTimeout = FiniteDuration.apply(
+   flinkConfig.getInteger(
+   MesosOptions.FAILOVER_TIMEOUT_SECONDS),
+   TimeUnit.SECONDS);
+   frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
+
+   frameworkInfo.setName(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
+
+   frameworkInfo.setRole(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
+
+   frameworkInfo.setUser(flinkConfig.getString(

[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133974223
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import 
org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import 
org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utils for Mesos entrypoints.
+ */
+public class MesosEntrypointUtils {
+
+   @Deprecated
+   public static Configuration loadConfiguration(CommandLine cmd) {
+
+   // merge the dynamic properties from the command-line
+   Configuration dynamicProperties = 
BootstrapTools.parseDynamicProperties(cmd);
+   GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   Configuration config = GlobalConfiguration.loadConfiguration();
+
+   return config;
+   }
+
+   /**
+* Loads and validates the Mesos scheduler configuration.
+* @param flinkConfig the global configuration.
+* @param hostname the hostname to advertise to the Mesos master.
+*/
+   public static MesosConfiguration 
createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {
+
+   Protos.FrameworkInfo.Builder frameworkInfo = 
Protos.FrameworkInfo.newBuilder()
+   .setHostname(hostname);
+   Protos.Credential.Builder credential = null;
+
+   if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+   throw new 
IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be 
configured.");
+   }
+   String masterUrl = 
flinkConfig.getString(MesosOptions.MASTER_URL);
+
+   Duration failoverTimeout = FiniteDuration.apply(
+   flinkConfig.getInteger(
+   MesosOptions.FAILOVER_TIMEOUT_SECONDS),
+   TimeUnit.SECONDS);
+   frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
+
+   frameworkInfo.setName(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
+
+   frameworkInfo.setRole(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
+
+   frameworkInfo.setUser(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
+
+   if 
(flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+   

[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975661
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskExecutorRunner {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskExecutorRunner.class);
+
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   
.addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   /** The process environment variables. */
+   private static final Map ENV = System.getenv();
+
+   public static void main(String[] args) throws Exception {
+   EnvironmentInformation.logEnvironmentInfo(LOG, 
MesosTaskExecutorRunner.class.getSimpleName(), args);
+   SignalHandler.register(LOG);
+   JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+   // try to parse the command line arguments
+   CommandLineParser parser = new PosixParser();
+   CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
+   final Configuration configuration;
+   try {
+   final Configuration dynamicProperties = 
BootstrapTools.parseDynamicProperties(cmd);
+   
GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+
+   configuration = GlobalConfiguration.loadConfiguration();
--- End diff --

Here I think we could use `MesosEntrypointUtils#loadConfiguration(cmd)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975100
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+   // 

+   //  Command-line options
+   // 

+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   
.addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   private MesosConfiguration schedulerConfiguration;
+
+   private MesosServices mesosServices;
+
+   private MesosTaskManagerParameters taskManagerParameters;
+
+   private ContainerSpecification taskManagerContainerSpec;
+
+   public MesosJobClusterEntrypoint(Configuration config) {
+   super(config);
+   }
+
+   @Override
+   protected void initializeServices(Configuration config) throws 
Exception {
+   super.initializeServices(config);
+
+   final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
+
+   // Mesos configuration
+   schedulerConfiguration = 
MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+   // services
+   mesosServices = MesosServicesUtils.createMesosServices(config, 
hostname);
+
+   // TM configuration
+   

[jira] [Commented] (FLINK-7300) End-to-end tests are instable on Travis

2017-08-18 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133053#comment-16133053
 ] 

Aljoscha Krettek commented on FLINK-7300:
-

The reason this time is an {{AskTimeoutException}}. The problem is that even a 
normal run of Flink can have exceptions and errors in the log. In our release 
testing we have this section about "running a cluster and verifying that the 
log and output are clear of exceptions and errors". I think in the real world 
the log is never clear of exceptions and errors, even in the case where 
everything went well.

[~till.rohrmann] Do you think we should maybe just not test for the log being 
clean? I could also add {{AskTimeoutException}} to the list of exceptions that 
we expect to occur. I'm guessing this just sometimes occurs with Akka?

> End-to-end tests are instable on Travis
> ---
>
> Key: FLINK-7300
> URL: https://issues.apache.org/jira/browse/FLINK-7300
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> It seems like the end-to-end tests are unstable, causing the {{misc}} build 
> profile to sporadically fail.
> Incorrect matched output:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8
> Another failure example of a different cause then the above, also on the 
> end-to-end tests:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r133980386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
 ---
@@ -68,7 +68,9 @@ public boolean isIdle() {
}
 
public void markIdle() {
-   idleSince = System.currentTimeMillis();
+   if (!isIdle()) {
+   idleSince = System.currentTimeMillis();
+   }
--- End diff --

good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4564: [FLINK-7442] Add option for using child-first clas...

2017-08-18 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4564

[FLINK-7442] Add option for using child-first classloader for loading user 
code

This is an alternative to #4554 that does not make the client class loader 
configurable.

## What is the purpose of the change

This PR introduces a new core option (`classloader.resolve-order: 
child-first`) that allows using a child-first class loader for user code. The 
default is still to use a parent-first class loader.

This also does a minor refactoring in the way the blob manager retrieves 
the cleanup interval. It's now also read from the `Configuration`, since we 
already have the `Configuration` for the class loader settings.
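
For intuition, a minimal sketch of the child-first resolution order this 
option enables (not Flink's actual implementation; a production version would 
still have to delegate `java.*` and similar always-parent-first packages to 
the parent):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Child-first: try the user-code jars before falling back to the parent.
public class ChildFirstClassLoader extends URLClassLoader {

	public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
		super(urls, parent);
	}

	@Override
	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
		synchronized (getClassLoadingLock(name)) {
			Class<?> c = findLoadedClass(name);
			if (c == null) {
				try {
					c = findClass(name);              // child (user code) first
				} catch (ClassNotFoundException e) {
					c = super.loadClass(name, false); // parent as fallback
				}
			}
			if (resolve) {
				resolveClass(c);
			}
			return c;
		}
	}
}
```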

## Brief change log

 - Introduce new option
 - Pass `Configuration` through to all places where we previously created a 
user class loader
 - Instantiate correct class loader based on config

## Verifying this change

This PR introduces new end-to-end tests that verify the new feature in a 
complete Flink workflow, including starting the program using `bin/flink run`.

## Does this pull request potentially affect one of the following parts:


This affects the class loader, which is quite important to get right.

## Documentation

 - the new flag is documented in the config documentation


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-7441-child-first-classloader-alternative

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4564


commit 33bd4cf5160ec64cbaace876b305922b804cc3a1
Author: Aljoscha Krettek 
Date:   2017-08-14T12:53:14Z

[FLINK-7442] Add option for using a child-first classloader for loading 
user code




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132168#comment-16132168
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4564

[FLINK-7442] Add option for using child-first classloader for loading user 
code

This is an alternative to #4554 that does not make the client class loader 
configurable.

## What is the purpose of the change

This PR introduces a new core option (`classloader.resolve-order: 
child-first`) that allows using a child-first class loader for user code. The 
default is still to use a parent-first class loader.

This also does a minor refactoring in the way the blob manager retrieves 
the cleanup interval. It's now also read from the `Configuration`, since we 
already have the `Configuration` for the class loader settings.

## Brief change log

 - Introduce new option
 - Pass `Configuration` through to all places where we previously created a 
user class loader
 - Instantiate correct class loader based on config

## Verifying this change

This PR introduces new end-to-end tests that verify the new feature in a 
complete Flink workflow, including starting the program using `bin/flink run`.

## Does this pull request potentially affect one of the following parts:


This affects the class loader, which is quite important to get right.

## Documentation

 - the new flag is documented in the config documentation


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-7441-child-first-classloader-alternative

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4564


commit 33bd4cf5160ec64cbaace876b305922b804cc3a1
Author: Aljoscha Krettek 
Date:   2017-08-14T12:53:14Z

[FLINK-7442] Add option for using a child-first classloader for loading 
user code




> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

