[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-08-04 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115259#comment-16115259
 ] 

mingleizhang commented on FLINK-4534:
-

[~tedyu] Thanks for reporting this! One thing I need to confirm: in 
restoreState(), I cannot find the code you mention, {{for (BucketState<T> 
bucketState : state.bucketStates.values()) {}}.

All I can find is the following, and I don't think it needs synchronization, 
as {{restoredState}} is just a local variable. What do you think?
{code:java}

private void handleRestoredBucketState(State<T> restoredState) {
	Preconditions.checkNotNull(restoredState);

	for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
{code}

Apart from that, {{state.bucketStates}} in the {{close}} method does need 
synchronization, and I have added it.
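For reference, a minimal sketch of the guarded iteration in {{close()}}, assuming 
the same monitor ({{state.bucketStates}}) that the other methods synchronize on; 
this illustrates the fix rather than quoting the exact patch:
{code:java}
@Override
public void close() throws Exception {
	// iterate under the same lock the snapshot/notify methods use, so a
	// concurrent checkpoint cannot mutate the map mid-iteration
	synchronized (state.bucketStates) {
		for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
			closeCurrentPartFile(entry.getValue());
		}
	}
}
{code}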



> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-08-04 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-4534:
---

Assignee: mingleizhang

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-04 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115247#comment-16115247
 ] 

Xingcan Cui commented on FLINK-7337:


Thanks for the explanation, [~fhueske]. Everything's clear to me now.

> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row.
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead).
> * Remove the materialization of event-time timestamps because the timestamp is 
> already accessible in Row.
> * Add a {{ProcessFunction}} that sets the timestamp into the timestamp field of 
> a {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6429) Bump up Calcite version to 1.13

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115222#comment-16115222
 ] 

ASF GitHub Bot commented on FLINK-6429:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r131512787
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
 ---
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
--- End diff --

Yes, you are right. We should make sure it is a Calcite bug and that it will 
be fixed in the next release. 


> Bump up Calcite version to 1.13
> ---
>
> Key: FLINK-6429
> URL: https://issues.apache.org/jira/browse/FLINK-6429
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.13 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....

2017-08-04 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r131512787
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
 ---
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
--- End diff --

Yes, you are right. We should make sure it is a Calcite bug and that it will 
be fixed in the next release. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115200#comment-16115200
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4404#discussion_r131511583
  
--- Diff: docs/dev/table/sql.md ---
@@ -497,6 +497,23 @@ FROM (
 {% endhighlight %}
   
 
+
+
+  
+In
+Batch
+  
+  
+  Returns true if an expression exists in a given table sub-query. The 
sub-query table must consist of one column. This column must have the same data 
type as the expression.
--- End diff --

Should the syntax be enhanced so that the user can specify one column of a 
table that has multiple columns?


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.4.0
>
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

2017-08-04 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4404#discussion_r131511583
  
--- Diff: docs/dev/table/sql.md ---
@@ -497,6 +497,23 @@ FROM (
 {% endhighlight %}
   
 
+
+
+  
+In
+Batch
+  
+  
+  Returns true if an expression exists in a given table sub-query. The 
sub-query table must consist of one column. This column must have the same data 
type as the expression.
--- End diff --

Should the syntax be enhanced so that the user can specify one column of a 
table that has multiple columns?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4499) Introduce findbugs maven plugin

2017-08-04 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4499:
--
Description: 
As suggested by Stephan in FLINK-4482, this issue is to add 
findbugs-maven-plugin into the build process so that we can detect lack of 
proper locking and other defects automatically.


We can begin with a small set of rules.

  was:
As suggested by Stephan in FLINK-4482, this issue is to add 
findbugs-maven-plugin into the build process so that we can detect lack of 
proper locking and other defects automatically.

We can begin with a small set of rules.


> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Suneel Marthi
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
> We can begin with a small set of rules.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-08-04 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Description: 
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():
{code}
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting 
at line 752:
{code}
  Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}

  was:
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():

{code}
for (BucketState<T> bucketState : state.bucketStates.values()) {
{code}
and the following in close():
{code}
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting 
at line 752:
{code}
  Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}


> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-6333) Utilize Bloomfilters in RocksDb

2017-08-04 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095490#comment-16095490
 ] 

Ted Yu edited comment on FLINK-6333 at 8/5/17 1:32 AM:
---

Looks like rocksdb issue 1964 can be closed.


was (Author: yuzhih...@gmail.com):
Looks like rocksdb issue 1964 can be closed .

> Utilize Bloomfilters in RocksDb
> ---
>
> Key: FLINK-6333
> URL: https://issues.apache.org/jira/browse/FLINK-6333
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>
> Bloom Filters would speed up RocksDb lookups.
> When we upgrade to RocksDb 5.2.1+, we would be able to do:
> {code}
>   new BlockBasedTableConfig()
>   .setBlockCacheSize(blockCacheSize)
>   .setBlockSize(blockSize)
>   .setFilter(new BloomFilter())
> {code}
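For illustration, a hedged sketch of wiring such a table config into Flink's 
RocksDB backend through an {{OptionsFactory}} (the checkpoint URI is a 
placeholder; the classes come from {{org.rocksdb}} and the 
flink-statebackend-rocksdb module):
{code:java}
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
backend.setOptions(new OptionsFactory() {
	@Override
	public DBOptions createDBOptions(DBOptions currentOptions) {
		return currentOptions; // DB-level options unchanged
	}

	@Override
	public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
		// attach a Bloom filter so point lookups can skip most SST files
		return currentOptions.setTableFormatConfig(
			new BlockBasedTableConfig().setFilter(new BloomFilter()));
	}
});
{code}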



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-6359) Utilize Hierarchical Timing Wheels for performant timer

2017-08-04 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15991975#comment-15991975
 ] 

Ted Yu edited comment on FLINK-6359 at 8/5/17 1:31 AM:
---

Interesting observations, Ben .


was (Author: yuzhih...@gmail.com):
Interesting observations, Ben.

> Utilize Hierarchical Timing Wheels for performant timer
> ---
>
> Key: FLINK-6359
> URL: https://issues.apache.org/jira/browse/FLINK-6359
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Local Runtime
>Reporter: Ted Yu
>
> In this thread on mailing list:
> http://search-hadoop.com/m/Flink/VkLeQPmRa31hd5cw
> Gyula Fóra mentioned that timer deletion becomes a performance bottleneck due 
> to the use of a priority queue.
> Benjamin has an implementation of Hierarchical Timing Wheels (Apache 
> License):
> https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java
> {code}
>  * A hierarchical timer wheel to add, remove, and fire expiration events in 
> amortized O(1) time. The
>  * expiration events are deferred until the timer is advanced, which is 
> performed as part of the
>  * cache's maintenance cycle.
> {code}
> We should consider porting the above over to provide a performant timer.
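For intuition, a toy single-level wheel (a sketch, not a port of the Caffeine 
class): adding a timer hashes it to a bucket in O(1), and advancing the clock 
drains only the buckets it passed. The hierarchical levels in {{TimerWheel}} 
exist so that far-future timers do not land in, and fire early from, a near 
bucket; this toy ignores that case.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

final class SimpleTimerWheel {
	private static final int BUCKETS = 64;       // power of two
	private static final long TICK_MILLIS = 64;  // time span of one bucket

	@SuppressWarnings("unchecked")
	private final Deque<Runnable>[] wheel = (Deque<Runnable>[]) new Deque[BUCKETS];
	private long currentTime;

	SimpleTimerWheel(long startTimeMillis) {
		currentTime = startTimeMillis;
		for (int i = 0; i < BUCKETS; i++) {
			wheel[i] = new ArrayDeque<>();
		}
	}

	/** O(1) add: hash the expiration time onto a bucket. */
	void schedule(long expirationMillis, Runnable task) {
		wheel[(int) ((expirationMillis / TICK_MILLIS) & (BUCKETS - 1))].add(task);
	}

	/** Amortized O(1) per timer: fire only the buckets the clock passed. */
	void advance(long newTimeMillis) {
		long from = currentTime / TICK_MILLIS;
		long to = Math.min(newTimeMillis / TICK_MILLIS, from + BUCKETS);
		for (long tick = from; tick < to; tick++) {
			Deque<Runnable> bucket = wheel[(int) (tick & (BUCKETS - 1))];
			for (Runnable task; (task = bucket.poll()) != null; ) {
				task.run();
			}
		}
		currentTime = newTimeMillis;
	}
}
{code}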



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-6402) Consider removing annotation for REAPER_THREAD_LOCK in SafetyNetCloseableRegistry#doRegister()

2017-08-04 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15986965#comment-15986965
 ] 

Ted Yu edited comment on FLINK-6402 at 8/5/17 1:25 AM:
---

Pardon the typo.
The annotation can be dropped .


was (Author: yuzhih...@gmail.com):
Pardon the typo.
The annotation can be dropped.

> Consider removing annotation for REAPER_THREAD_LOCK in 
> SafetyNetCloseableRegistry#doRegister()
> --
>
> Key: FLINK-6402
> URL: https://issues.apache.org/jira/browse/FLINK-6402
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> PhantomDelegatingCloseableRef phantomRef = new 
> PhantomDelegatingCloseableRef(
> wrappingProxyCloseable,
> this,
> REAPER_THREAD.referenceQueue);
> {code}
> Instantiation of REAPER_THREAD can be made visible by ctor.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7372) Remove ActorGateway from JobGraph

2017-08-04 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7372:


 Summary: Remove ActorGateway from JobGraph
 Key: FLINK-7372
 URL: https://issues.apache.org/jira/browse/FLINK-7372
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


As a preliminary step for easier Flip-6 integration we should try to decouple 
as many components from the underlying RPC abstraction as possible. One of 
these components is the {{JobGraph}} which has a dependency on {{ActorGateway}} 
via its {{JobGraph#uploadUserJars}} method.

I propose to get rid of the {{ActorGateway}} parameter and to pass the 
BlobServer's address as an {{InetSocketAddress}} instance instead.
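A hedged before/after sketch of the affected signature (only the types come 
from the proposal above; the bodies and throws clauses are assumptions):
{code:java}
// before: JobGraph depends on the RPC abstraction
public void uploadUserJars(ActorGateway jobManagerGateway) throws IOException { /* ... */ }

// after: only the BlobServer's address is needed
public void uploadUserJars(InetSocketAddress blobServerAddress) throws IOException { /* ... */ }
{code}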



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-04 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114952#comment-16114952
 ] 

Fabian Hueske commented on FLINK-7245:
--

Thanks a lot for looking into this [~xccui]!

A few comments:
- Even though we are talking about "holding back watermarks", we don't need to 
cache them. It is not necessary to emit exactly the same watermarks as we 
receive. What I meant by "holding back watermarks" is that we cannot 
immediately forward watermarks (as done in 
{{AbstractStreamOperator.processWatermark()}}) but must instead emit a smaller 
watermark (see the sketch below). The emitted watermark must be smaller than 
the lowest timestamp that will be emitted in the future.
- An operator must track the smallest timestamps that will be emitted in the 
future. The future timestamps are those of records which are held in state. 
In addition there is a bound which depends on the operator (think of it as a 
window bound), the current watermark, and possibly an additional bound to 
tolerate late data. Imagine you have a 1-hour tumbling window and you want to 
emit records with the timestamp of the first record in that window. In such a 
case, the bound would be the current watermark minus 1 hour (minus another 30 
minutes if you accept data that is up to 30 minutes late).
- Because the semantics of the operator are not known to the 
{{AbstractStreamOperator}} and will depend on user-defined code, we need an 
interface to report future timestamps and emitted timestamps to the operator 
that emits watermarks. All those timestamps must be checkpointed in their 
corresponding key group.
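A minimal sketch of the first point, assuming a hypothetical 
{{lowestFutureTimestamp()}} reported through such an interface:
{code:java}
@Override
public void processWatermark(Watermark mark) throws Exception {
	if (timeServiceManager != null) {
		timeServiceManager.advanceWatermark(mark); // still trigger computations
	}
	// do not forward `mark` as-is: the emitted watermark must stay below
	// the lowest timestamp that may still be emitted for held-back records
	long safeToEmit = Math.min(mark.getTimestamp(), lowestFutureTimestamp() - 1);
	output.emitWatermark(new Watermark(safeToEmit));
}
{code}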

Best, Fabian

> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> are emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-04 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114920#comment-16114920
 ] 

Fabian Hueske commented on FLINK-7358:
--

Thanks [~sunjincheng121]. 
Sounds good to me. We should make sure that we have the same auto-cast 
semantics as SQL via Calcite.
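A hedged sketch of the proposed widening chain as a standalone check (not the 
actual Table API code; only the {{BasicTypeInfo}} constants are Flink's). With 
this rule, the Int argument for 'b in the example below widens to the Long 
parameter of {{eval}}:
{code:java}
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

final class ImplicitCasts {
	private static final List<TypeInformation<?>> WIDENING_ORDER =
		Arrays.<TypeInformation<?>>asList(
			BasicTypeInfo.BYTE_TYPE_INFO,
			BasicTypeInfo.SHORT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.LONG_TYPE_INFO,
			BasicTypeInfo.FLOAT_TYPE_INFO,
			BasicTypeInfo.DOUBLE_TYPE_INFO);

	/** True if an argument of type {@code from} may widen to parameter type {@code to}. */
	static boolean canImplicitlyCast(TypeInformation<?> from, TypeInformation<?> to) {
		int i = WIDENING_ORDER.indexOf(from);
		int j = WIDENING_ORDER.indexOf(to);
		return i >= 0 && j >= 0 && i <= j;
	}
}
{code}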

> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, if a user defines a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF `Func('a, 'b)`. So
> I want to add implicit conversion when we call a UDF. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> This JIRA covers only the Table API; SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-04 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114891#comment-16114891
 ] 

Fabian Hueske commented on FLINK-7337:
--

Hi [~xccui], no worries about your questions. They are all very valid.

1) The {{ProcessFunction}} copies the {{StreamRecord}} timestamp (this is the 
one set with the assigner) into the {{Row}} and removes the {{StreamRecord}} 
timestamp. So it's an exact copy. The watermarks are not affected by this. 
Watermarks are special records and not directly exposed to {{ProcessFunction}}. 
2) We use the watermarks to trigger computations when the result is assumed to 
be complete. They are a mechanism to control out-of-order data. So, yes. We 
could compute results without watermarks, but then we would have to send many 
updates because we do not know at which point in time we have received enough 
data for a good first result. This is the approach of Kafka Streams.
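A minimal sketch of 1), with a hypothetical index for the timestamp column 
(erasing the {{StreamRecord}} timestamp afterwards is omitted here):
{code:java}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class CopyTimestampToRow extends ProcessFunction<Row, Row> {
	private final int timestampField; // hypothetical position of the Long field

	public CopyTimestampToRow(int timestampField) {
		this.timestampField = timestampField;
	}

	@Override
	public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
		// ctx.timestamp() is the StreamRecord timestamp set by the assigner
		row.setField(timestampField, ctx.timestamp());
		out.collect(row);
	}
}
{code}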

> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row.
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead).
> * Remove the materialization of event-time timestamps because the timestamp is 
> already accessible in Row.
> * Add a {{ProcessFunction}} that sets the timestamp into the timestamp field of 
> a {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7365) warning of attempt to override final parameter: fs.s3.buffer.dir

2017-08-04 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114629#comment-16114629
 ] 

Bowen Li commented on FLINK-7365:
-

[~StephanEwen] Yes, you are right. It's logging too often and too much. Every 
time checkpointing runs, 100+ lines of this warning appear in the log.

> warning of attempt to override final parameter: fs.s3.buffer.dir
> 
>
> Key: FLINK-7365
> URL: https://issues.apache.org/jira/browse/FLINK-7365
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>
> I'm seeing hundreds of lines of the following log in my JobManager log file:
> {code:java}
> 2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,485 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,486 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,626 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring
> ..
> {code}
> Info of my Flink cluster:
> - Running on EMR with emr-5.6.0
> - Using FSStateBackend, writing checkpointing data files to s3
> - Configured s3 with S3AFileSystem according to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
> - AWS forbids resetting the 'fs.s3.buffer.dir' value (it has a <final> tag on 
> this property in core-site.xml), so I set 'fs.s3a.buffer.dir' to '/tmp'
> Here's my core-site.xml file:
> {code:java}
> <?xml version="1.0"?>
> <configuration>
> 
> <property>
>   <name>fs.s3.buffer.dir</name>
>   <value>/mnt/s3,/mnt1/s3</value>
>   <final>true</final>
> </property>
> 
> <property>
>   <name>fs.s3.impl</name>
>   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
> </property>
> 
> <property>
>   <name>fs.s3n.impl</name>
>   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> </property>
> 
> <property>
>   <name>ipc.client.connect.max.retries.on.timeouts</name>
>   <value>5</value>
> </property>
> 
> <property>
>   <name>hadoop.security.key.default.bitlength</name>
>   <value>256</value>
> </property>
> 
> <property>
>   <name>hadoop.proxyuser.hadoop.groups</name>
>   <value>*</value>
> </property>
> 
> <property>
>   <name>hadoop.tmp.dir</name>
>   <value>/mnt/var/lib/hadoop/tmp</value>
> </property>
> 
> <property>
>   <name>hadoop.proxyuser.hadoop.hosts</name>
>   <value>*</value>
> </property>
> 
> <property>
>   <name>io.file.buffer.size</name>
>   <value>65536</value>
> </property>
> 
> <property>
>   <name>fs.AbstractFileSystem.s3.impl</name>
>   <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
> </property>
> 
> <property>
>   <name>fs.s3a.buffer.dir</name>
>   <value>/tmp</value>
> </property>
> 
> <property>
>   <name>fs.s3bfs.impl</name>
>   <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
> </property>
> 
> </configuration>
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7365) excessive warning logs of attempt to override final parameter: fs.s3.buffer.dir

2017-08-04 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7365:

Summary: excessive warning logs of attempt to override final parameter: 
fs.s3.buffer.dir  (was: warning of attempt to override final parameter: 
fs.s3.buffer.dir)

> excessive warning logs of attempt to override final parameter: 
> fs.s3.buffer.dir
> ---
>
> Key: FLINK-7365
> URL: https://issues.apache.org/jira/browse/FLINK-7365
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>
> I'm seeing hundreds of lines of the following log in my JobManager log file:
> {code:java}
> 2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,485 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,486 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,626 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring
> ..
> {code}
> Info of my Flink cluster:
> - Running on EMR with emr-5.6.0
> - Using FSStateBackend, writing checkpointing data files to s3
> - Configured s3 with S3AFileSystem according to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
> - AWS forbids resetting the 'fs.s3.buffer.dir' value (it has a <final> tag on 
> this property in core-site.xml), so I set 'fs.s3a.buffer.dir' to '/tmp'
> Here's my core-site.xml file:
> {code:java}
> <?xml version="1.0"?>
> <configuration>
> 
> <property>
>   <name>fs.s3.buffer.dir</name>
>   <value>/mnt/s3,/mnt1/s3</value>
>   <final>true</final>
> </property>
> 
> <property>
>   <name>fs.s3.impl</name>
>   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
> </property>
> 
> <property>
>   <name>fs.s3n.impl</name>
>   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> </property>
> 
> <property>
>   <name>ipc.client.connect.max.retries.on.timeouts</name>
>   <value>5</value>
> </property>
> 
> <property>
>   <name>hadoop.security.key.default.bitlength</name>
>   <value>256</value>
> </property>
> 
> <property>
>   <name>hadoop.proxyuser.hadoop.groups</name>
>   <value>*</value>
> </property>
> 
> <property>
>   <name>hadoop.tmp.dir</name>
>   <value>/mnt/var/lib/hadoop/tmp</value>
> </property>
> 
> <property>
>   <name>hadoop.proxyuser.hadoop.hosts</name>
>   <value>*</value>
> </property>
> 
> <property>
>   <name>io.file.buffer.size</name>
>   <value>65536</value>
> </property>
> 
> <property>
>   <name>fs.AbstractFileSystem.s3.impl</name>
>   <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
> </property>
> 
> <property>
>   <name>fs.s3a.buffer.dir</name>
>   <value>/tmp</value>
> </property>
> 
> <property>
>   <name>fs.s3bfs.impl</name>
>   <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
> </property>
> 
> </configuration>
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7365) excessive warning logs of attempt to override final parameter: fs.s3.buffer.dir

2017-08-04 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7365:

Description: 
I'm seeing hundreds of lines of the following log in my JobManager log file:


{code:java}
2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration  
- /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override 
final parameter: fs.s3.buffer.dir;  Ignoring.
2017-08-03 19:48:45,485 WARN  org.apache.hadoop.conf.Configuration  
- /etc/hadoop/conf/core-site.xml:an attempt to override final 
parameter: fs.s3.buffer.dir;  Ignoring.
2017-08-03 19:48:45,486 WARN  org.apache.hadoop.conf.Configuration  
- /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override 
final parameter: fs.s3.buffer.dir;  Ignoring.
2017-08-03 19:48:45,626 WARN  org.apache.hadoop.conf.Configuration  
- /etc/hadoop/conf/core-site.xml:an attempt to override final 
parameter: fs.s3.buffer.dir;  Ignoring
..
{code}

Info of my Flink cluster:

- Running on EMR with emr-5.6.0
- Using FSStateBackend, writing checkpointing data files to s3
- Configured s3 with S3AFileSystem according to 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
- AWS forbids resetting the 'fs.s3.buffer.dir' value (it has a <final> tag on this 
property in core-site.xml), so I set 'fs.s3a.buffer.dir' to '/tmp'

Here's my core-site.xml file:


{code:java}
<?xml version="1.0"?>
<configuration>

<property>
  <name>fs.s3.buffer.dir</name>
  <value>/mnt/s3,/mnt1/s3</value>
  <final>true</final>
</property>

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>

<property>
  <name>ipc.client.connect.max.retries.on.timeouts</name>
  <value>5</value>
</property>

<property>
  <name>hadoop.security.key.default.bitlength</name>
  <value>256</value>
</property>

<property>
  <name>hadoop.proxyuser.hadoop.groups</name>
  <value>*</value>
</property>

<property>
  <name>hadoop.tmp.dir</name>
  <value>/mnt/var/lib/hadoop/tmp</value>
</property>

<property>
  <name>hadoop.proxyuser.hadoop.hosts</name>
  <value>*</value>
</property>

<property>
  <name>io.file.buffer.size</name>
  <value>65536</value>
</property>

<property>
  <name>fs.AbstractFileSystem.s3.impl</name>
  <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
</property>

<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

<property>
  <name>fs.s3bfs.impl</name>
  <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>

</configuration>
{code}

This bug is about excessive logging.


  was:
I'm seeing hundreds of lines of the following log in my JobManager log file:


{code:java}
2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration  
- /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override 
final parameter: fs.s3.buffer.dir;  Ignoring.
2017-08-03 19:48:45,485 WARN  org.apache.hadoop.conf.Configuration  
- /etc/hadoop/conf/core-site.xml:an attempt to override final 
parameter: fs.s3.buffer.dir;  Ignoring.
2017-08-03 19:48:45,486 WARN  org.apache.hadoop.conf.Configuration  
- /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override 
final parameter: fs.s3.buffer.dir;  Ignoring.
2017-08-03 19:48:45,626 WARN  org.apache.hadoop.conf.Configuration  
- /etc/hadoop/conf/core-site.xml:an attempt to override final 
parameter: fs.s3.buffer.dir;  Ignoring
..
{code}

Info of my Flink cluster:

- Running on EMR with emr-5.6.0
- Using FSStateBackend, writing checkpointing data files to s3
- Configured s3 with S3AFileSystem according to 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
- AWS forbids resetting the 'fs.s3.buffer.dir' value (it has a <final> tag on this 
property in core-site.xml), so I set 'fs.s3a.buffer.dir' to '/tmp'

Here's my core-site.xml file:


{code:java}
<?xml version="1.0"?>
<configuration>

<property>
  <name>fs.s3.buffer.dir</name>
  <value>/mnt/s3,/mnt1/s3</value>
  <final>true</final>
</property>

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>

<property>
  <name>ipc.client.connect.max.retries.on.timeouts</name>
  <value>5</value>
</property>

<property>
  <name>hadoop.security.key.default.bitlength</name>
  <value>256</value>
</property>

<property>
  <name>hadoop.proxyuser.hadoop.groups</name>
  <value>*</value>
</property>

<property>
  <name>hadoop.tmp.dir</name>
  <value>/mnt/var/lib/hadoop/tmp</value>
</property>

<property>
  <name>hadoop.proxyuser.hadoop.hosts</name>
  <value>*</value>
</property>

<property>
  <name>io.file.buffer.size</name>
  <value>65536</value>
</property>

<property>
  <name>fs.AbstractFileSystem.s3.impl</name>
  <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
</property>

<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

<property>
  <name>fs.s3bfs.impl</name>
  <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>

</configuration>
{code}





> excessive warning logs of attempt to override final parameter: 
> fs.s3.buffer.dir
> ---
>
> Key: FLINK-7365
> URL: https://issues.apache.org/jira/browse/FLINK-7365
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>
> I'm seeing hundreds of lines of the following log in my JobManager log file:
> {code:java}
> 2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration
>   - 

[GitHub] flink pull request #4234: [FLINK-7053][blob] improve code quality in some te...

2017-08-04 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131431976
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
+   }
 
-   if (BLOB_SERVER != null) {
-   BLOB_SERVER.close();
-   }
+   protected Configuration getBlobClientConfig() {
+   return sslClientConfig;
+   }
+
+   protected BlobServer getBlobServer() {
+   return BLOB_SSL_SERVER;
}
 
/**
-* Prepares a test file for the unit tests, i.e. the methods fills the 
file with a particular byte patterns and
-* computes the file's BLOB key.
-*
-* @param file
-*the file to prepare for the unit tests
-* @return the BLOB key of the prepared file
-* @throws IOException
-* thrown if an I/O error occurs while writing to the test file
+* Verify ssl client to ssl server upload
 */
-   private static BlobKey prepareTestFile(File file) throws IOException {
-
-   MessageDigest md = BlobUtils.createMessageDigest();
-
-   final byte[] buf = new byte[TEST_BUFFER_SIZE];
-   for (int i = 0; i < buf.length; ++i) {
-   buf[i] = (byte) (i % 128);
-   }
-
-   FileOutputStream fos = null;
-   try {
-   fos = new FileOutputStream(file);
-
-   for (int i = 0; i < 20; ++i) {
-   fos.write(buf);
-   md.update(buf);
-   }
-
-   } finally {
-   if (fos != null) {
-   fos.close();
-   }
-   }
-
-   return new BlobKey(md.digest());
+   @Test
+   public void testUploadJarFilesHelper() throws Exception {
+   uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
}
 
/**
-* Validates the result of a GET operation by comparing the data from 
the retrieved input stream to the content of
-* the specified file.
-*
-* @param inputStream
-*the input stream returned from the GET operation
-* @param file
-*the file to compare the input stream's data to
-* @throws IOException
-* thrown if an I/O error occurs while reading the input stream 
or the file
+* Verify ssl client to non-ssl server failure
 */
-   private static void validateGet(final InputStream inputStream, final 
File file) throws IOException {
-
-   InputStream inputStream2 = null;
-   try {
-
-   inputStream2 = new FileInputStream(file);
-
-   while (true) {
-
-   final int r1 = inputStream.read();
-   final int r2 = inputStream2.read();
-
-   assertEquals(r2, r1);
-
-   if (r1 < 0) {
-   break;
-   }
-   }
-
-   } finally {
-   if (inputStream2 != null) {
-   inputStream2.close();
-   }
-   }
-
+   @Test(expected = IOException.class)
+   public void testSSLClientFailure() throws Exception {
+   // SSL client connected to non-ssl server
+   uploadJarFile(BLOB_SERVER, sslClientConfig);
}
 
/**
-* Tests the PUT/GET operations for content-addressable streams.
+* Verify ssl client to non-ssl server failure
 */
-   @Test
-   public void testContentAddressableStream() {
-
-   BlobClient client = null;
-   InputStream is = null;
-
-   try {
-   File testFile = File.createTempFile("testfile", ".dat");
-   testFile.deleteOnExit();
-
-   BlobKey origKey = prepareTestFile(testFile);
-
-   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
-   client = new BlobClient(serverAddress, sslClientConfig);
-
-   // Store the data
-   is = new FileInputStream(testFile);
-   BlobKey receivedKey = client.put(is);
-   assertEquals(origKey, receivedKey);
-
-   is.close();
-   is = 

[jira] [Commented] (FLINK-7053) improve code quality in some tests

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114595#comment-16114595
 ] 

ASF GitHub Bot commented on FLINK-7053:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131431976
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
+   }
 
-   if (BLOB_SERVER != null) {
-   BLOB_SERVER.close();
-   }
+   protected Configuration getBlobClientConfig() {
+   return sslClientConfig;
+   }
+
+   protected BlobServer getBlobServer() {
+   return BLOB_SSL_SERVER;
}
 
/**
-* Prepares a test file for the unit tests, i.e. the methods fills the 
file with a particular byte patterns and
-* computes the file's BLOB key.
-*
-* @param file
-*the file to prepare for the unit tests
-* @return the BLOB key of the prepared file
-* @throws IOException
-* thrown if an I/O error occurs while writing to the test file
+* Verify ssl client to ssl server upload
 */
-   private static BlobKey prepareTestFile(File file) throws IOException {
-
-   MessageDigest md = BlobUtils.createMessageDigest();
-
-   final byte[] buf = new byte[TEST_BUFFER_SIZE];
-   for (int i = 0; i < buf.length; ++i) {
-   buf[i] = (byte) (i % 128);
-   }
-
-   FileOutputStream fos = null;
-   try {
-   fos = new FileOutputStream(file);
-
-   for (int i = 0; i < 20; ++i) {
-   fos.write(buf);
-   md.update(buf);
-   }
-
-   } finally {
-   if (fos != null) {
-   fos.close();
-   }
-   }
-
-   return new BlobKey(md.digest());
+   @Test
+   public void testUploadJarFilesHelper() throws Exception {
+   uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
}
 
/**
-* Validates the result of a GET operation by comparing the data from 
the retrieved input stream to the content of
-* the specified file.
-*
-* @param inputStream
-*the input stream returned from the GET operation
-* @param file
-*the file to compare the input stream's data to
-* @throws IOException
-* thrown if an I/O error occurs while reading the input stream 
or the file
+* Verify ssl client to non-ssl server failure
 */
-   private static void validateGet(final InputStream inputStream, final 
File file) throws IOException {
-
-   InputStream inputStream2 = null;
-   try {
-
-   inputStream2 = new FileInputStream(file);
-
-   while (true) {
-
-   final int r1 = inputStream.read();
-   final int r2 = inputStream2.read();
-
-   assertEquals(r2, r1);
-
-   if (r1 < 0) {
-   break;
-   }
-   }
-
-   } finally {
-   if (inputStream2 != null) {
-   inputStream2.close();
-   }
-   }
-
+   @Test(expected = IOException.class)
+   public void testSSLClientFailure() throws Exception {
+   // SSL client connected to non-ssl server
+   uploadJarFile(BLOB_SERVER, sslClientConfig);
}
 
/**
-* Tests the PUT/GET operations for content-addressable streams.
+* Verify ssl client to non-ssl server failure
 */
-   @Test
-   public void testContentAddressableStream() {
-
-   BlobClient client = null;
-   InputStream is = null;
-
-   try {
-   File testFile = File.createTempFile("testfile", ".dat");
-   testFile.deleteOnExit();
-
-   BlobKey origKey = prepareTestFile(testFile);
-
-   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
-   client = new BlobClient(serverAddress, sslClientConfig);
-
-   // Store the data
-  

[jira] [Commented] (FLINK-7053) improve code quality in some tests

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114593#comment-16114593
 ] 

ASF GitHub Bot commented on FLINK-7053:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131431731
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
--- End diff --

good catch


> improve code quality in some tests
> --
>
> Key: FLINK-7053
> URL: https://issues.apache.org/jira/browse/FLINK-7053
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> * {{BlobClientTest}} and {{BlobClientSslTest}} share a lot of common code
> * the received buffers there are currently not verified for being equal to 
> the expected one
> * {{TemporaryFolder}} should be used throughout blob store tests



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7053) improve code quality in some tests

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114592#comment-16114592
 ] 

ASF GitHub Bot commented on FLINK-7053:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131431720
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 ---
@@ -92,15 +100,15 @@ private void uploadFileGetTest(final Configuration 
config, boolean cacheWorksWit
BlobCache blobCache = null;
BlobStoreService blobStoreService = null;
try {
-   final Configuration cacheConfig;
-   if (cacheHasAccessToFs) {
-   cacheConfig = config;
-   } else {
-   // just in case parameters are still read from 
the server,
-   // create a separate configuration object for 
the cache
-   cacheConfig = new Configuration(config);
+   final Configuration cacheConfig = new 
Configuration(config);
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   if (!cacheHasAccessToFs) {
+   // make sure the cache cannot access the HA 
store directly
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   
temporaryFolder.newFolder().getAbsolutePath());
--- End diff --

seems that it wasn't used to its full extent - also the meaning of this 
variable was not that clear


> improve code quality in some tests
> --
>
> Key: FLINK-7053
> URL: https://issues.apache.org/jira/browse/FLINK-7053
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> * {{BlobClientTest}} and {{BlobClientSslTest}} share a lot of common code
> * the received buffers there are currently not verified for being equal to 
> the expected one
> * {{TemporaryFolder}} should be used throughout blob store tests



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4234: [FLINK-7053][blob] improve code quality in some te...

2017-08-04 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131431720
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 ---
@@ -92,15 +100,15 @@ private void uploadFileGetTest(final Configuration 
config, boolean cacheWorksWit
BlobCache blobCache = null;
BlobStoreService blobStoreService = null;
try {
-   final Configuration cacheConfig;
-   if (cacheHasAccessToFs) {
-   cacheConfig = config;
-   } else {
-   // just in case parameters are still read from 
the server,
-   // create a separate configuration object for 
the cache
-   cacheConfig = new Configuration(config);
+   final Configuration cacheConfig = new 
Configuration(config);
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   if (!cacheHasAccessToFs) {
+   // make sure the cache cannot access the HA 
store directly
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   
temporaryFolder.newFolder().getAbsolutePath());
--- End diff --

seems that it wasn't used to its full extent - also the meaning of this 
variable was not that clear


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4234: [FLINK-7053][blob] improve code quality in some te...

2017-08-04 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131431731
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
--- End diff --

good catch


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114589#comment-16114589
 ] 

ASF GitHub Bot commented on FLINK-7213:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4353
  
I had a very rough look at it, and the conceptual rework looks very good.

This would need a detailed pass over the code changes, though, since it 
touches very sensitive code...


> Introduce state management by OperatorID in TaskManager
> ---
>
> Key: FLINK-7213
> URL: https://issues.apache.org/jira/browse/FLINK-7213
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Flink-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced on the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...

2017-08-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4353
  
I had a very rough look at it, and the conceptual rework looks very good.

This would need a detailed pass over the code changes, though, since it 
touches very sensitive code...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3347) TaskManager (or its ActorSystem) need to restart in case they notice quarantine

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114576#comment-16114576
 ] 

ASF GitHub Bot commented on FLINK-3347:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4478


> TaskManager (or its ActorSystem) need to restart in case they notice 
> quarantine
> ---
>
> Key: FLINK-3347
> URL: https://issues.apache.org/jira/browse/FLINK-3347
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.0.0, 1.1.4, 1.3.0, 1.2.1
>
>
> There are cases where Akka quarantines remote actor systems. In that case, no 
> further communication is possible with that actor system unless one of the 
> two actor systems is restarted.
> The result is that a TaskManager is up and available, but cannot register at 
> the JobManager (Akka refuses connection because of the quarantined state), 
> making the TaskManager a useless process.
> I suggest to let the TaskManager restart itself once it notices that either 
> it quarantined the JobManager, or the JobManager quarantined it.
> It is possible to recognize that by listening to certain events in the actor 
> system event stream: 
> http://stackoverflow.com/questions/32471088/akka-cluster-detecting-quarantined-state



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4478: [hotfix][docs] add documentation for `taskmanager....

2017-08-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4478


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4478: [hotfix][docs] add documentation for `taskmanager.exit-on...

2017-08-04 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4478
  
Thanks for the fix @NicoK. Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114503#comment-16114503
 ] 

ASF GitHub Bot commented on FLINK-6995:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4480
  
Thanks to @alpinegizmo for review ~


> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4 but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those pages 
> only links to older releases, so if a user arrives on a 1.2 page they won't 
> see 1.3.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4480: [FLINK-6995] [docs] Enable is_latest attribute to false

2017-08-04 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4480
  
Thanks to @alpinegizmo for review ~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7364) Log exceptions from user code in streaming jobs

2017-08-04 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114473#comment-16114473
 ] 

Stephan Ewen commented on FLINK-7364:
-

Can you share some sample code where this happens?

So far, we have been logging exceptions a lot (almost too much for some users), 
typically on the TaskManager, the JobManager's execution graph, and the Client.

I am a bit puzzled why your exception is not logged...

> Log exceptions from user code in streaming jobs
> ---
>
> Key: FLINK-7364
> URL: https://issues.apache.org/jira/browse/FLINK-7364
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.1
>Reporter: Elias Levy
>
> Currently, if an exception arises in user-supplied code within an operator in 
> a streaming job, Flink terminates the job, but it fails to record the reason 
> for the termination. The logs do not record that there was an exception at 
> all, much less the type of exception and where it occurred. This 
> makes it difficult to debug jobs without implementing exception-recording 
> code on all user-supplied operators. 
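For illustration, the kind of per-operator wrapper the report alludes to (a 
hedged sketch of the workaround, not a proposed fix):
{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMapFunction<I, O> implements MapFunction<I, O> {
	private static final Logger LOG = LoggerFactory.getLogger(LoggingMapFunction.class);

	private final MapFunction<I, O> delegate;

	public LoggingMapFunction(MapFunction<I, O> delegate) {
		this.delegate = delegate;
	}

	@Override
	public O map(I value) throws Exception {
		try {
			return delegate.map(value);
		} catch (Exception e) {
			// record the failure before the job terminates
			LOG.error("Exception in user code", e);
			throw e;
		}
	}
}
{code}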



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7365) warning of attempt to override final parameter: fs.s3.buffer.dir

2017-08-04 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114469#comment-16114469
 ] 

Stephan Ewen commented on FLINK-7365:
-

Do I understand correctly that the issue is that it is logged too often?
Or do you see another issue?

> warning of attempt to override final parameter: fs.s3.buffer.dir
> 
>
> Key: FLINK-7365
> URL: https://issues.apache.org/jira/browse/FLINK-7365
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>
> I'm seeing hundreds of lines of the following warning in my JobManager log file:
> {code:java}
> 2017-08-03 19:48:45,330 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,485 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,486 WARN  org.apache.hadoop.conf.Configuration
>   - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to 
> override final parameter: fs.s3.buffer.dir;  Ignoring.
> 2017-08-03 19:48:45,626 WARN  org.apache.hadoop.conf.Configuration
>   - /etc/hadoop/conf/core-site.xml:an attempt to override final 
> parameter: fs.s3.buffer.dir;  Ignoring
> ..
> {code}
> Info of my Flink cluster:
> - Running on EMR with emr-5.6.0
> - Using FSStateBackend, writing checkpointing data files to s3
> - Configured s3 with S3AFileSystem according to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem
> AWS forbids resetting the 'fs.s3.buffer.dir' value (it has a <final> tag on 
> this property in core-site.xml), so I set 'fs.s3a.buffer.dir' to '/tmp'
> Here's my core-site.xml file:
> {code:xml}
> <?xml version="1.0"?>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
> <configuration>
>   <property>
>     <name>fs.s3.buffer.dir</name>
>     <value>/mnt/s3,/mnt1/s3</value>
>     <final>true</final>
>   </property>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3n.impl</name>
>     <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>   </property>
>   <property>
>     <name>ipc.client.connect.max.retries.on.timeouts</name>
>     <value>5</value>
>   </property>
>   <property>
>     <name>hadoop.security.key.default.bitlength</name>
>     <value>256</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.groups</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>hadoop.tmp.dir</name>
>     <value>/mnt/var/lib/hadoop/tmp</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.hosts</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>io.file.buffer.size</name>
>     <value>65536</value>
>   </property>
>   <property>
>     <name>fs.AbstractFileSystem.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3bfs.impl</name>
>     <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
>   </property>
> </configuration>
> {code}
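
Since the warnings come from {{org.apache.hadoop.conf.Configuration}} (see the log
excerpt above), one workaround (assuming the log spam itself, rather than the
setting, is the problem) is to raise that logger's threshold, e.g. in
log4j.properties:

{code}
# silence the repeated "attempt to override final parameter" warnings
log4j.logger.org.apache.hadoop.conf.Configuration=ERROR
{code}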





[jira] [Commented] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code

2017-08-04 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114463#comment-16114463
 ] 

Yueting Chen commented on FLINK-7309:
-

Hi [~llchen], I saw this JIRA today and I think you're right. So I opened a 
pull request to fix this issue. Feel free to review it!

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime, Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Liangliang Chen
>Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() will cause a 
> NullPointerException when SQL table field type is `TIMESTAMP` and the field 
> value is `null`.
> Example for reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // null field value
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>   Order(null, "beer", 3)))
>   
> tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
> val result = tEnv.sql("SELECT * FROM OrderA")
> result.toAppendStream[Order].print()
> 
> env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() will generate some 
> statements like this:
> {code}
> ...
>   long result$1 = 
> org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
>   boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> So the NPE will happen when in1.ts() is null.
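
A plausible shape of the corrected generated code (identifiers taken from the
snippet above; the actual fix is the one in the linked pull request): the null
check has to run first, and the conversion must be skipped for null values.

{code:java}
boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
long result$1 = isNull$2
    ? -1L // placeholder, never read when isNull$2 is true
    : org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
{code}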





[jira] [Commented] (FLINK-7053) improve code quality in some tests

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114443#comment-16114443
 ] 

ASF GitHub Bot commented on FLINK-7053:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131402370
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
+   }
 
-   if (BLOB_SERVER != null) {
-   BLOB_SERVER.close();
-   }
+   protected Configuration getBlobClientConfig() {
+   return sslClientConfig;
+   }
+
+   protected BlobServer getBlobServer() {
+   return BLOB_SSL_SERVER;
}
 
/**
-* Prepares a test file for the unit tests, i.e. the methods fills the 
file with a particular byte patterns and
-* computes the file's BLOB key.
-*
-* @param file
-*the file to prepare for the unit tests
-* @return the BLOB key of the prepared file
-* @throws IOException
-* thrown if an I/O error occurs while writing to the test file
+* Verify ssl client to ssl server upload
 */
-   private static BlobKey prepareTestFile(File file) throws IOException {
-
-   MessageDigest md = BlobUtils.createMessageDigest();
-
-   final byte[] buf = new byte[TEST_BUFFER_SIZE];
-   for (int i = 0; i < buf.length; ++i) {
-   buf[i] = (byte) (i % 128);
-   }
-
-   FileOutputStream fos = null;
-   try {
-   fos = new FileOutputStream(file);
-
-   for (int i = 0; i < 20; ++i) {
-   fos.write(buf);
-   md.update(buf);
-   }
-
-   } finally {
-   if (fos != null) {
-   fos.close();
-   }
-   }
-
-   return new BlobKey(md.digest());
+   @Test
+   public void testUploadJarFilesHelper() throws Exception {
+   uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
}
 
/**
-* Validates the result of a GET operation by comparing the data from 
the retrieved input stream to the content of
-* the specified file.
-*
-* @param inputStream
-*the input stream returned from the GET operation
-* @param file
-*the file to compare the input stream's data to
-* @throws IOException
-* thrown if an I/O error occurs while reading the input stream 
or the file
+* Verify ssl client to non-ssl server failure
 */
-   private static void validateGet(final InputStream inputStream, final 
File file) throws IOException {
-
-   InputStream inputStream2 = null;
-   try {
-
-   inputStream2 = new FileInputStream(file);
-
-   while (true) {
-
-   final int r1 = inputStream.read();
-   final int r2 = inputStream2.read();
-
-   assertEquals(r2, r1);
-
-   if (r1 < 0) {
-   break;
-   }
-   }
-
-   } finally {
-   if (inputStream2 != null) {
-   inputStream2.close();
-   }
-   }
-
+   @Test(expected = IOException.class)
+   public void testSSLClientFailure() throws Exception {
+   // SSL client connected to non-ssl server
+   uploadJarFile(BLOB_SERVER, sslClientConfig);
}
 
/**
-* Tests the PUT/GET operations for content-addressable streams.
+* Verify ssl client to non-ssl server failure
 */
-   @Test
-   public void testContentAddressableStream() {
-
-   BlobClient client = null;
-   InputStream is = null;
-
-   try {
-   File testFile = File.createTempFile("testfile", ".dat");
-   testFile.deleteOnExit();
-
-   BlobKey origKey = prepareTestFile(testFile);
-
-   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
-   client = new BlobClient(serverAddress, sslClientConfig);
-
-   // Store the data
-   

[jira] [Commented] (FLINK-7053) improve code quality in some tests

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1611#comment-1611
 ] 

ASF GitHub Bot commented on FLINK-7053:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131401070
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 ---
@@ -92,15 +100,15 @@ private void uploadFileGetTest(final Configuration 
config, boolean cacheWorksWit
BlobCache blobCache = null;
BlobStoreService blobStoreService = null;
try {
-   final Configuration cacheConfig;
-   if (cacheHasAccessToFs) {
-   cacheConfig = config;
-   } else {
-   // just in case parameters are still read from 
the server,
-   // create a separate configuration object for 
the cache
-   cacheConfig = new Configuration(config);
+   final Configuration cacheConfig = new 
Configuration(config);
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   if (!cacheHasAccessToFs) {
+   // make sure the cache cannot access the HA 
store directly
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   
temporaryFolder.newFolder().getAbsolutePath());
--- End diff --

isn't this redundant?
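
A sketch of the deduplication the comment hints at: since the storage directory
is already set unconditionally right above, the branch no longer needs to set it
again (assuming nothing else distinguishes the no-filesystem-access case).

{code:java}
final Configuration cacheConfig = new Configuration(config);
// one fresh temporary folder is enough for both the FS-access and no-FS-access cases
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
    temporaryFolder.newFolder().getAbsolutePath());
{code}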


> improve code quality in some tests
> --
>
> Key: FLINK-7053
> URL: https://issues.apache.org/jira/browse/FLINK-7053
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> * {{BlobClientTest}} and {{BlobClientSslTest}} share a lot of common code
> * the received buffers there are currently not verified for being equal to 
> the expected one
> * {{TemporaryFolder}} should be used throughout blob store tests





[jira] [Commented] (FLINK-7053) improve code quality in some tests

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114445#comment-16114445
 ] 

ASF GitHub Bot commented on FLINK-7053:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131401420
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
--- End diff --

What about shutting the non ssl blob server down?
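
What the comment asks for, reconstructed from the lines the diff removed (the
@AfterClass annotation is assumed from the original static signature):

{code:java}
@AfterClass
public static void stopServers() throws IOException {
    if (BLOB_SSL_SERVER != null) {
        BLOB_SSL_SERVER.close();
    }
    if (BLOB_SERVER != null) {
        BLOB_SERVER.close();
    }
}
{code}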


> improve code quality in some tests
> --
>
> Key: FLINK-7053
> URL: https://issues.apache.org/jira/browse/FLINK-7053
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> * {{BlobClientTest}} and {{BlobClientSslTest}} share a lot of common code
> * the received buffers there are currently not verified for being equal to 
> the expected one
> * {{TemporaryFolder}} should be used throughout blob store tests





[GitHub] flink pull request #4234: [FLINK-7053][blob] improve code quality in some te...

2017-08-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131401070
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 ---
@@ -92,15 +100,15 @@ private void uploadFileGetTest(final Configuration 
config, boolean cacheWorksWit
BlobCache blobCache = null;
BlobStoreService blobStoreService = null;
try {
-   final Configuration cacheConfig;
-   if (cacheHasAccessToFs) {
-   cacheConfig = config;
-   } else {
-   // just in case parameters are still read from 
the server,
-   // create a separate configuration object for 
the cache
-   cacheConfig = new Configuration(config);
+   final Configuration cacheConfig = new 
Configuration(config);
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   if (!cacheHasAccessToFs) {
+   // make sure the cache cannot access the HA 
store directly
+   
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   
temporaryFolder.newFolder().getAbsolutePath());
--- End diff --

isn't this redundant?




[GitHub] flink pull request #4234: [FLINK-7053][blob] improve code quality in some te...

2017-08-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131401420
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
--- End diff --

What about shutting the non ssl blob server down?




[GitHub] flink pull request #4234: [FLINK-7053][blob] improve code quality in some te...

2017-08-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4234#discussion_r131402370
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 ---
@@ -107,195 +96,89 @@ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
BLOB_SSL_SERVER.close();
}
+   }
 
-   if (BLOB_SERVER != null) {
-   BLOB_SERVER.close();
-   }
+   protected Configuration getBlobClientConfig() {
+   return sslClientConfig;
+   }
+
+   protected BlobServer getBlobServer() {
+   return BLOB_SSL_SERVER;
}
 
/**
-* Prepares a test file for the unit tests, i.e. the methods fills the 
file with a particular byte patterns and
-* computes the file's BLOB key.
-*
-* @param file
-*the file to prepare for the unit tests
-* @return the BLOB key of the prepared file
-* @throws IOException
-* thrown if an I/O error occurs while writing to the test file
+* Verify ssl client to ssl server upload
 */
-   private static BlobKey prepareTestFile(File file) throws IOException {
-
-   MessageDigest md = BlobUtils.createMessageDigest();
-
-   final byte[] buf = new byte[TEST_BUFFER_SIZE];
-   for (int i = 0; i < buf.length; ++i) {
-   buf[i] = (byte) (i % 128);
-   }
-
-   FileOutputStream fos = null;
-   try {
-   fos = new FileOutputStream(file);
-
-   for (int i = 0; i < 20; ++i) {
-   fos.write(buf);
-   md.update(buf);
-   }
-
-   } finally {
-   if (fos != null) {
-   fos.close();
-   }
-   }
-
-   return new BlobKey(md.digest());
+   @Test
+   public void testUploadJarFilesHelper() throws Exception {
+   uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
}
 
/**
-* Validates the result of a GET operation by comparing the data from 
the retrieved input stream to the content of
-* the specified file.
-*
-* @param inputStream
-*the input stream returned from the GET operation
-* @param file
-*the file to compare the input stream's data to
-* @throws IOException
-* thrown if an I/O error occurs while reading the input stream 
or the file
+* Verify ssl client to non-ssl server failure
 */
-   private static void validateGet(final InputStream inputStream, final 
File file) throws IOException {
-
-   InputStream inputStream2 = null;
-   try {
-
-   inputStream2 = new FileInputStream(file);
-
-   while (true) {
-
-   final int r1 = inputStream.read();
-   final int r2 = inputStream2.read();
-
-   assertEquals(r2, r1);
-
-   if (r1 < 0) {
-   break;
-   }
-   }
-
-   } finally {
-   if (inputStream2 != null) {
-   inputStream2.close();
-   }
-   }
-
+   @Test(expected = IOException.class)
+   public void testSSLClientFailure() throws Exception {
+   // SSL client connected to non-ssl server
+   uploadJarFile(BLOB_SERVER, sslClientConfig);
}
 
/**
-* Tests the PUT/GET operations for content-addressable streams.
+* Verify ssl client to non-ssl server failure
 */
-   @Test
-   public void testContentAddressableStream() {
-
-   BlobClient client = null;
-   InputStream is = null;
-
-   try {
-   File testFile = File.createTempFile("testfile", ".dat");
-   testFile.deleteOnExit();
-
-   BlobKey origKey = prepareTestFile(testFile);
-
-   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
-   client = new BlobClient(serverAddress, sslClientConfig);
-
-   // Store the data
-   is = new FileInputStream(testFile);
-   BlobKey receivedKey = client.put(is);
-   assertEquals(origKey, receivedKey);
-
-   is.close();
-   is 

[jira] [Commented] (FLINK-5178) allow BlobCache to use a distributed file system irrespective of the HA mode

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114431#comment-16114431
 ] 

ASF GitHub Bot commented on FLINK-5178:
---

Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/3085


> allow BlobCache to use a distributed file system irrespective of the HA mode
> 
>
> Key: FLINK-5178
> URL: https://issues.apache.org/jira/browse/FLINK-5178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> After FLINK-5129, high availability (HA) mode adds the ability for the 
> BlobCache instances at the task managers to download blobs directly from the 
> distributed file system. It would be nice if this also worked in non-HA mode.





[jira] [Commented] (FLINK-5178) allow BlobCache to use a distributed file system irrespective of the HA mode

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114430#comment-16114430
 ] 

ASF GitHub Bot commented on FLINK-5178:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3085
  
no, this is not part of FLIP-19


> allow BlobCache to use a distributed file system irrespective of the HA mode
> 
>
> Key: FLINK-5178
> URL: https://issues.apache.org/jira/browse/FLINK-5178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> After FLINK-5129, high availability (HA) mode adds the ability for the 
> BlobCache instances at the task managers to download blobs directly from the 
> distributed file system. It would be nice if this also worked in non-HA mode.





[GitHub] flink issue #3085: [FLINK-5178] allow BlobCache to use a distributed file sy...

2017-08-04 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3085
  
no, this is not part of FLIP-19




[GitHub] flink pull request #3085: [FLINK-5178] allow BlobCache to use a distributed ...

2017-08-04 Thread NicoK
Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/3085




[jira] [Commented] (FLINK-7316) always use off-heap network buffers

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114405#comment-16114405
 ] 

ASF GitHub Bot commented on FLINK-7316:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4481

[FLINK-7316][network] always use off-heap network buffers

## What is the purpose of the change

Currently, network buffers may be on-heap or off-heap, depending on the Flink 
memory settings. As a step towards passing our own (off-heap) buffers through 
Netty to avoid unnecessary buffer copies, we make network buffers always off-heap.

## Brief change log

- always use off-heap buffers for the `NetworkBufferPool`
- move `memoryType` from `NetworkEnvironmentConfiguration` to 
`TaskManagerServicesConfiguration`
- adapt heap size calculations in bash scripts and Java source code
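
As a plain-JDK illustration of the on-heap/off-heap distinction the change log
above standardizes on (this is not Flink's internal buffer API):

{code:java}
import java.nio.ByteBuffer;

public class BufferKinds {
    public static void main(String[] args) {
        ByteBuffer onHeap = ByteBuffer.allocate(32 * 1024);         // backed by a byte[] on the JVM heap
        ByteBuffer offHeap = ByteBuffer.allocateDirect(32 * 1024);  // backed by native memory
        System.out.println(onHeap.isDirect() + " / " + offHeap.isDirect()); // prints: false / true
    }
}
{code}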

## Verifying this change

This change is already covered by existing tests, such as: 
`TaskManagerServicesTest` for the heap size calculations; tests under 
`flink/runtime/io/network` for most other aspects of the direct use of network 
buffers, especially `flink/runtime/io/network/buffer`; and all integration tests 
with a full stack and non-local communication.

Actually, we even increase the test coverage: most network buffer tests so far 
only exercised on-heap buffers, which no longer exist. These tests now cover the 
only remaining option: off-heap network buffers.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes - as in 
the network communication part)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes - memory settings, but 
they effectively do not change except for the network buffers being off-heap 
now)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (docs, JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7316

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4481


commit d87206435cabf3bf29560083b639077b103708b8
Author: Nico Kruber 
Date:   2017-07-31T10:06:14Z

[hotfix] fix some typos

commit 4a46e615f38c3dde41d465ec3e26426dbf14df80
Author: Nico Kruber 
Date:   2017-08-02T09:34:54Z

[FLINK-7310][core] always use the HybridMemorySegment

Since we'd like to use our own off-heap buffers for network communication,
we cannot use HeapMemorySegment anymore and need to rely on
HybridMemorySegment. We thus drop any code that loads the HeapMemorySegment
(it is still available if needed) in favour of the HybridMemorySegment,
which is able to work on both heap and off-heap memory.

For the performance penalty of this change compared to using
HeapMemorySegment alone, see this interesting blog article (from 2015):
https://flink.apache.org/news/2015/09/16/off-heap-memory.html

commit 1d838c82cef412a8ec143308e20a4d0d7882f3e8
Author: Nico Kruber 
Date:   2017-08-02T09:35:16Z

[hotfix][tests] add missing test descriptions

commit 70b0985a62082766498e847f7a4f25e84b6c1f06
Author: Nico Kruber 
Date:   2017-08-02T09:27:49Z

[hotfix][core] add additional final methods in final classes

This applies the scheme of HeapMemorySegment to HybridMemorySegment where 
core
methods are also marked "final" to be more future-proof.

commit bedf14708b7aba88761f05c19abaf7f26d16dd20
Author: Nico Kruber 
Date:   2017-08-04T13:15:32Z

[FLINK-7312][checkstyle] remove trailing whitespace

commit 67e37971a4f8d5c40290e7a9c8ae2e6a2e1deb68
Author: Nico Kruber 
Date:   2017-08-04T13:20:28Z

[FLINK-7312][checkstyle] organise imports

commit d8657c8f02ca16af1f9e08621a8f73adb5d26959
Author: Nico Kruber 
Date:   2017-08-04T13:24:16Z

[FLINK-7312][checkstyle] add, adapt and improve comments

commit 654b599569e3a3e5ac063d253311b67652a33c1d
Author: Nico Kruber 
Date:   2017-08-04T13:26:40Z

[FLINK-7312][checkstyle] remove redundant "public" keyword in interfaces

commit 7117de1adeefd624ae958370d9614162d18bd9ed
Author: Nico Kruber 

[GitHub] flink pull request #4481: [FLINK-7316][network] always use off-heap network ...

2017-08-04 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4481

[FLINK-7316][network] always use off-heap network buffers

## What is the purpose of the change

Currently, network buffers may be on-heap or off-heap, depending on the Flink 
memory settings. As a step towards passing our own (off-heap) buffers through 
Netty to avoid unnecessary buffer copies, we make network buffers always off-heap.

## Brief change log

- always use off-heap buffers for the `NetworkBufferPool`
- move `memoryType` from `NetworkEnvironmentConfiguration` to 
`TaskManagerServicesConfiguration`
- adapt heap size calculations in bash scripts and Java source code

## Verifying this change

This change is already covered by existing tests, such as: 
`TaskManagerServicesTest` for the heap size calculations; tests under 
`flink/runtime/io/network` for most other aspects of the direct use of network 
buffers, especially `flink/runtime/io/network/buffer`; and all integration tests 
with a full stack and non-local communication.

Actually, we even increase the test coverage: most network buffer tests so far 
only exercised on-heap buffers, which no longer exist. These tests now cover the 
only remaining option: off-heap network buffers.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes - as in 
the network communication part)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes - memory settings, but 
they effectively do not change except for the network buffers being off-heap 
now)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (docs, JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7316

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4481


commit d87206435cabf3bf29560083b639077b103708b8
Author: Nico Kruber 
Date:   2017-07-31T10:06:14Z

[hotfix] fix some typos

commit 4a46e615f38c3dde41d465ec3e26426dbf14df80
Author: Nico Kruber 
Date:   2017-08-02T09:34:54Z

[FLINK-7310][core] always use the HybridMemorySegment

Since we'd like to use our own off-heap buffers for network communication,
we cannot use HeapMemorySegment anymore and need to rely on
HybridMemorySegment. We thus drop any code that loads the HeapMemorySegment
(it is still available if needed) in favour of the HybridMemorySegment,
which is able to work on both heap and off-heap memory.

For the performance penalty of this change compared to using
HeapMemorySegment alone, see this interesting blog article (from 2015):
https://flink.apache.org/news/2015/09/16/off-heap-memory.html

commit 1d838c82cef412a8ec143308e20a4d0d7882f3e8
Author: Nico Kruber 
Date:   2017-08-02T09:35:16Z

[hotfix][tests] add missing test descriptions

commit 70b0985a62082766498e847f7a4f25e84b6c1f06
Author: Nico Kruber 
Date:   2017-08-02T09:27:49Z

[hotfix][core] add additional final methods in final classes

This applies the scheme of HeapMemorySegment to HybridMemorySegment where 
core
methods are also marked "final" to be more future-proof.

commit bedf14708b7aba88761f05c19abaf7f26d16dd20
Author: Nico Kruber 
Date:   2017-08-04T13:15:32Z

[FLINK-7312][checkstyle] remove trailing whitespace

commit 67e37971a4f8d5c40290e7a9c8ae2e6a2e1deb68
Author: Nico Kruber 
Date:   2017-08-04T13:20:28Z

[FLINK-7312][checkstyle] organise imports

commit d8657c8f02ca16af1f9e08621a8f73adb5d26959
Author: Nico Kruber 
Date:   2017-08-04T13:24:16Z

[FLINK-7312][checkstyle] add, adapt and improve comments

commit 654b599569e3a3e5ac063d253311b67652a33c1d
Author: Nico Kruber 
Date:   2017-08-04T13:26:40Z

[FLINK-7312][checkstyle] remove redundant "public" keyword in interfaces

commit 7117de1adeefd624ae958370d9614162d18bd9ed
Author: Nico Kruber 
Date:   2017-08-04T13:27:36Z

[FLINK-7312][checkstyle] ignore some spurious warnings

commit dd150af85551f64c7f3a260a013d59b7d773f94a
Author: Nico Kruber 
Date:   2017-08-04T13:35:15Z

[FLINK-7312][checkstyle] 

[jira] [Closed] (FLINK-7369) Add more information for `Key group index out of range of key group range` exception

2017-08-04 Thread Benedict Jin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benedict Jin closed FLINK-7369.
---
   Resolution: Fixed
Fix Version/s: 2.0.0

> Add more information for `Key group index out of range of key group range` 
> exception
> 
>
> Key: FLINK-7369
> URL: https://issues.apache.org/jira/browse/FLINK-7369
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Benedict Jin
>Assignee: Benedict Jin
> Fix For: 2.0.0
>
>
> When I got the following exception log, I could not tell whether the index was 
> greater than or equal to `32` or less than `16`. So, we should add more information to it.
> ```java
> java.lang.IllegalArgumentException: Key group index out of range of key group 
> range [16, 32).
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:171)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> ```
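
A minimal sketch (with a hypothetical helper name) of the more informative check
the issue asks for, reporting the offending index instead of only the range:

{code:java}
static void checkKeyGroupIndex(int keyGroupIndex, int startKeyGroup, int endKeyGroup) {
    if (keyGroupIndex < startKeyGroup || keyGroupIndex >= endKeyGroup) {
        throw new IllegalArgumentException("Key group index " + keyGroupIndex
            + " is out of range of key group range [" + startKeyGroup + ", " + endKeyGroup + ").");
    }
}
{code}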





[jira] [Commented] (FLINK-7369) Add more information for `Key group index out of range of key group range` exception

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114347#comment-16114347
 ] 

ASF GitHub Bot commented on FLINK-7369:
---

Github user asdf2014 commented on the issue:

https://github.com/apache/flink/pull/4474
  
You are so polite that you do not have to be sorry :D


> Add more information for `Key group index out of range of key group range` 
> exception
> 
>
> Key: FLINK-7369
> URL: https://issues.apache.org/jira/browse/FLINK-7369
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Benedict Jin
>Assignee: Benedict Jin
>
> When I got the following exception log, I could not tell whether the index was 
> greater than or equal to `32` or less than `16`. So, we should add more information to it.
> ```java
> java.lang.IllegalArgumentException: Key group index out of range of key group 
> range [16, 32).
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:171)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> ```





[GitHub] flink issue #4474: FLINK-7369: Add more information for `Key group index out...

2017-08-04 Thread asdf2014
Github user asdf2014 commented on the issue:

https://github.com/apache/flink/pull/4474
  
You are so polite that you do not have to be sorry :D




[jira] [Commented] (FLINK-7369) Add more information for `Key group index out of range of key group range` exception

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114337#comment-16114337
 ] 

ASF GitHub Bot commented on FLINK-7369:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4474
  
@asdf2014 Yes, and sorry again that I didn't see the PR fast enough. It was 
absolutely not my intention to ignore your work. Thanks again for your efforts!


> Add more information for `Key group index out of range of key group range` 
> exception
> 
>
> Key: FLINK-7369
> URL: https://issues.apache.org/jira/browse/FLINK-7369
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Benedict Jin
>Assignee: Benedict Jin
>
> When I got the following exception log, I could not tell whether the index was 
> greater than or equal to `32` or less than `16`. So, we should add more information to it.
> ```java
> java.lang.IllegalArgumentException: Key group index out of range of key group 
> range [16, 32).
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:171)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> ```





[GitHub] flink issue #4474: FLINK-7369: Add more information for `Key group index out...

2017-08-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4474
  
@asdf2014 Yes, and sorry again that I didn't see the PR fast enough. It was 
absolutely not my intention to ignore your work. Thanks again for your efforts!




[jira] [Commented] (FLINK-7369) Add more information for `Key group index out of range of key group range` exception

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114331#comment-16114331
 ] 

ASF GitHub Bot commented on FLINK-7369:
---

Github user asdf2014 commented on the issue:

https://github.com/apache/flink/pull/4474
  
Hi, @StefanRRichter. Thank you for taking the suggestion. I have already seen 
the improvement 
[merged](https://github.com/apache/flink/commit/04777538c0a54c113d0f9b49b5922ba47f6e1eb8)
 into master. :+1: 


> Add more information for `Key group index out of range of key group range` 
> exception
> 
>
> Key: FLINK-7369
> URL: https://issues.apache.org/jira/browse/FLINK-7369
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Benedict Jin
>Assignee: Benedict Jin
>
> When I got the following exception log, I could not tell whether the index was 
> greater than or equal to `32` or less than `16`. So, we should add more information to it.
> ```java
> java.lang.IllegalArgumentException: Key group index out of range of key group 
> range [16, 32).
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:171)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> ```





[jira] [Commented] (FLINK-7369) Add more information for `Key group index out of range of key group range` exception

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114332#comment-16114332
 ] 

ASF GitHub Bot commented on FLINK-7369:
---

Github user asdf2014 closed the pull request at:

https://github.com/apache/flink/pull/4474


> Add more information for `Key group index out of range of key group range` 
> exception
> 
>
> Key: FLINK-7369
> URL: https://issues.apache.org/jira/browse/FLINK-7369
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Benedict Jin
>Assignee: Benedict Jin
>
> When I got the following exception log, I could not tell whether the index was 
> greater than or equal to `32` or less than `16`. So, we should add more information to it.
> ```java
> java.lang.IllegalArgumentException: Key group index out of range of key group 
> range [16, 32).
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:171)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> ```





[jira] [Created] (FLINK-7371) user-defined aggregator assumes the number of arguments is smaller than or equal to the number of row fields

2017-08-04 Thread Stefano Bortoli (JIRA)
Stefano Bortoli created FLINK-7371:
--

 Summary: user-defined aggregator assumes the number of arguments 
is smaller than or equal to the number of row fields
 Key: FLINK-7371
 URL: https://issues.apache.org/jira/browse/FLINK-7371
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.1
Reporter: Stefano Bortoli


Defining a user-defined aggregation with more parameters than there are row 
fields causes an ArrayIndexOutOfBoundsException, because the indexing is based 
on a linear iteration over the row fields. This does not consider cases where 
fields are used more than once or where constant values are passed to the 
aggregation function.

for example:
{code}
window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW aggs 
[myAgg($0, $1, $3, $0, $4)])
{code}

where $3 and $4 are references to constants and $0 and $1 are fields, this causes:

{code}
java.lang.ArrayIndexOutOfBoundsException: 4
at 
org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
at 
org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
at 
org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
at 
org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
{code}
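
A self-contained sketch of the failure mode (the arrays are hypothetical; the
real mapping lives in RowSchema.mapIndex): looking aggregate-call operands up in
a row-field mapping breaks as soon as an operand index points at a constant
beyond the physical fields.

{code:java}
int[] physicalFieldMapping = {0, 1, 2};   // 3 physical row fields
int[] aggCallOperands = {0, 1, 3, 0, 4};  // myAgg($0, $1, $3, $0, $4)

for (int operand : aggCallOperands) {
    // operands 3 and 4 refer to constants, so physicalFieldMapping[3] and [4]
    // throw ArrayIndexOutOfBoundsException, as in the stack trace above;
    // such operands need their own mapping instead of a row-field lookup
    int mapped = physicalFieldMapping[operand];
}
{code}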





[GitHub] flink issue #4474: FLINK-7369: Add more information for `Key group index out...

2017-08-04 Thread asdf2014
Github user asdf2014 commented on the issue:

https://github.com/apache/flink/pull/4474
  
Hi, @StefanRRichter. Thank you for taking the suggestion. I have already seen 
the improvement 
[merged](https://github.com/apache/flink/commit/04777538c0a54c113d0f9b49b5922ba47f6e1eb8)
 into master. :+1: 




[GitHub] flink pull request #4474: FLINK-7369: Add more information for `Key group in...

2017-08-04 Thread asdf2014
Github user asdf2014 closed the pull request at:

https://github.com/apache/flink/pull/4474




[jira] [Commented] (FLINK-7310) always use HybridMemorySegment

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114322#comment-16114322
 ] 

ASF GitHub Bot commented on FLINK-7310:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4445
  
in a non-exhaustive mini benchmark, I ran `HashVsSortMiniBenchmark` and got 
the following results:

# Best out of 5 (in ms)

Test | `master` | `FLINK-7310`
---- | -------- | ------------
Hash Build First | 5541 | 5629
Sort-Merge | 6194 | 6816
Hash Build Second | 3563 | 3629

# All results

## `master`

Test | 1 | 2 | 3 | 4 | 5
---- | --- | --- | --- | --- | ---
Hash Build First | 5772.0 | 5541.0 | 5707.0 | 5733.0 | 5751.0
Sort-Merge | 6704.0 | 7146.0 | 6194.0 | 6915.0 | 6445.0
Hash Build Second | 3834.0 | 3805.0 | 3811.0 | 3587.0 | 3563.0

## `FLINK-7310`

Test | 1 | 2 | 3 | 4 | 5
---- | --- | --- | --- | --- | ---
Hash Build First | 5816.0 | 5770.0 | 5629.0 | 5656.0 | 5745.0
Sort-Merge | 7284.0 | 7233.0 | 6816.0 | 6861.0 | 7218.0
Hash Build Second | 3802.0 | 3836.0 | 3629.0 | 3782.0 | 3804.0


> always use HybridMemorySegment
> --
>
> Key: FLINK-7310
> URL: https://issues.apache.org/jira/browse/FLINK-7310
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> For future changes to the network buffers (sending our own off-heap buffers 
> through to netty), we cannot use {{HeapMemorySegment}} anymore and need to 
> rely on {{HybridMemorySegment}} instead.
> We should thus drop any code that loads the {{HeapMemorySegment}} (it is 
> still available if needed) in favour of the {{HybridMemorySegment}} which is 
> able to work on both heap and off-heap memory.
> FYI: For the performance penalty of this change compared to using 
> {{HeapMemorySegment}} alone, see this interesting blog article (from 2015):
> https://flink.apache.org/news/2015/09/16/off-heap-memory.html





[GitHub] flink issue #4445: [FLINK-7310][core] always use the HybridMemorySegment

2017-08-04 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4445
  
in a non-exhaustive mini benchmark, I ran `HashVsSortMiniBenchmark` and got 
the following results:

# Best out of 5 (in ms)

Test | `master` | `FLINK-7310`
---- | -------- | ------------
Hash Build First | 5541 | 5629
Sort-Merge | 6194 | 6816
Hash Build Second | 3563 | 3629

# All results

## `master`

Test | 1 | 2 | 3 | 4 | 5
---- | --- | --- | --- | --- | ---
Hash Build First | 5772.0 | 5541.0 | 5707.0 | 5733.0 | 5751.0
Sort-Merge | 6704.0 | 7146.0 | 6194.0 | 6915.0 | 6445.0
Hash Build Second | 3834.0 | 3805.0 | 3811.0 | 3587.0 | 3563.0

## `FLINK-7310`

Test | 1 | 2 | 3 | 4 | 5
---- | --- | --- | --- | --- | ---
Hash Build First | 5816.0 | 5770.0 | 5629.0 | 5656.0 | 5745.0
Sort-Merge | 7284.0 | 7233.0 | 6816.0 | 6861.0 | 7218.0
Hash Build Second | 3802.0 | 3836.0 | 3629.0 | 3782.0 | 3804.0




[jira] [Commented] (FLINK-7293) Support custom order by in PatternStream

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114294#comment-16114294
 ] 

ASF GitHub Bot commented on FLINK-7293:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4418
  
@kl0u Good catch. Have updated the PR according to the comments.


> Support custom order by in PatternStream
> 
>
> Key: FLINK-7293
> URL: https://issues.apache.org/jira/browse/FLINK-7293
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, when {{ProcessingTime}} is configured, the events are fed to NFA 
> in the order of the arriving time and when {{EventTime}} is configured, the 
> events are fed to NFA in the order of the event time. It should also allow 
> custom {{order by}} to allow users to define the order of the events besides 
> the above factors.





[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...

2017-08-04 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4418
  
@kl0u Good catch. Have updated the PR according to the comments.




[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114246#comment-16114246
 ] 

ASF GitHub Bot commented on FLINK-6995:
---

Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4480
  
+1


> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4, but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. Each of those pages, however, 
> only links to older releases, so a user who arrives on a 1.2 page won't 
> see 1.3.





[GitHub] flink issue #4480: [FLINK-6995] [docs] Enable is_latest attribute to false

2017-08-04 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/4480
  
+1




[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114239#comment-16114239
 ] 

ASF GitHub Bot commented on FLINK-6995:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4480

[FLINK-6995] [docs] Enable is_latest attribute to false

## What is the purpose of the change

Add a warning to the Flink 1.2 documentation. This PR is based on 
https://github.com/apache/flink-web/pull/72, and makes the warning message 
enabled and visible. 

## Verifying this change

This change is trivial and has no test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6995

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4480


commit 85eb7acbfc19a410e8f9bf488b102ea40f23d12b
Author: zhangminglei 
Date:   2017-08-04T10:37:16Z

[FLINK-6995] [docs] Enable is_latest attribute to false




> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4, but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. Each of those pages, however, 
> only links to older releases, so a user who arrives on a 1.2 page won't 
> see 1.3.





[GitHub] flink pull request #4480: [FLINK-6995] [docs] Enable is_latest attribute to ...

2017-08-04 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4480

[FLINK-6995] [docs] Enable is_latest attribute to false

## What is the purpose of the change

Add a warning to the Flink 1.2 documentation. This PR is based on 
https://github.com/apache/flink-web/pull/72, and makes the warning message 
enabled and visible. 

## Verifying this change

This change is trivial and has no test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6995

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4480


commit 85eb7acbfc19a410e8f9bf488b102ea40f23d12b
Author: zhangminglei 
Date:   2017-08-04T10:37:16Z

[FLINK-6995] [docs] Enable is_latest attribute to false




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...

2017-08-04 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4454
  
@pnowojski adding a link to `TwoPhaseCommitSinkFunctionTest` might make 
sense for testing exactly-once delivery.

There are also often cases where users specifically want to check 
that their operator state semantics are exactly-once.

However, I think we can also wait for others' opinions on whether or not 
this information is really required. Instructions for watermark / timestamp 
testing make sense because that's something users frequently get wrong with 
incorrect extractor implementations. Exactly-once semantics, however, isn't 
something within the user's scope.

I'm mentioning it because we sometimes have users on the mailing lists 
testing exactly-once incorrectly (maybe also because they misunderstood some 
concepts) and asking about it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114213#comment-16114213
 ] 

mingleizhang commented on FLINK-6995:
-

Ahhha. I see. Thanks ~

> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4 but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages 
> only links to older releases so if a user arrives on a 1.2 page they won't 
> see 1.3.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread David Anderson (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114205#comment-16114205
 ] 

David Anderson commented on FLINK-6995:
---

[~mingleizhang] What we need is a PR against the release-1.2 branch that 
modifies 

https://github.com/apache/flink/blob/release-1.2/docs/_config.yml#L48

so that site.is_latest is false. Then the warning will appear and point to the 
1.3 docs.
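
Concretely, the change should be a one-line edit. A sketch (the key name is 
inferred from the Jekyll variable {{site.is_latest}}, so double-check it 
against the file):
{code}
# docs/_config.yml on the release-1.2 branch (inferred key name)
is_latest: false
{code}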


> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4 but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages 
> only links to older releases so if a user arrives on a 1.2 page they won't 
> see 1.3.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-04 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114196#comment-16114196
 ] 

Xingcan Cui commented on FLINK-7337:


Hi [~fhueske], sorry for the silly questions. 

In my mind, to convert a DataStream to a Table, the timestamps and watermarks 
must be assigned in advance with an assigner. Then (1) what's the relationship 
between the timestamps assigned before and those set by the new 
{{ProcessFunction}}, and (2) could the timestamps dynamically set on the 
{{StreamRecord}} work without the watermarks? 



> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because timestamp is 
> already accessible in Row.
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a 
> {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4472: FLINK-7368: MetricStore makes cpu spin at 100%

2017-08-04 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4472
  
The problem is that the `MetricFetcher` isn't synchronizing on the 
`MetricStore` object in `MetricFetcher#addMetrics()` as it should.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7368) MetricStore makes cpu spin at 100%

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114193#comment-16114193
 ] 

ASF GitHub Bot commented on FLINK-7368:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4472
  
The problem is that the `MetricFetcher` isn't synchronizing on the 
`MetricStore` object in `MetricFetcher#addMetrics()` as it should.
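
For illustration, here is a minimal self-contained sketch (simplified stand-in 
classes, not the actual flink-runtime-web code) of writers taking the store's 
lock before mutating its map:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class MetricStoreSyncSketch {

    static class MetricStore {
        final Map<String, String> metrics = new HashMap<>();

        // readers already synchronize on the store instance
        synchronized String get(String name) {
            return metrics.get(name);
        }
    }

    // the fix, in spirit: writers must take the same lock as the readers
    static void addMetrics(MetricStore store, String name, String value) {
        synchronized (store) {
            store.metrics.put(name, value);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MetricStore store = new MetricStore();
        Runnable writer = () -> {
            for (int i = 0; i < 10_000; i++) {
                addMetrics(store, "metric-" + i, "v");
            }
        };
        Thread t1 = new Thread(writer);
        Thread t2 = new Thread(writer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // without the synchronized block above, concurrent puts could corrupt
        // the HashMap and make threads spin at 100% CPU
        System.out.println("entries: " + store.metrics.size());
    }
}
{code}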


> MetricStore makes cpu spin at 100%
> --
>
> Key: FLINK-7368
> URL: https://issues.apache.org/jira/browse/FLINK-7368
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Nico Chen
> Attachments: jm-jstack.log, MyHashMapInfiniteLoopTest.java, 
> MyHashMap.java
>
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java 
> HashMap inside `MetricStore` and can trigger HashMap's infinite loop. 
> Recently I met a case where the Flink jobmanager consumed 100% CPU. Part of 
> the stacktrace is shown below. The full jstack is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 
> runnable [0x7fbd7d1c2000]
>java.lang.Thread.State: RUNNABLE
> at java.util.HashMap.put(HashMap.java:494)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
> at 
> org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
> at akka.dispatch.OnSuccess.internal(Future.scala:212)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at 
> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
> at 
> scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at 
> java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
> at 
> java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
> at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
> at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stacktrace as above, indicating they are 
> spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many 
> posts indicate that multiple threads accessing a HashMap can cause this 
> problem, and I reproduced the case as well. The test code is attached. I only 
> modified HashMap.transfer() by adding concurrency barriers for different 
> threads in order to simulate the timing of the creation of cycles in the 
> HashMap's Entry. My program's stacktrace shows it hangs at the same line, 
> HashMap.put(HashMap.java:494), as the stacktrace I posted above.
>  Even though `MetricFetcher` has a 10-second minimum interval between each 
> metrics query, it still cannot guarantee that query responses do not access 
> `MetricStore`'s HashMap concurrently. Thus I think it's a bug to fix.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131354369
  
--- Diff: docs/dev/testing.md ---
@@ -0,0 +1,189 @@
+---
+title: "Testing"
+nav-parent_id: dev
+nav-id: testing
+nav-pos: 99
+---
+
+
+This page briefly discusses how to test Flink applications in the local 
environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+It is encouraged to test your classes with unit tests as much as possible. 
For example, if one implements the following `ReduceFunction`:
+
+~~~java
+public class SumReduce implements ReduceFunction<Long> {
+@Override
+public Long reduce(Long value1, Long value2) throws Exception {
+return value1 + value2;
+}
+}
+~~~
+
+it is very easy to unit test it with your favorite framework:
+
+~~~java
+public class SumReduceTest {
+@Test
+public void testSum() throws Exception {
+SumReduce sumReduce = new SumReduce();
+
+assertEquals(42L, sumReduce.reduce(40L, 2L));
+}
+}
+~~~
+
+Or in scala:
+
+~~~scala
+class SumReduce extends ReduceFunction[Long] {
+override def reduce(value1: java.lang.Long,
+value2: java.lang.Long): java.lang.Long = value1 + 
value2
+}
+~~~
+
+~~~scala
+class SumReduceTest extends FlatSpec with Matchers {
+"SumReduce" should "add values" in {
+val sumReduce: SumReduce = new SumReduce()
+sumReduce.reduce(40L, 2L) should be (42L)
+}
+}
+~~~
+
+## Integration testing
+
+You can also write integration tests that are executed against a local Flink 
mini cluster.
+In order to do so, add the test dependency `flink-test-utils`.
+
+~~~ xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-test-utils{{site.scala_version_suffix}}</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+~~~
--- End diff --

To get this to work in my outside-of-flink project, I had to modify this as:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_{{site.scala_version_suffix}}</artifactId>
  <version>${flink.version}</version>
</dependency>


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code

2017-08-04 Thread Liangliang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114191#comment-16114191
 ] 

Liangliang Chen commented on FLINK-7309:


@yestinchen ???

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime, Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Liangliang Chen
>Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() will cause a 
> NullPointerException when SQL table field type is `TIMESTAMP` and the field 
> value is `null`.
> Example for reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // null field value
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>   Order(null, "beer", 3)))
>   
> tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
> val result = tEnv.sql("SELECT * FROM OrderA")
> result.toAppendStream[Order].print()
> 
> env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() will generated some 
> statements like this:
> {code}
> ...
>   long result$1 = 
> org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
>   boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> so, the NPE will happen when in1.ts() is null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6995) Add a warning to outdated documentation

2017-08-04 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114184#comment-16114184
 ] 

mingleizhang commented on FLINK-6995:
-

Hi [~alpinegizmo], you said the warning has to be enabled by setting 
{{site.is_latest}} to false. My question is: where do we set that in our code? 
I checked the {{release-1.2}} branch and the warning already exists, and 
https://flink.apache.org/q/stable-docs.html already points to the 1.3 docs. I 
think this work may already be finished, but I'm not sure. Could you take a 
look at it?

> Add a warning to outdated documentation
> ---
>
> Key: FLINK-6995
> URL: https://issues.apache.org/jira/browse/FLINK-6995
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: mingleizhang
>
> When I search for "flink yarn" on Google, the first result is an outdated 0.8 
> release documentation page. We should add a warning to outdated documentation 
> pages.
> There are other problems as well:
> The main page only links to 1.3 and 1.4 but the flink-docs-master 
> documentation links to 1.3, 1.2, 1.1, and 1.0. But each of those packages 
> only links to older releases so if a user arrives on a 1.2 page they won't 
> see 1.3.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...

2017-08-04 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4454
  
@tzulitai maybe I will add a link to `TwoPhaseCommitSinkFunctionTest` for 
how to test `exactly-once`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131352677
  
--- Diff: docs/dev/testing.md ---
@@ -90,13 +121,69 @@ public class ExampleIntegrationTest extends 
StreamingMultipleProgramsTestBase {
 public static final List<Long> values = new ArrayList<>();
 
 @Override
-public void invoke(Long value) throws Exception {
+public synchronized void invoke(Long value) throws Exception {
 values.add(value);
 }
 }
 }
 ~~~
 
-Static variable in `CollectSink` is required because Flink serializes all 
operators before distributing them across a cluster.
+or in Scala:
+
+~~~scala
+class MultiplyByTwo extends MapFunction[Long, Long] {
+  override def map(value: java.lang.Long): java.lang.Long = value * 2
+}
+~~~
+
+~~~scala
+class ExampleIntegrationTest extends FlatSpec with Matchers {
+"MultiplyByTwo" should "multiply it input by two" in {
+val env: StreamExecutionEnvironment =
+StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+// values are collected on a static variable
+CollectSink.values.clear()
+env
+.fromElements(1L, 21L, 22L)
+.map(new MultiplyByTwo())
+.addSink(new CollectSink())
+env.execute()
+CollectSink.values should be (Lists.newArrayList(2L, 42L, 44L))
+}
+}
+
+object CollectSink {
+// must be static
+val values: java.util.List[Long] = new java.util.ArrayList()
+}
+
+class CollectSink extends SinkFunction[Long] {
+override def invoke(value: java.lang.Long): Unit = {
+synchronized {
+CollectSink.values.add(value)
+}
+}
+}
+~~~
+
+A static variable in `CollectSink` is used here because Flink serializes all 
operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster 
via static variables is one way around this issue.
 Alternatively in your test sink you could for example write the data to 
files in a temporary directory.
 Of course you could use your own custom sources and sinks, which can emit 
watermarks.
+
+## Testing checkpointing and state handling
+
+One way to test state handling is to enable checkpointing in integration 
tests. You can do that by
+configuring the `environment` in the test:
+~~~java
+env.enableCheckpointing(500);
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
+~~~
+and for example adding to your Flink application an identity mapper 
operator that will throw and exception
--- End diff --

"throw an exception" (typo)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131343119
  
--- Diff: docs/dev/testing.md ---
@@ -0,0 +1,102 @@
+---
+title: "Testing"
+nav-parent_id: dev
+nav-pos: 110
+---
+
+
+This page briefly discusses how to test Flink applications in the local 
environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+It is encouraged to test your classes with unit tests as much as possible. 
For example, if one implements the following `ReduceFunction`:
+~~~java
+public class SumReduce implements ReduceFunction<Long> {
+@Override
+public Long reduce(Long value1, Long value2) throws Exception {
+return value1 + value2;
+}
+}
+~~~
+it is very easy to unit test it with your favorite framework:
+~~~java
+public class SumReduceTest {
+@Test
+public void testSum() throws Exception {
+SumReduce sumReduce = new SumReduce();
+
+assertEquals(42L, sumReduce.reduce(40L, 2L));
+}
+}
+~~~
+
+## Integration testing
+
+You can also write integration tests that are executed against a local Flink 
mini cluster.
+In order to do so, add the test dependency `flink-test-utils`. For example, if 
you want to
+test the following `MapFunction`:
+
+~~~java
+public class MultiplyByTwo implements MapFunction<Long, Long> {
+@Override
+public Long map(Long value) throws Exception {
+return value * 2;
+}
+}
+~~~
+
+You could write the following integration test:
+
+~~~java
+public class ExampleIntegrationTest extends 
StreamingMultipleProgramsTestBase {
+@Test
+public void testSum() throws Exception {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+
+// values are collected on a static variable
+CollectSink.values.clear();
+
+env.fromElements(1L, 21L, 22L)
+.map(new MultiplyByTwo())
+.addSink(new CollectSink());
+env.execute();
+
+assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
+}
+
+private static class CollectSink implements SinkFunction<Long> {
+// must be static
+public static final List<Long> values = new ArrayList<>();
+
+@Override
+public void invoke(Long value) throws Exception {
+values.add(value);
--- End diff --

It depends on the parallelism, but ok, it will be safer to synchronize it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131337294
  
--- Diff: docs/dev/testing.md ---
@@ -0,0 +1,102 @@
+---
+title: "Testing"
+nav-parent_id: dev
+nav-pos: 110
+---
+
+
+This page briefly discusses how to test Flink applications in the local 
environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+It is encouraged to test your classes with unit tests as much as possible. 
For example, if one implements the following `ReduceFunction`:
+~~~java
+public class SumReduce implements ReduceFunction<Long> {
+@Override
+public Long reduce(Long value1, Long value2) throws Exception {
+return value1 + value2;
+}
+}
+~~~
+it is very easy to unit test it with your favorite framework:
--- End diff --

I don't see a reason why.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131344197
  
--- Diff: docs/dev/testing.md ---
@@ -0,0 +1,102 @@
+---
+title: "Testing"
+nav-parent_id: dev
+nav-pos: 110
+---
+
+
+This page briefly discusses how to test Flink applications in the local 
environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+It is encouraged to test your classes with unit tests as much as possible. 
For example, if one implements the following `ReduceFunction`:
+~~~java
+public class SumReduce implements ReduceFunction<Long> {
+@Override
+public Long reduce(Long value1, Long value2) throws Exception {
+return value1 + value2;
+}
+}
+~~~
+it is very easy to unit test it with your favorite framework:
+~~~java
+public class SumReduceTest {
+@Test
+public void testSum() throws Exception {
+SumReduce sumReduce = new SumReduce();
+
+assertEquals(42L, sumReduce.reduce(40L, 2L));
+}
+}
+~~~
+
+## Integration testing
+
+You can also write integration tests that are executed against a local Flink 
mini cluster.
+In order to do so, add the test dependency `flink-test-utils`. For example, if 
you want to
+test the following `MapFunction`:
+
+~~~java
+public class MultiplyByTwo implements MapFunction<Long, Long> {
+@Override
+public Long map(Long value) throws Exception {
+return value * 2;
+}
+}
+~~~
+
+You could write the following integration test:
+
+~~~java
+public class ExampleIntegrationTest extends 
StreamingMultipleProgramsTestBase {
+@Test
+public void testSum() throws Exception {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+
+// values are collected on a static variable
+CollectSink.values.clear();
+
+env.fromElements(1L, 21L, 22L)
+.map(new MultiplyByTwo())
+.addSink(new CollectSink());
+env.execute();
+
+assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
+}
+
+private static class CollectSink implements SinkFunction<Long> {
+// must be static
+public static final List<Long> values = new ArrayList<>();
+
+@Override
+public void invoke(Long value) throws Exception {
+values.add(value);
+}
+}
+}
+~~~
+
+Static variable in `CollectSink` is required because Flink serializes all 
operators before distributing them across a cluster.
--- End diff --

I don't want to drop it, because such a construct with `static` fields used 
in tests was a little bit confusing for me when I started working with Flink.

I have rephrased this section a little bit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4454: [hotfix][docs] Add section in docs about writing u...

2017-08-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4454#discussion_r131342944
  
--- Diff: docs/dev/testing.md ---
@@ -0,0 +1,102 @@
+---
+title: "Testing"
+nav-parent_id: dev
+nav-pos: 110
+---
+
+
+This page briefly discusses how to test Flink application in the local 
environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+It is encouraged to test your classes with unit tests as much as possible. 
For example if one implement following `ReduceFunction`:
+~~~java
--- End diff --

bah... Done, those were the first Scala lines I've ever written.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114166#comment-16114166
 ] 

ASF GitHub Bot commented on FLINK-7309:
---

GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4479

[FLINK-7309][hotfix] fix NullPointerException when selecting null fields

## What is the purpose of the change

This pull request addresses FLINK-7309 by adding a null check before 
applying unboxing to input fields.

## Brief change log

- Add a null check before applying unboxing to input fields.
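
A self-contained sketch of the idea (my own simplification, not the actual 
generated code from this patch):
{code:java}
import java.sql.Timestamp;

// Simplified illustration: check for null before unboxing a TIMESTAMP field.
public class NullSafeUnboxSketch {

    // returns a sentinel and lets callers consult a null flag instead of
    // unboxing first and hitting an NPE
    public static long toInternal(Timestamp ts) {
        boolean isNull = (ts == null);                 // null check first
        return isNull ? Long.MIN_VALUE : ts.getTime(); // unbox only when non-null
    }

    public static void main(String[] args) {
        System.out.println(toInternal(null));                          // no NPE
        System.out.println(toInternal(new Timestamp(1501840876000L))); // epoch millis
    }
}
{code}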

## Verifying this change

This change added tests and can be verified as follows:
 - Added a test case: selecting a null field of Timestamp type.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink FLINK-7309

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4479


commit 7cbbd13b723df11e72ccb115b5266104b0b01183
Author: Yestin <873915...@qq.com>
Date:   2017-08-04T09:21:03Z

[FLINK-7309][hotfix] fix NullPointerException when selecting null fields.




> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime, Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Liangliang Chen
>Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() will cause a 
> NullPointerException when SQL table field type is `TIMESTAMP` and the field 
> value is `null`.
> Example for reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // null field value
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>   Order(null, "beer", 3)))
>   
> tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
> val result = tEnv.sql("SELECT * FROM OrderA")
> result.toAppendStream[Order].print()
> 
> env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() will generated some 
> statements like this:
> {code}
> ...
>   long result$1 = 
> org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
>   boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> so, the NPE will happen when in1.ts() is null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-04 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114167#comment-16114167
 ] 

Xingcan Cui commented on FLINK-7245:


Hi all, I'd like to throw out some basic ideas about the design.

# To support holding back watermarks, I plan to cache all the received 
watermarks in a priority queue in the {{InternalTimeServiceManager}} and expose 
the needed methods (e.g., {{peek()}} and {{poll()}}); see the sketch after this 
list.
# For the {{advanceWatermark()}} method in {{InternalTimeServiceManager}}, I 
think we can add a boolean parameter to indicate whether the watermark should 
be cached.
# A {{triggerWatermark()}} method, which can contain a default emitting 
mechanism (i.e., remove some watermarks from the cache and emit them) or be 
(partially) user-defined in the future, should be added to a new 
{{WatermarkPostponableOperator}}.
# Now the {{processWatermark()}} method in {{AbstractStreamOperator}} can be 
overridden in the {{WatermarkPostponableOperator}}.
# The watermarks can be snapshotted and restored with the 
{{snapshotStateForKeyGroup()}} and {{restoreStateForKeyGroup()}} methods in 
{{InternalTimeServiceManager}}.
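
To make points 1 and 3 concrete, here is a minimal sketch (hypothetical class 
and method names, not the actual {{InternalTimeServiceManager}} API):
{code:java}
import java.util.PriorityQueue;

// Hypothetical illustration of holding back watermarks in a priority queue
// and releasing them once the corresponding results have been emitted.
public class WatermarkHoldingSketch {

    private final PriorityQueue<Long> cachedWatermarks = new PriorityQueue<>();
    private long lastEmitted = Long.MIN_VALUE;

    // instead of forwarding the watermark immediately, cache it
    public void processWatermark(long watermark) {
        cachedWatermarks.add(watermark);
    }

    // emit all cached watermarks up to the given timestamp, in ascending order
    public void triggerWatermark(long upToTimestamp) {
        while (!cachedWatermarks.isEmpty() && cachedWatermarks.peek() <= upToTimestamp) {
            long wm = cachedWatermarks.poll();
            if (wm > lastEmitted) { // never emit a decreasing watermark
                lastEmitted = wm;
                System.out.println("emit watermark " + wm);
            }
        }
    }

    public static void main(String[] args) {
        WatermarkHoldingSketch op = new WatermarkHoldingSketch();
        op.processWatermark(10L);
        op.processWatermark(5L);
        op.triggerWatermark(7L);  // emits 5
        op.triggerWatermark(20L); // emits 10
    }
}
{code}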

There's a question. For an operator with two inputs, the current 
{{AbstractStreamOperator}} deals with their watermarks by merging them in 
advance, i.e., 
{code:java}
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}

public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}
{code}
I'm not sure if we should add two separate queues for them or just keep the 
current mechanism.

What do you think? [~fhueske], [~aljoscha], and [~jark].

Best, Xingcan

> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding new generated results are 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4479: [FLINK-7309][hotfix] fix NullPointerException when...

2017-08-04 Thread yestinchen
GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4479

[FLINK-7309][hotfix] fix NullPointerException when selecting null fields

## What is the purpose of the change

This pull request addresses FLINK-7309 by adding a null check before 
applying unboxing to input fields.

## Brief change log

- Add a null check before applying unboxing to input fields.

## Verifying this change

This change added tests and can be verified as follows:
 - Added a test case: selecting a null field of Timestamp type.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink FLINK-7309

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4479


commit 7cbbd13b723df11e72ccb115b5266104b0b01183
Author: Yestin <873915...@qq.com>
Date:   2017-08-04T09:21:03Z

[FLINK-7309][hotfix] fix NullPointerException when selecting null fields.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code

2017-08-04 Thread Liangliang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112834#comment-16112834
 ] 

Liangliang Chen edited comment on FLINK-7309 at 8/4/17 9:32 AM:


Hi [~twalthr], I'm not very familiar with Scala, so I wrote a test 
program in Java, shown below:
{code}
public class TestNullSQL {
  public static void main(String[] args) throws Exception {

// set up execution environment
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = 
StreamTableEnvironment.getTableEnvironment(env);

TypeInformation[] types = {BasicTypeInfo.INT_TYPE_INFO, 
SqlTimeTypeInfo.TIMESTAMP};
String names[] = {"id", "ts"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);

// we assign a null value here!!
DataStream<Row> input = env.fromElements(Row.of(1001, 
null)).returns(typeInfo);

tEnv.registerDataStream("test_table", input);
Table table = tEnv.sql("SELECT id, ts FROM test_table");
DataStream<Row> result = tEnv.toAppendStream(table, Row.class);
result.print();

env.execute();
  }
}
{code}

I use a {{Row}} type in this example and the exception occurs again.


was (Author: llchen):
Hi [~twalthr], I'm not very familiar with Scala, so I rewrote the test 
example in Java, as below:
{code}
public class TestNullSQL {
public static void main(String[] args) throws Exception {

// set up execution environment
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = 
StreamTableEnvironment.getTableEnvironment(env);

TypeInformation[] types = {BasicTypeInfo.INT_TYPE_INFO, 
SqlTimeTypeInfo.TIMESTAMP};
String names[] = {"id", "ts"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);

// we assign a null value here!!
DataStream<Row> input = env.fromElements(Row.of(1001, 
null)).returns(typeInfo);

tEnv.registerDataStream("test_table", input);
Table table = tEnv.sql("SELECT id, ts FROM test_table");
DataStream<Row> result = tEnv.toAppendStream(table, Row.class);
result.print();

env.execute();
}
}
{code}

I use a Row type in this example and the exception still happens. The Row 
data type supports an arbitrary number of fields, and fields may hold 
{quote}null{quote} values, so I think the generated code has some problems. 
What do you think?

> NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
> -
>
> Key: FLINK-7309
> URL: https://issues.apache.org/jira/browse/FLINK-7309
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime, Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Liangliang Chen
>Priority: Critical
>
> The code generated by CodeGenUtils.timePointToInternalCode() will cause a 
> NullPointerException when SQL table field type is `TIMESTAMP` and the field 
> value is `null`.
> Example for reproduce:
> {code}
> object StreamSQLExample {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> // null field value
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>   Order(null, "beer", 3)))
>   
> tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount)
> val result = tEnv.sql("SELECT * FROM OrderA")
> result.toAppendStream[Order].print()
> 
> env.execute()
>   }
>   case class Order(ts: Timestamp, product: String, amount: Int)
> }
> {code}
> In the above example, timePointToInternalCode() will generated some 
> statements like this:
> {code}
> ...
>   long result$1 = 
> org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts());
>   boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null;
> ...
> {code}
> so, the NPE will happen when in1.ts() is null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3347) TaskManager (or its ActorSystem) need to restart in case they notice quarantine

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114159#comment-16114159
 ] 

ASF GitHub Bot commented on FLINK-3347:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4478

[hotfix][docs] add documentation for `taskmanager.exit-on-fatal-akka-error`

## What is the purpose of the change

When the quarantine monitor was added as of FLINK-3347, documentation for 
enabling it only went into the backport for the 1.2 and 1.1 branches, not into 
master and therefore not into the 1.3 release either. This adds it again and 
should also be applied to the `release-1.3` branch.

## Brief change log

- add configuration documentation for `taskmanager.exit-on-fatal-akka-error`
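
For illustration, enabling the documented option presumably looks like this in 
the configuration (a sketch; the value shown is not verified here):

{code}
# flink-conf.yaml (sketch): restart the TaskManager process when its actor
# system gets quarantined, instead of leaving a useless process behind
taskmanager.exit-on-fatal-akka-error: true
{code}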

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink hotfix_quarantine_monitor_config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4478


commit 6111a0626e13b85b8996dcdf9f3d741c23739cf5
Author: Nico Kruber 
Date:   2017-08-04T09:11:35Z

[hotfix][docs] add documentation for `taskmanager.exit-on-fatal-akka-error`

When the quarantine monitor was added as of FLINK-3347, this documentation 
for
enabling it only went into the backport for the 1.2 branch, not into master 
and
therefore not into the 1.3 release either. This adds it again.




> TaskManager (or its ActorSystem) need to restart in case they notice 
> quarantine
> ---
>
> Key: FLINK-3347
> URL: https://issues.apache.org/jira/browse/FLINK-3347
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.0.0, 1.1.4, 1.3.0, 1.2.1
>
>
> There are cases where Akka quarantines remote actor systems. In that case, no 
> further communication is possible with that actor system unless one of the 
> two actor systems is restarted.
> The result is that a TaskManager is up and available, but cannot register at 
> the JobManager (Akka refuses connection because of the quarantined state), 
> making the TaskManager a useless process.
> I suggest to let the TaskManager restart itself once it notices that either 
> it quarantined the JobManager, or the JobManager quarantined it.
> It is possible to recognize that by listening to certain events in the actor 
> system event stream: 
> http://stackoverflow.com/questions/32471088/akka-cluster-detecting-quarantined-state



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4478: [hotfix][docs] add documentation for `taskmanager....

2017-08-04 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4478

[hotfix][docs] add documentation for `taskmanager.exit-on-fatal-akka-error`

## What is the purpose of the change

When the quarantine monitor was added as of FLINK-3347, documentation for 
enabling it only went into the backport for the 1.2 and 1.1 branches, not into 
master and therefore not into the 1.3 release either. This adds it again and 
should also be applied to the `release-1.3` branch.

## Brief change log

- add configuration documentation for `taskmanager.exit-on-fatal-akka-error`

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink hotfix_quarantine_monitor_config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4478


commit 6111a0626e13b85b8996dcdf9f3d741c23739cf5
Author: Nico Kruber 
Date:   2017-08-04T09:11:35Z

[hotfix][docs] add documentation for `taskmanager.exit-on-fatal-akka-error`

When the quarantine monitor was added as of FLINK-3347, this documentation 
for
enabling it only went into the backport for the 1.2 branch, not into master 
and
therefore not into the 1.3 release either. This adds it again.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....

2017-08-04 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r131346038
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
 ---
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
--- End diff --

Can you add a comment here when we can remove this file again 
(corresponding CALCITE- issue?).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6429) Bump up Calcite version to 1.13

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114154#comment-16114154
 ] 

ASF GitHub Bot commented on FLINK-6429:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4373#discussion_r131346038
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
 ---
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
--- End diff --

Can you add a comment here when we can remove this file again 
(corresponding CALCITE- issue?).


> Bump up Calcite version to 1.13
> ---
>
> Key: FLINK-6429
> URL: https://issues.apache.org/jira/browse/FLINK-6429
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.13 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7272) Support SQL IN with more than 20 elements in streaming

2017-08-04 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-7272:

Description: 
During the implementation of FLINK-4565 I noticed that an "IN" operation with 
more than 20 elements is converted into {{LogicalValues}} + {{LogicalJoin}} 
operation. Since this is not possible in streaming, we should add some rule 
that merges both operators either into a Correlate or Calc.

This problem also occurs when null is contained in the testing set. So this 
should get higher priority.

  was:During the implementation of FLINK-4565 I noticed that an "IN" operation 
with more than 20 elements is converted into {{LogicalValues}} + 
{{LogicalJoin}} operation. Since this is not possible in streaming, we should 
add some rule that merges both operators either into a Correlate or Calc.


> Support SQL IN with more than 20 elements in streaming
> --
>
> Key: FLINK-7272
> URL: https://issues.apache.org/jira/browse/FLINK-7272
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Timo Walther
>
> During the implementation of FLINK-4565 I noticed that an "IN" operation with 
> more than 20 elements is converted into {{LogicalValues}} + {{LogicalJoin}} 
> operation. Since this is not possible in streaming, we should add some rule 
> that merges both operators either into a Correlate or Calc.
> This problem also occurs when null is contained in the testing set. So this 
> should get higher priority.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4474: FLINK-7369: Add more information for `Key group index out...

2017-08-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4474
  
Hi,

sorry that I saw this PR too late, because Jira was hanging and I only saw 
the email for the opened issue. I already merged a fix for this to master and 
1.3. I would also have suggested removing the offset information from the 
message. From the user's perspective, the bounds and the index should be 
enough information.

Can you please close the PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7369) Add more information for `Key group index out of range of key group range` exception

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114146#comment-16114146
 ] 

ASF GitHub Bot commented on FLINK-7369:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4474
  
Hi,

sorry that I saw this PR too late, because Jira was hanging and I only saw 
the email for the opened issue. I already merged a fix for this to master and 
1.3. I would also have suggested removing the offset information from the 
message. From the user's perspective, the bounds and the index should be 
enough information.

Can you please close the PR?
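
For reference, the improvement amounts to reporting the offending index 
together with the valid range, e.g. (a sketch with hypothetical names, not the 
merged patch):
{code:java}
public class KeyGroupCheckSketch {

    // include the offending index in the message so users can tell whether it
    // fell below or above the valid range
    static void checkKeyGroupIndex(int keyGroupIndex, int startKeyGroup, int endKeyGroup) {
        if (keyGroupIndex < startKeyGroup || keyGroupIndex >= endKeyGroup) {
            throw new IllegalArgumentException(
                "Key group index " + keyGroupIndex
                    + " is out of range of key group range ["
                    + startKeyGroup + ", " + endKeyGroup + ").");
        }
    }

    public static void main(String[] args) {
        try {
            checkKeyGroupIndex(33, 16, 32);
        } catch (IllegalArgumentException e) {
            // prints: Key group index 33 is out of range of key group range [16, 32).
            System.out.println(e.getMessage());
        }
    }
}
{code}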


> Add more information for `Key group index out of range of key group range` 
> exception
> 
>
> Key: FLINK-7369
> URL: https://issues.apache.org/jira/browse/FLINK-7369
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Benedict Jin
>Assignee: Benedict Jin
>
> When I got the following exception log, it confused me as to whether the index 
> was more than `32` or less than `16`. So we should add more information to it.
> ```java
> java.lang.IllegalArgumentException: Key group index out of range of key group 
> range [16, 32).
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
>   at 
> org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:171)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> ```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

2017-08-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4404


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4565) Support for SQL IN operator

2017-08-04 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4565.
-
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed in 1.4.0: 42fd6cae68ea98668bac30cb02206a21852d3528 & 
4f776a2fd9fb29ce040837893b3d0f9f3c6f0a83
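
For example, an uncorrelated IN predicate like the following should now work 
(a sketch assuming a {{StreamTableEnvironment}} {{tEnv}} and a registered table 
{{Orders}} with a {{product}} field, as in other examples in these threads):
{code:java}
// SQL IN on a literal set, per this issue
Table result = tEnv.sql(
    "SELECT * FROM Orders WHERE product IN ('beer', 'diaper', 'rubber')");
{code}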

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.4.0
>
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7370) rework operator documentation

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114140#comment-16114140
 ] 

ASF GitHub Bot commented on FLINK-7370:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4477

[FLINK-7370][docs] rework the operator documentation structure

## What is the purpose of the change

Improve the docs by re-structuring the operators documentation.

## Brief change log

- create category `Streaming/Operators`
- move `Streaming/Overview/DataStream Transformations` to 
`Streaming/Operators/Overview`
- move `ProcessFunction`, `Windows`, and `Async IO` to `Streaming/Operators`
- update previous links in the documentation
- create any necessary redirects for old URLs

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7370

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4477.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4477


commit ce4f1ce928e6d21131b23cbc96c85a63adfaf941
Author: Nico Kruber 
Date:   2017-08-04T08:56:58Z

[FLINK-7370][docs] rework the operator documentation structure

- create category `Streaming/Operators`
- move `Streaming/Overview/DataStream Transformations` to 
`Streaming/Operators/Overview`
- move `ProcessFunction`, `Windows`, and `Async IO` to `Streaming/Operators`
- update previous links in the documentation
- create any necessary redirects for old URLs




> rework operator documentation
> -
>
> Key: FLINK-7370
> URL: https://issues.apache.org/jira/browse/FLINK-7370
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The structure of the operator documentation could be improved in the following 
> way:
> - Create category {{Streaming/Operators}}.
> - Move {{Streaming/Overview/DataStream Transformations}} to 
> {{Streaming/Operators/Overview}}.
> - Move {{ProcessFunction}}, {{Windows}}, and {{Async IO}} to 
> {{Streaming/Operators}}
> - create any necessary redirects for old URLs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4477: [FLINK-7370][docs] rework the operator documentati...

2017-08-04 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4477

[FLINK-7370][docs] rework the operator documentation structure

## What is the purpose of the change

Improve the docs by re-structuring the operators documentation.

## Brief change log

- create category `Streaming/Operators`
- move `Streaming/Overview/DataStream Transformations` to 
`Streaming/Operators/Overview`
- move `ProcessFunction`, `Windows`, and `Async IO` to `Streaming/Operators`
- update previous links in the documentation
- create any necessary redirects for old URLs

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7370

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4477.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4477


commit ce4f1ce928e6d21131b23cbc96c85a63adfaf941
Author: Nico Kruber 
Date:   2017-08-04T08:56:58Z

[FLINK-7370][docs] rework the operator documentation structure

- create category `Streaming/Operators`
- move `Streaming/Overview/DataStream Transformations` to 
`Streaming/Operators/Overview`
- move `ProcessFunction`, `Windows`, and `Async IO` to `Streaming/Operators`
- update previous links in the documentation
- create any necessary redirects for old URLs






[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114141#comment-16114141
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4404


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.4.0
>
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114142#comment-16114142
 ] 

ASF GitHub Bot commented on FLINK-4565:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3502


> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.4.0
>
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3502: [FLINK-4565] Support for SQL IN operator

2017-08-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3502




[jira] [Commented] (FLINK-7307) Add proper command line parsing tool to ClusterEntrypoint

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114138#comment-16114138
 ] 

ASF GitHub Bot commented on FLINK-7307:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4476

[FLINK-7307] Add proper command line parsing tool to ClusterEntrypoint

## What is the purpose of the change

Add a proper command line parsing tool, `CommandLineParser`, to the entry 
point `ClusterEntrypoint#parseArguments`.


## Brief change log

  - *Add command line parsing tool `CommandLineParser`*
  - *Use `CommandLineParser` instead of `ParameterTool` in 
`ClusterEntrypoint#parseArguments`*


## Verifying this change

This change added tests and can be verified as follows:

  - Added test case `CommandLineParserTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7307

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4476


commit 9074a186b7961e7218a053ed211876317514a6ee
Author: zjureel 
Date:   2017-08-04T08:49:35Z

[FLINK-7307][improvement] Add proper command line parsing tool to 
ClusterEntrypoint




> Add proper command line parsing tool to ClusterEntrypoint
> -
>
> Key: FLINK-7307
> URL: https://issues.apache.org/jira/browse/FLINK-7307
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Fang Yong
>  Labels: flip-6
>
> We need to add a proper command line parsing tool to the entry point 
> {{ClusterEntrypoint#parseArguments}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.
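
For illustration, here is a minimal sketch of what such a dedicated parser 
could look like, built on Apache Commons CLI (assuming version 1.3+ for 
{{DefaultParser}} and {{Option.builder}}); the class and option names are 
assumptions for this example and need not match the PR:

{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

// Hypothetical sketch of a dedicated entry-point parser.
public class EntrypointParserSketch {

    public static void main(String[] args) throws ParseException {
        Options options = new Options();
        options.addOption(Option.builder("c")
            .longOpt("configDir")
            .hasArg()
            .required()
            .desc("Directory containing flink-conf.yaml")
            .build());

        // Unlike ParameterTool, which accepts arbitrary key-value pairs,
        // this fails fast with a descriptive ParseException on bad input.
        CommandLine commandLine = new DefaultParser().parse(options, args);
        System.out.println("configDir = " + commandLine.getOptionValue("configDir"));
    }
}
{code}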



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4476: [FLINK-7307] Add proper command line parsing tool ...

2017-08-04 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4476

[FLINK-7307] Add proper command line parsing tool to ClusterEntrypoint

## What is the purpose of the change

Add a proper command line parsing tool, `CommandLineParser`, to the entry 
point `ClusterEntrypoint#parseArguments`.


## Brief change log

  - *Add command line parsing tool `CommandLineParser`*
  - *Use `CommandLineParser` instead of `ParameterTool` in 
`ClusterEntrypoint#parseArguments`*


## Verifying this change

This change added tests and can be verified as follows:

  - Added test case `CommandLineParserTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7307

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4476


commit 9074a186b7961e7218a053ed211876317514a6ee
Author: zjureel 
Date:   2017-08-04T08:49:35Z

[FLINK-7307][improvement] Add proper command line parsing tool to 
ClusterEntrypoint





