[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
@dawidwys I have updated the PR and it currently only contains the changes for 
the RichFunction.




[jira] [Commented] (FLINK-6938) IterativeCondition should support RichFunction interface

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136338#comment-16136338
 ] 

ASF GitHub Bot commented on FLINK-6938:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
@dawidwys I have updated the PR and it currently only contains the changes for 
the RichFunction.


> IterativeCondition should support RichFunction interface
> 
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to 
> compile the generated code once. We do not want to insert an if condition in 
> the {{filter()}} method, so I suggest making IterativeCondition support the 
> {{RichFunction}} interface.
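
A minimal sketch of what such a rich condition could look like (assuming the PR introduces a base class combining {{IterativeCondition}} with {{RichFunction}}; the names below are illustrative): expensive, non-serializable setup happens once in {{open()}} instead of being guarded by an if check inside {{filter()}}.

{code}
import java.util.regex.Pattern

import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.configuration.Configuration

class ErrorCodeCondition extends RichIterativeCondition[String] {

  // built once per task in open(), never serialized with the condition
  @transient private var pattern: Pattern = _

  override def open(parameters: Configuration): Unit = {
    pattern = Pattern.compile("error-\\d+")
  }

  override def filter(value: String, ctx: IterativeCondition.Context[String]): Boolean =
    pattern.matcher(value).matches()
}
{code}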





[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136280#comment-16136280
 ] 

Haohui Mai commented on FLINK-7398:
---

+1 on logging trait. I'll submit a PR.

Adding a checkstyle rule is also a good idea.
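
For reference, a minimal sketch of what such a logging trait could look like (assumed shape, not the actual PR): the {{Logger}} is a {{@transient lazy val}}, so it is re-created after deserialization instead of being shipped to the cluster with the UDF.

{code}
import org.slf4j.{Logger, LoggerFactory}

trait Logging {
  // never serialized with the operator/UDF; lazily re-created on the task manager
  @transient protected lazy val log: Logger = LoggerFactory.getLogger(getClass)
}

// usage sketch
class MyFlatMapRunner extends Logging {
  def run(): Unit = log.info("compiled and running")
}
{code}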


> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
>   val LOG = 

[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136247#comment-16136247
 ] 

ASF GitHub Bot commented on FLINK-7423:
---

Github user XuPingyong commented on the issue:

https://github.com/apache/flink/pull/4525
  
Thanks @StephanEwen, so I understand that `null` values can be returned from 
`InputFormat#nextRecord` at any time, not only at the end.

Can `null` values be passed to `InputFormat#nextRecord`, or should we check for 
`null` (and create a new object if it is `null`) before passing it?


> Always reuse an instance  to get elements from the inputFormat 
> ---
>
> Key: FLINK-7423
> URL: https://issues.apache.org/jira/browse/FLINK-7423
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
> while (isRunning) {
>     format.open(splitIterator.next());
>     // for each element we also check if cancel
>     // was called by checking the isRunning flag
>     while (isRunning && !format.reachedEnd()) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             ctx.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     format.close();
>     completedSplitsCounter.inc();
>     if (isRunning) {
>         isRunning = splitIterator.hasNext();
>     }
> }
> {code}
> the format may return a different element or null from nextRecord(), which may 
> cause an exception.
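
A minimal sketch of the "check null, give a new object" idea discussed above (the helper name is hypothetical, not part of the PR):

{code}
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.core.io.InputSplit

// If the previous call returned null, create a fresh instance so that null is
// never passed back into nextRecord().
def nextRecordReusing[T](
    format: InputFormat[T, _ <: InputSplit],
    serializer: TypeSerializer[T],
    reuse: T): T = {
  val record = if (reuse == null) serializer.createInstance() else reuse
  format.nextRecord(record) // may still return null, or an instance other than `record`
}
{code}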





[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...

2017-08-21 Thread XuPingyong
Github user XuPingyong commented on the issue:

https://github.com/apache/flink/pull/4525
  
Thanks @StephanEwen, so I understand that `null` values can be returned from 
`InputFormat#nextRecord` at any time, not only at the end.

Can `null` values be passed to `InputFormat#nextRecord`, or should we check for 
`null` (and create a new object if it is `null`) before passing it?




[GitHub] flink pull request #4570: [FLINK-7438][DataStream API]Remove useless import,...

2017-08-21 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4570

[FLINK-7438][DataStream API]Remove useless import, avoid warnings

## What is the purpose of the change
Avoid warnings. Details: [FLINK-7438](https://issues.apache.org/jira/browse/FLINK-7438).

## Brief change log
Remove useless "import org.apache.flink.util.OutputTag" in 
DataStream.scala, AllWindowedStream.scala, WindowedStream.scala

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-7438

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4570


commit d28f4b8eca85230faea8c3b8c9b868741fc63886
Author: yew1eb 
Date:   2017-08-22T03:16:51Z

Remove useless import, avoid warnings






[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136217#comment-16136217
 ] 

ASF GitHub Bot commented on FLINK-7438:
---

GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4570

[FLINK-7438][DataStream API]Remove useless import, avoid warnings

## What is the purpose of the change
Avoid warnings. Details: [FLINK-7438](https://issues.apache.org/jira/browse/FLINK-7438).

## Brief change log
Remove useless "import org.apache.flink.util.OutputTag" in 
DataStream.scala, AllWindowedStream.scala, WindowedStream.scala

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-7438

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4570


commit d28f4b8eca85230faea8c3b8c9b868741fc63886
Author: yew1eb 
Date:   2017-08-22T03:16:51Z

Remove useless import, avoid warnings




> Some classes are eclipsed by classes in package scala
> -
>
> Key: FLINK-7438
> URL: https://issues.apache.org/jira/browse/FLINK-7438
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> Noticed the following during compilation:
> {code}
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> object OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> [WARNING]  ^
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> class OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> {code}
> We should avoid the warning w.r.t. OutputTag.
> There may be other occurrences of similar warnings.
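
This PR simply removes the unused import. Where such an import is actually needed, one alternative sketch (not part of this PR) is a rename import so that it is no longer eclipsed by scala.OutputTag:

{code}
import org.apache.flink.util.{OutputTag => FlinkOutputTag}

// the renamed alias is unambiguous, so no "permanently hidden" warning is emitted
def describe(tag: FlinkOutputTag[String]): String = tag.toString
{code}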





[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136210#comment-16136210
 ] 

ASF GitHub Bot commented on FLINK-7479:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4563
  
@dawidwys Have updated the PR according to your comments.


> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it is already possible to retrieve events matched to the specified 
> pattern via {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.





[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4563
  
@dawidwys Have updated the PR according to your comments.




[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()

2017-08-21 Thread Hai Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136194#comment-16136194
 ] 

Hai Zhou commented on FLINK-4422:
-

Hi [~StephanEwen], is this still an issue?
If yes, I would like to work on it, using the methods provided by 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java]
to replace the time measurement code for relative times / intervals.

> Convert all time interval measurements to System.nanoTime()
> ---
>
> Key: FLINK-4422
> URL: https://issues.apache.org/jira/browse/FLINK-4422
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>Priority: Minor
>
> In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is 
> monotonic. To measure delays and time intervals, {{System.nanoTime()}} is 
> hence reliable, while {{System.currentTimeMillis()}} is not.
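
A tiny sketch of the intended pattern ({{doWork}} is a placeholder): measure elapsed time with the monotonic {{System.nanoTime()}} rather than the wall clock.

{code}
def timedMillis[A](doWork: => A): (A, Long) = {
  val start = System.nanoTime()                   // monotonic, unaffected by clock adjustments
  val result = doWork
  val elapsedMillis = (System.nanoTime() - start) / 1000000L
  (result, elapsedMillis)
}
{code}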





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-08-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136174#comment-16136174
 ] 

Jark Wu commented on FLINK-7465:


Hi [~sunjincheng121], 
Regarding when to configure the accuracy, I think doing it before the function is 
registered is better: (1) the accuracy parameter can be checked before runtime, and 
(2) we do not need to check whether the bit array and hash functions are initialized 
every time the accumulator is called. 

Regarding the de/serialization:

- de/serialize the bitArray on every accumulate call
   It may be too expensive, as I mentioned before.
- de/serialize the bitArray at checkpoint
  We can't do that, as far as I know. We implement the UDAGG via the {{State}} interface. If 
we use the heap backend, the deserialization only happens at checkpoint, but if it 
is the RocksDB backend, the de/serialization happens on every update.
- de/serialize the bitArray in open/close
  Do you mean not using State in the UDAGG? How can we guarantee exactly-once then? 

I'm designing an implementation based on MapView. It acts like a {{long[]}}, which 
can be used as a bit array or bitmap, but with better performance. In this way, we 
only have to deserialize a few longs on every {{accumulate}} call (see the sketch 
below). What do you think? [~fhueske] 
[~sunjincheng121]
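
A rough sketch of that idea (my reading of it; names are illustrative, assuming the new {{MapView}} from the DataView work): the bit array is stored as word index -> 64-bit word, so a RocksDB-backed accumulator only de/serializes the touched words per {{accumulate}} call.

{code}
import java.lang.{Integer => JInt, Long => JLong}
import org.apache.flink.table.api.dataview.MapView

class BloomFilterAcc {
  // behaves like a sparse long[]: word index -> 64-bit word of the bit array
  val words: MapView[JInt, JLong] = new MapView[JInt, JLong]()

  def setBit(bit: Int): Unit = {
    val idx  = bit >>> 6
    val old  = words.get(idx)
    val word = if (old == null) 0L else old.longValue()
    words.put(idx, word | (1L << (bit & 63)))
  }

  def getBit(bit: Int): Boolean = {
    val old = words.get(bit >>> 6)
    old != null && (old.longValue() & (1L << (bit & 63))) != 0L
  }
}
{code}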

> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA. use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :-)
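
A minimal, self-contained sketch of the add/query steps described above (my own illustration with two simple hash functions, not the proposed Flink implementation):

{code}
class SimpleBloomFilter(m: Int, k: Int) {
  private val bits = new java.util.BitSet(m)

  // the k array positions for an element, derived from two base hashes
  private def positions(element: String): Seq[Int] = {
    val h1 = element.hashCode
    val h2 = element.reverse.hashCode
    (0 until k).map(i => java.lang.Math.floorMod(h1 + i * h2, m))
  }

  def add(element: String): Unit =
    positions(element).foreach(p => bits.set(p))        // set the k bits to 1

  def mightContain(element: String): Boolean =
    positions(element).forall(p => bits.get(p))         // any 0 bit => definitely not in the set
}
{code}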





[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136167#comment-16136167
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai any more feedback? We have a ticket at my company for this task, 
and I'd like to mark it as finished if possible :)


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more of them to make the most of KinesisProducer and make it 
> fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink is using KPL's default values. They make Flink write too fast to 
> Kinesis, which fails Flink jobs too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params in order to slow down 
> Flink's write rate to Kinesis.
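
A short sketch of the intended usage once these settings are configurable (the key names follow the KPL property names referenced above; whether they are passed through exactly like this is up to the PR):

{code}
import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val producerConfig = new Properties()
producerConfig.setProperty("aws.region", "us-east-1")
// KPL settings from the AWS default_config.properties referenced above
producerConfig.setProperty("RecordMaxBufferedTime", "1000")   // ms
producerConfig.setProperty("MaxConnections", "24")
producerConfig.setProperty("RequestTimeout", "10000")         // ms
producerConfig.setProperty("RateLimit", "80")                 // percent of the shard limit

val producer = new FlinkKinesisProducer[String](new SimpleStringSchema(), producerConfig)
{code}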





[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-08-21 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai any more feedback? We have a ticket at my company for this task, 
and I'd like to mark it as finished if possible :)




[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136166#comment-16136166
 ] 

ASF GitHub Bot commented on FLINK-7366:
---

GitHub user bowenli86 reopened a pull request:

https://github.com/apache/flink/pull/4522

[FLINK-7366][kinesis connector] Upgrade kinesis producer library in 
flink-connector-kinesis

## What is the purpose of the change

We need to upgrade KPL and KCL to pick up the enhanced performance and 
stability so that Flink works better with Kinesis. Upgrading KPL is especially 
necessary, because the KPL version Flink uses is old and doesn't have good 
retry and error handling logic.

**Upgrade KPL:**

flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, 
which was released in Nov 2015 by AWS. It's old (only the fourth release) and 
thus problematic. It doesn't even have good retry logic, so Flink fails 
really frequently (about every 10 minutes, as we observed) when it writes too 
fast to Kinesis and receives RateLimitExceededException.

Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56 (Oct 2016): 
"With the newer version of the KPL it uses the AWS C++ SDK which should offer 
additional retries." 0.12.5, the version we are upgrading to, was released in 
May 2017 and should have the enhanced retry logic.

**Upgrade KCL:**

Upgrade KCL from 1.6.2 to 1.8.1

**Upgrade AWS SDK:**

from 1.10.71 to 1.11.171

## Verifying this change

This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

There should not be any impact outside flink-connector-kinesis, because 1) KPL 
and KCL are only used in flink-connector-kinesis, and 2) the AWS SDK in 
flink-connector-kinesis is shaded.

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7366

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4522.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4522


commit 7486878631e7238283eabfdd575387d32b210f91
Author: Bowen Li 
Date:   2017-08-10T22:09:56Z

FLINK-7366 Upgrade kinesis-producer-library in flink-connector-kinesis from 
0.10.2 to 0.12.5

commit ba9fb5f1e189e31f8085bd384edbb682a69aae7f
Author: Bowen Li 
Date:   2017-08-10T23:46:59Z

upgrade KCL and AWS SDK

commit 88a16efcaf4c7e2addcf978a98f8ffbea4281639
Author: Bowen Li 
Date:   2017-08-13T05:49:26Z

revert changes to KCL




> Upgrade kinesis producer library in flink-connector-kinesis
> ---
>
> Key: FLINK-7366
> URL: https://issues.apache.org/jira/browse/FLINK-7366
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> We need to upgrade KPL and KCL to pick up the enhanced performance and 
> stability so that Flink works better with Kinesis. Upgrading KPL is especially 
> necessary, because the KPL version Flink uses is old and doesn't have good 
> retry and error handling logic.
> *KPL:*
> flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which 
> was released in Nov 2015 by AWS. It's old (only the fourth release) and thus 
> problematic. It doesn't even have good retry logic, so Flink fails 
> really frequently (about every 10 mins as we observed) when it writes too 
> fast to Kinesis and receives RateLimitExceededException. 
> Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56 (Oct 2016): 
> "*With the newer version of the KPL it uses the AWS C++ SDK which should 
> offer additional retries.*" 0.12.5, the version we are upgrading 
> to, was released in May 2017 and should have the enhanced retry logic.
> *KCL:*
> Upgrade KCL from 1.6.2 to 1.8.1
> *AWS SDK*
> from 1.10.71 to 1.11.171





[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136163#comment-16136163
 ] 

ASF GitHub Bot commented on FLINK-7366:
---

Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4522


> Upgrade kinesis producer library in flink-connector-kinesis
> ---
>
> Key: FLINK-7366
> URL: https://issues.apache.org/jira/browse/FLINK-7366
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> We need to upgrade KPL and KCL to pick up the enhanced performance and 
> stability so that Flink works better with Kinesis. Upgrading KPL is especially 
> necessary, because the KPL version Flink uses is old and doesn't have good 
> retry and error handling logic.
> *KPL:*
> flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which 
> was released in Nov 2015 by AWS. It's old (only the fourth release) and thus 
> problematic. It doesn't even have good retry logic, so Flink fails 
> really frequently (about every 10 mins as we observed) when it writes too 
> fast to Kinesis and receives RateLimitExceededException. 
> Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56 (Oct 2016): 
> "*With the newer version of the KPL it uses the AWS C++ SDK which should 
> offer additional retries.*" 0.12.5, the version we are upgrading 
> to, was released in May 2017 and should have the enhanced retry logic.
> *KCL:*
> Upgrade KCL from 1.6.2 to 1.8.1
> *AWS SDK*
> from 1.10.71 to 1.11.171





[GitHub] flink pull request #4522: [FLINK-7366][kinesis connector] Upgrade kinesis pr...

2017-08-21 Thread bowenli86
GitHub user bowenli86 reopened a pull request:

https://github.com/apache/flink/pull/4522

[FLINK-7366][kinesis connector] Upgrade kinesis producer library in 
flink-connector-kinesis

## What is the purpose of the change

We need to upgrade KPL and KCL to pick up the enhanced performance and 
stability so that Flink works better with Kinesis. Upgrading KPL is especially 
necessary, because the KPL version Flink uses is old and doesn't have good 
retry and error handling logic.

**Upgrade KPL:**

flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, 
which was released in Nov 2015 by AWS. It's old (only the fourth release) and 
thus problematic. It doesn't even have good retry logic, so Flink fails 
really frequently (about every 10 minutes, as we observed) when it writes too 
fast to Kinesis and receives RateLimitExceededException.

Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56 (Oct 2016): 
"With the newer version of the KPL it uses the AWS C++ SDK which should offer 
additional retries." 0.12.5, the version we are upgrading to, was released in 
May 2017 and should have the enhanced retry logic.

**Upgrade KCL:**

Upgrade KCL from 1.6.2 to 1.8.1

**Upgrade AWS SDK:**

from 1.10.71 to 1.11.171

## Verifying this change

This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

There should not be any impact outside flink-connector-kinesis, because 1) KPL 
and KCL are only used in flink-connector-kinesis, and 2) the AWS SDK in 
flink-connector-kinesis is shaded.

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7366

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4522.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4522


commit 7486878631e7238283eabfdd575387d32b210f91
Author: Bowen Li 
Date:   2017-08-10T22:09:56Z

FLINK-7366 Upgrade kinesis-producer-library in flink-connector-kinesis from 
0.10.2 to 0.12.5

commit ba9fb5f1e189e31f8085bd384edbb682a69aae7f
Author: Bowen Li 
Date:   2017-08-10T23:46:59Z

upgrade KCL and AWS SDK

commit 88a16efcaf4c7e2addcf978a98f8ffbea4281639
Author: Bowen Li 
Date:   2017-08-13T05:49:26Z

revert changes to KCL






[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136162#comment-16136162
 ] 

ASF GitHub Bot commented on FLINK-7366:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4522
  
@tzulitai any more feedback? We have a ticket at my company for this task, 
and I'd like to mark it as finished if possible :)


> Upgrade kinesis producer library in flink-connector-kinesis
> ---
>
> Key: FLINK-7366
> URL: https://issues.apache.org/jira/browse/FLINK-7366
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> We need to upgrade KPL and KCL to pick up the enhanced performance and 
> stability so that Flink works better with Kinesis. Upgrading KPL is especially 
> necessary, because the KPL version Flink uses is old and doesn't have good 
> retry and error handling logic.
> *KPL:*
> flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which 
> was released in Nov 2015 by AWS. It's old (only the fourth release) and thus 
> problematic. It doesn't even have good retry logic, so Flink fails 
> really frequently (about every 10 mins as we observed) when it writes too 
> fast to Kinesis and receives RateLimitExceededException. 
> Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56 (Oct 2016): 
> "*With the newer version of the KPL it uses the AWS C++ SDK which should 
> offer additional retries.*" 0.12.5, the version we are upgrading 
> to, was released in May 2017 and should have the enhanced retry logic.
> *KCL:*
> Upgrade KCL from 1.6.2 to 1.8.1
> *AWS SDK*
> from 1.10.71 to 1.11.171





[GitHub] flink pull request #4522: [FLINK-7366][kinesis connector] Upgrade kinesis pr...

2017-08-21 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4522




[GitHub] flink issue #4522: [FLINK-7366][kinesis connector] Upgrade kinesis producer ...

2017-08-21 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4522
  
@tzulitai any more feedback? We have a ticket at my company for this task, 
and I'd like to mark it as finished if possible :)




[jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136140#comment-16136140
 ] 

Jark Wu edited comment on FLINK-7398 at 8/22/17 1:47 AM:
-

+1 for the Logging trait. 

BTW, I think avoiding the misuse of {{LOG}} by inheriting the {{Logging}} trait 
is not perfect. What if a user does not inherit the Logging trait? I would like to 
add a checkstyle rule to prevent using the slf4j Logger. 


was (Author: jark):
+1 for the Logging trait. 

BTW, I think avoiding the misuse of {{LOG}} by inheriting the {{Logging}} trait 
is not perfect. What if a user does not inherit the Logging trait? I would like to 
add a checkstyle rule to prevent using the slf4j Logger directly. 

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)

[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136140#comment-16136140
 ] 

Jark Wu commented on FLINK-7398:


+1 for the Logging trait. 

BTW, I think avoiding the misuse of {{LOG}} by inheriting the {{Logging}} trait 
is not perfect. What if a user does not inherit the Logging trait? I would like to 
add a checkstyle rule to prevent using the slf4j Logger directly. 

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
>   val LOG: Logger = 

[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails

2017-08-21 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7488:
--
Description: 
{code}
compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.239 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different network buffer memory sizes with 
configuration: {taskmanager.network.memory.fraction=0.1, 
taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, 
taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, 
taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} 
expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf 
because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to 
augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81)

compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.16 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different heap sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting 
HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the 
result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110){code}
$HADOOP_CONF_DIR was not set prior to running the test.

  was:
{code}
  
TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava:110->compareHeapSizeJavaVsScript:275
 Different heap sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting 
HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the 
result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
  
TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava:81->compareNetworkBufJavaVsScript:235
 Different network buffer memory sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]104857600> but 
was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was 
set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: 

[jira] [Created] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails

2017-08-21 Thread Ted Yu (JIRA)
Ted Yu created FLINK-7488:
-

 Summary: TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
 Key: FLINK-7488
 URL: https://issues.apache.org/jira/browse/FLINK-7488
 Project: Flink
  Issue Type: Test
Reporter: Ted Yu
Priority: Minor


{code}
  
TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava:110->compareHeapSizeJavaVsScript:275
 Different heap sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting 
HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the 
result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
  
TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava:81->compareNetworkBufJavaVsScript:235
 Different network buffer memory sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]104857600> but 
was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was 
set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
{code}
$HADOOP_CONF_DIR was not set prior to running the test.





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-08-21 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136062#comment-16136062
 ] 

sunjincheng commented on FLINK-7465:


[~fhueske] I want to add accuracy and maxElement as function parameters. The 
function signature looks like: 
{code}
count-bf(accuracy:Double, maxKeyCount, col:Any)
{code}
 
We will use the following formula to calculate the bit array size (bsize):
{code}
(-maxKeyCount * Math.log(accuracy) / (Math.log(2) * Math.log(2)))
{code}
And the following formula to calculate the count of hash functions:
{code}
Math.max(1, Math.round(bsize.asInstanceOf[Double] / maxKeyCount * Math.log(2)))
{code}
The formulas are the same as in the reference from the JIRA description. 
That means we configure the accuracy when the function is used. Does this make 
sense to you? [~fhueske]
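
A quick worked example with assumed inputs (accuracy = 0.01, maxKeyCount = 1,000,000; my numbers, purely to illustrate the formulas):

{code}
val maxKeyCount = 1000000
val accuracy    = 0.01

val bsize = (-maxKeyCount * math.log(accuracy) / (math.log(2) * math.log(2))).toLong
// bsize ≈ 9585058 bits (~1.2 MB)

val numHashFunctions = math.max(1L, math.round(bsize.toDouble / maxKeyCount * math.log(2)))
// numHashFunctions = 7
{code}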

I think {{count-min}} is very useful in certain cases, and so is 
{{HyperLogLog}} (cardinality counting). After we complete this JIRA, we can 
discuss those implementations.

[~jark] The de/serialization of the bitArray is very important in the implementation. I 
think the best way is to do the de/serialization at checkpoint or in the 
{{open/close}} methods, but currently we cannot access the {{RuntimeContext}} 
from the {{FunctionContext}}, so we would need some changes, or we could use DataView. 
Currently, in my mind we have the following choices:
* de/serialize the bitArray on every {{accumulate}} call (bitArray as a member of the ACC)
* de/serialize the bitArray at checkpoint (bitArray as a member of the AGG)
* de/serialize the bitArray in {{open/close}} (bitArray as a member of the AGG)

What do you think? [~jark] [~fhueske]



> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA. use BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr] I appreciated if you can give me some advice. :-)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7482) StringWriter to support compression

2017-08-21 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-7482.
-
Resolution: Not A Bug

Hi [~felixcheung]. This is a great question for the dev or user [mailing 
lists|https://flink.apache.org/community.html#mailing-lists].

> StringWriter to support compression
> ---
>
> Key: FLINK-7482
> URL: https://issues.apache.org/jira/browse/FLINK-7482
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.3.2
>Reporter: Felix Cheung
>
> Is it possible to have StringWriter support compression like 
> AvroKeyValueSinkWriter or SequenceFileWriter?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7454) update 'Monitoring Current Event Time' section of Flink doc

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136025#comment-16136025
 ] 

ASF GitHub Bot commented on FLINK-7454:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4547
  
any feedback?


> update 'Monitoring Current Event Time' section of Flink doc
> ---
>
> Key: FLINK-7454
> URL: https://issues.apache.org/jira/browse/FLINK-7454
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> Since FLINK-3427 is done, there's no need to have the following doc in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html#monitoring-current-event-time
> "There are plans (see FLINK-3427) to show the current low watermark for each 
> operator in the Flink web interface.
> Until this feature is implemented the current low watermark for each task can 
> be accessed through the metrics system."
> We can replace it with something like "Low watermarks of each task can be 
> accessed either from Flink web interface or Flink metric system."



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4547: [FLINK-7454][docs] update 'Monitoring Current Event Time'...

2017-08-21 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4547
  
any feedback?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jacob Park (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Park updated FLINK-7398:
--
Attachment: Example.png

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> 

[jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jacob Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135534#comment-16135534
 ] 

Jacob Park edited comment on FLINK-7398 at 8/21/17 6:28 PM:


[~wheat9] Why not follow Apache Spark's example for this problem?
 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

We could have a Logging trait with a transient lazy LOG. If you want safe logging, 
you inherit the trait, and the existing code will produce a compile-time error from 
the conflict, which you can use to find and fix the bad logging.
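For illustration, a minimal sketch of such a trait (assuming plain slf4j; this is 
not the actual Spark or Flink code):
{code}
import org.slf4j.{Logger, LoggerFactory}

trait Logging {
  // @transient: the Logger is never serialized together with the UDF;
  // lazy: it is simply re-created on first use after deserialization.
  @transient protected lazy val LOG: Logger = LoggerFactory.getLogger(getClass)
}

// Usage in an operator/UDF (hypothetical class name):
class MyRunner extends Logging with Serializable {
  def open(): Unit = LOG.info("opening {}", getClass.getSimpleName)
}
{code}
Any existing class that keeps its own {{val LOG = ...}} while mixing in the trait 
would then fail to compile, which makes the problematic declarations easy to find.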

Edit: Refer to my Example.png.

!https://issues.apache.org/jira/secure/attachment/12882940/Example.png!

I can take on this task if you want.


was (Author: jparkie):
[~wheat9] Why not follow Apache Spark's example for this problem?
 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

By having a Logging trait with a transient lazy LOG. If you want safe logging, 
you inherit the trait, and the current existing code will produce a 
compile-time error by the conflict, which you can use to fix the bad logging.

Edit: Refer to my Example.png.

!https://issues.apache.org/jira/secure/attachment/12882932/Example.png!

I can take on this task if you want.

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> 

[jira] [Updated] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jacob Park (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Park updated FLINK-7398:
--
Attachment: (was: Example.png)

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61:
>   

[jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jacob Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135534#comment-16135534
 ] 

Jacob Park edited comment on FLINK-7398 at 8/21/17 6:17 PM:


[~wheat9] Why not follow Apache Spark's example for this problem?
 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

By having a Logging trait with a transient lazy LOG. If you want safe logging, 
you inherit the trait, and the current existing code will produce a 
compile-time error by the conflict, which you can use to fix the bad logging.

Edit: Refer to my Example.png.

!Example.png|thumbnail!

I can take on this task if you want.


was (Author: jparkie):
[~wheat9] Why not follow Apache Spark's example for this problem?
 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

By having a Logging trait with a transient lazy LOG. If you want safe logging, 
you inherit the trait, and the current existing code will produce a 
compile-time error by the conflict, which you can use to fix the bad logging.

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> 

[jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jacob Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135534#comment-16135534
 ] 

Jacob Park edited comment on FLINK-7398 at 8/21/17 6:17 PM:


[~wheat9] Why not follow Apache Spark's example for this problem?
 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

By having a Logging trait with a transient lazy LOG. If you want safe logging, 
you inherit the trait, and the current existing code will produce a 
compile-time error by the conflict, which you can use to fix the bad logging.

Edit: Refer to my Example.png.

!https://issues.apache.org/jira/secure/attachment/12882932/Example.png!

I can take on this task if you want.


was (Author: jparkie):
[~wheat9] Why not follow Apache Spark's example for this problem?
 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

By having a Logging trait with a transient lazy LOG. If you want safe logging, 
you inherit the trait, and the current existing code will produce a 
compile-time error by the conflict, which you can use to fix the bad logging.

Edit: Refer to my Example.png.

!Example.png|thumbnail!

I can take on this task if you want.

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> 

[jira] [Updated] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-21 Thread Jacob Park (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Park updated FLINK-7398:
--
Attachment: Example.png

[~wheat9] Why not follow Apache Spark's example for this problem?
 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

By having a Logging trait with a transient lazy LOG. If you want safe logging, 
you inherit the trait, and the current existing code will produce a 
compile-time error by the conflict, which you can use to fix the bad logging.

> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135429#comment-16135429
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
The thing is that I use the 
```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver 
information from the ```PythonStreamBinder``` to a class that is called from 
the python script. 
How would you suggest doing it otherwise?


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
The thing is that I use the 
```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver 
information from the ```PythonStreamBinder``` to a class that is called from 
the python script. 
How would you suggest doing it otherwise?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7487) test instability in ClassLoaderITCase (no resources available)

2017-08-21 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-7487:
--

 Summary: test instability in ClassLoaderITCase (no resources 
available)
 Key: FLINK-7487
 URL: https://issues.apache.org/jira/browse/FLINK-7487
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.4.0
Reporter: Nico Kruber


This is the stack trace from https://travis-ci.org/NicoK/flink/jobs/266772103 
which contains quite some changes but the error itself should be unrelated:

{code}
testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
  Time elapsed: 0.604 sec  <<< ERROR!
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at 
org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not 
enough free slots available to run the job. You can decrease the operator 
parallelism or increase the number of slots per TaskManager in the 
configuration. Task to schedule: < Attempt #0 (Map (Map at 
main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with groupID 
< 704e8c44f1c3edc91e03431408eb561d > in sharing group < SlotSharingGroup 
[727c589bfbe7c65aa4ffc75585a1e7e7, f82d7994fbfdd0aecab2c7f54e58f0c1, 
62039db00aa28f9de4fa3df3b89fbc7d, 704e8c44f1c3edc91e03431408eb561d, 
208a859a78f987562b4e8dcad6e90582, 9b9f002f990306532d6f153b38835b6f, 
30f3d92eacc3068d3545693fe084a6b8, 74da3f65164120b4781de360723e60c0] >. 
Resources available to scheduler: Number of instances=2, total number of 
slots=4, available slots=0
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
at 
org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It seems that the job started in `testDisposeSavepointWithCustomKvState` is not 
properly shut down after the test method exits, and (parts of) it remain and 
block resources for the following tests. Copying the relevant parts of the log here:
{code}
13:46:30,887 INFO  org.apache.flink.test.classloading.ClassLoaderITCase 
 - 

Test 

[jira] [Updated] (FLINK-7487) test instability in ClassLoaderITCase (no resources available)

2017-08-21 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-7487:
---
Labels: test-stability  (was: )

> test instability in ClassLoaderITCase (no resources available)
> --
>
> Key: FLINK-7487
> URL: https://issues.apache.org/jira/browse/FLINK-7487
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>  Labels: test-stability
>
> This is the stack trace from https://travis-ci.org/NicoK/flink/jobs/266772103 
> which contains quite some changes but the error itself should be unrelated:
> {code}
> testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
>   Time elapsed: 0.604 sec  <<< ERROR!
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Task to schedule: < Attempt #0 (Map (Map at 
> main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with 
> groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < 
> SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, 
> f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 
> 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 
> 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 
> 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number 
> of instances=2, total number of slots=4, available slots=0
>   at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
>   at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> It seems that the job started in `testDisposeSavepointWithCustomKvState` is 
> 

[jira] [Closed] (FLINK-5671) Test ClassLoaderITCase#testJobsWithCustomClassLoader fails

2017-08-21 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-5671.
--
Resolution: Auto Closed

Closing this now since there is not enough information to track this down and 
also the test class evolved and may exhibit different behaviour now. Feel free 
to re-open or create a new report.

> Test ClassLoaderITCase#testJobsWithCustomClassLoader fails
> --
>
> Key: FLINK-5671
> URL: https://issues.apache.org/jira/browse/FLINK-5671
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: Ubuntu 16.04
>Reporter: Anton Solovev
>  Labels: test-stability
>
> {code}
> testJobsWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
>   Time elapsed: 41.75 sec  <<< FAILURE!
> java.lang.AssertionError: The program execution failed: Job execution failed.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.classloading.ClassLoaderITCase.testJobsWithCustomClassLoader(ClassLoaderITCase.java:221)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134248050
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

I'm not sure if we should move the `queryConfig` method to TableEnvironment 
or leave it as a utility method. What do you think? @fhueske 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135246#comment-16135246
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134248050
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
 ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
+}
+
+object QueryConfig {
+  def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig 
= {
--- End diff --

I'm not sure if we should move the `queryConfig` method to TableEnvironment 
or leave it as a utility method. What do you think? @fhueske 


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the Table API there is only a registration method for source tables, 
> so when we use SQL to write a streaming job we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table, because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support this feature. See the design doc: 
> https://goo.gl/n3phK5



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135244#comment-16135244
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann @kl0u Would be great if you could take a look.


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running along side to the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4569: [FLINK-7040] [REST] Add basics for REST communication

2017-08-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann @kl0u Would be great if you could take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135238#comment-16135238
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4569

[FLINK-7040] [REST] Add basics for REST communication

## What is the purpose of the change

This PR implements the fundamentals for generic Client/Server REST 
communication that will be used for the client/cluster communication, the 
WebRuntimeMonitor and queryable state.

 Endpoints

`Endpoints` are the main runtime component that have to be started before 
any communication can happen. Their primary purpose is setting up the 
underlying netty stack.

The `RestClientEndpoint` is a fully-functional class that provides an 
asynchronous API for sending requests/receiving responses based around 
`CompletableFutures`. Requests are sent in a synchronous fashion; a new request is 
only sent out after a response was received for the previous request (for 
simplicity).

The `RestServerEndpoint` is an abstract class that is very similar to the 
`WebRuntimeMonitor`. Implementations have to implement a single method 
`abstract Collection> 
initializeHandlers();` that returns a collection of handlers that should be 
registered.

 Messages

To send a request the client accepts 3 arguments:
* a `MessageHeaders`
* a `RequestBody`
* a `ParameterMapper`

`RequestBodies` represent the http message body, and will be converted to 
JSON using jackson-databind.

`ParameterMappers` are used to assemble the final URL, including query and 
path parameters.

`MessageHeaders` are stateless objects that define a link between requests 
and responses as well as provide meta-data about the communication for this 
particular pair. Headers have generic type arguments for requests and responses 
and provide some level of type-safeness; if the client and server use the same 
headers class the request/response type, url, http status codes etc. are all 
well-defined.
Essentially, headers provide a tight coupling between handlers, clients, 
requests and responses (provided that implementations don't define arbitrary 
headers on the fly).

For example, to define the communication for submitting a job one would 
define a `JobSubmitRequestBody` that contains the serialized job graph, a 
`JobSubmitResponseBody` containing the URL to track the job status, and a 
`JobSubmitHeaders` class that defines the HTTP method (POST), the URL (e.g. 
`/submit`), and the response HTTP status code (ACCEPTED).

 Handlers

Someone has to deal with requests and send out responses, which is where 
the `AbstractRestHandler`  comes into play. This is an abstract class that 
manages all the netty stuff, and, just like the `MessageHeaders`, has generic 
type arguments for a `RequestBody` and `ResponseBody`. Only a single method 
must be implemented, which accepts a request and returns a response: `abstract 
CompletableFuture handleRequest(@Nonnull HandlerRequest 
request);`

A `HandlerRequest` contains a `RequestBody` and maps for path/query 
parameters.
A `HandlerResponse` contains either a `ResponseBody`, or an `ErrorResponse`.

## Brief change log

  - Add `MessageHeaders`, `Request-/ResponseBody` for modeling messages
  - Add `AbstractRestHandler`, `HandlerRequest/-Response` for message 
processing
  - Add `RestClient-/-ServerEndpoint` that set up netty

## Verifying this change

This change added tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink rest_client_final

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4569.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4569


commit 6085639ad5438228ff56d66a4988cf52cbe850b2
Author: zentol 
Date:   2017-08-16T13:17:45Z

[FLINK-7040] [rest] Add basics for REST communication

commit 

[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-21 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/4569

[FLINK-7040] [REST] Add basics for REST communication

## What is the purpose of the change

This PR implements the fundamentals for generic Client/Server REST 
communication that will be used for the client/cluster communication, the 
WebRuntimeMonitor and queryable state.

#### Endpoints

`Endpoints` are the main runtime component that have to be started before 
any communication can happen. Their primary purpose is setting up the 
underlying netty stack.

The `RestClientEndpoint` is a fully-functional class that provides an 
asynchronous API for sending requests/receiving responses based around 
`CompletableFuture`s. Requests are sent sequentially: a new request is only sent 
out after a response has been received for the previous one (for 
simplicity).

The `RestServerEndpoint` is an abstract class that is very similar to the 
`WebRuntimeMonitor`. Implementations have to implement a single method 
`abstract Collection<...> initializeHandlers();` that returns a collection of 
handlers that should be registered.

#### Messages

To send a request, the client accepts three arguments:
* a `MessageHeaders`
* a `RequestBody`
* a `ParameterMapper`

`RequestBodies` represent the HTTP message body and will be converted to 
JSON using jackson-databind.

`ParameterMappers` are used to assemble the final URL, including query and 
path parameters.

`MessageHeaders` are stateless objects that define a link between requests 
and responses as well as provide meta-data about the communication for this 
particular pair. Headers have generic type arguments for requests and responses 
and provide some level of type safety: if the client and server use the same 
headers class, the request/response types, URL, HTTP status codes etc. are all 
well-defined.
Essentially, headers provide a tight coupling between handlers, clients, 
requests and responses (provided that implementations don't define arbitrary 
headers on the fly).

For example, to define the communication for submitting a job one would 
define a `JobSubmitRequestBody` that contains the serialized job graph, a 
`JobSubmitResponseBody` containing the URL to track the job status, and a 
`JobSubmitHeaders` class that defines the HTTP method (POST), the URL (e.g. 
`/submit`) and the response HTTP status code (ACCEPTED).

#### Handlers

Someone has to deal with requests and send out responses, which is where 
the `AbstractRestHandler`  comes into play. This is an abstract class that 
manages all the netty stuff, and, just like the `MessageHeaders`, has generic 
type arguments for a `RequestBody` and `ResponseBody`. Only a single method 
must be implemented, which accepts a request and returns a response: `abstract 
CompletableFuture handleRequest(@Nonnull HandlerRequest 
request);`

A `HandlerRequest` contains a `RequestBody` and maps for path/query 
parameters.
A `HandlerResponse` contains either a `ResponseBody`, or an `ErrorResponse`.

## Brief change log

  - Add `MessageHeaders`, `Request-/ResponseBody` for modeling messages
  - Add `AbstractRestHandler`, `HandlerRequest/-Response` for message 
processing
  - Add `RestClient-/-ServerEndpoint` that set up netty

## Verifying this change

This change added tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink rest_client_final

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4569.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4569


commit 6085639ad5438228ff56d66a4988cf52cbe850b2
Author: zentol 
Date:   2017-08-16T13:17:45Z

[FLINK-7040] [rest] Add basics for REST communication

commit af95d0729595d1c707ef3041b8fbdbc21c0d0d4a
Author: zentol 
Date:   2017-08-21T09:53:13Z

Add better error message for get requests with a body

commit 5a7a8c2877f2b25cc6281462007a5ff03de40781
Author: zentol 
Date:   

[jira] [Created] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-08-21 Thread Bhumika Bayani (JIRA)
Bhumika Bayani created FLINK-7486:
-

 Summary: flink-mesos: Support for adding unique attribute / 
group_by attribute constraints
 Key: FLINK-7486
 URL: https://issues.apache.org/jira/browse/FLINK-7486
 Project: Flink
  Issue Type: Improvement
  Components: Mesos
Affects Versions: 1.3.2
Reporter: Bhumika Bayani


In our setup, we have multiple mesos-workers. In spite of this, the Flink 
application master most of the time ends up spawning all task managers on the 
same mesos-worker.

We intend to ensure HA of task managers. We would like to make sure that each 
task manager runs on a different mesos-worker, and on a mesos-worker that does 
not share the AZ attribute with earlier task manager instances.

Netflix-fenzo supports adding UniqueHostAttribute and BalancedHostAttribute 
constraints. Flink-mesos should also enable us to add these kinds of constraints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135209#comment-16135209
 ] 

ASF GitHub Bot commented on FLINK-7483:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4568
  
lgtm


> BlobCache cleanup timer not reset after job re-registration
> ---
>
> Key: FLINK-7483
> URL: https://issues.apache.org/jira/browse/FLINK-7483
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and 
> {{releaseJob}} calls where the latter sets a cleanup interval. 
> {{registerJob}}, however, forgets to reset this if the job is re-registered, 
> and so the job's blobs will be cleaned up although the job is still in use!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs

2017-08-21 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4568
  
lgtm


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-1180) Add support for Hadoop MapReduce.* API Mappers and Reducers

2017-08-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-1180.
---
Resolution: Won't Do

> Add support for Hadoop MapReduce.* API Mappers and Reducers
> ---
>
> Key: FLINK-1180
> URL: https://issues.apache.org/jira/browse/FLINK-1180
> Project: Flink
>  Issue Type: Task
>  Components: DataSet API
>Affects Versions: 0.7.0-incubating
>Reporter: Mohitdeep Singh
>Assignee: Mohitdeep Singh
>Priority: Minor
>  Labels: hadoop
>
> Flink currently supports Hadoop's mapred Mapper and Reducer functions but not the 
> mapreduce API. 
> Reference: email exchange on flink mailing list.
> "...Another option would be to extend the Hadoop Compatibility Layer. Right 
> now, we have wrappers for Hadoop's mapred-API function (Mapper, Reducer), but 
> not for the mapreduce-API functions [2]. Having wrappers for mapreduce-API 
> functions would also be cool. There is no JIRA for this issue yet. "



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-838) Implement full Hadoop Compatibility for Apache Flink

2017-08-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-838.
--
Resolution: Won't Do

> Implement full Hadoop Compatibility for Apache Flink
> 
>
> Key: FLINK-838
> URL: https://issues.apache.org/jira/browse/FLINK-838
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats, DataSet API
>Reporter: GitHub Import
>Assignee: Artem Tsikiridis
>  Labels: github-import
>
> This is a meta issue for tracking @atsikiridis' progress with implementing a 
> full Hadoop Compatibility Layer for Stratosphere.
> Some documentation can be found in the Wiki: 
> https://github.com/stratosphere/stratosphere/wiki/%5BGSoC-14%5D-A-Hadoop-abstraction-layer-for-Stratosphere-(Project-Map-and-Notes)
> As well as the project proposal: 
> https://github.com/stratosphere/stratosphere/wiki/GSoC-2014-Project-Proposal-Draft-by-Artem-Tsikiridis
> Most importantly, there is the following **schedule**:
> *19 May - 27 June (Midterm)*
> 1) Work on the Hadoop tasks, their Context and the mapping of Hadoop's 
> Configuration to the one of Stratosphere. By successfully bridging the Hadoop 
> tasks with Stratosphere, we already cover the most basic Hadoop Jobs. This 
> can be determined by running some popular Hadoop examples on Stratosphere 
> (e.g. WordCount, k-means, join) (4 - 5 weeks)
> 2) Understand how the running of these jobs works (e.g. the command line 
> interface) for the wrapper. Implement how the user will run them. (1 - 2 
> weeks).
> *27 June - 11 August*
> 1) Continue wrapping more "advanced" Hadoop Interfaces (Comparators, 
> Partitioners, Distributed Cache etc.) There are quite a few interfaces and it 
> will be a challenge to support all of them. (5 full weeks)
> 2) Profiling of the application and optimizations (if applicable)
> *11 August - 18 August*
> Write documentation on code, write a README with care and add more 
> unit-tests. (1 week)
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/838
> Created by: [rmetzger|https://github.com/rmetzger]
> Labels: core, enhancement, parent-for-major-feature, 
> Milestone: Release 0.7 (unplanned)
> Created at: Tue May 20 10:11:34 CEST 2014
> State: open



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135189#comment-16135189
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4566


> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...

2017-08-21 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4566


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-7480) Set HADOOP_CONF_DIR to sane default if not set

2017-08-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7480.
---
Resolution: Fixed

Implemented in a3143bcb0dd2895025bd6f693f4240604a5f1840

> Set HADOOP_CONF_DIR to sane default if not set
> --
>
> Key: FLINK-7480
> URL: https://issues.apache.org/jira/browse/FLINK-7480
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, neither AWS nor GCE has a {{HADOOP_CONF_DIR}} set by default. 
> This makes the out-of-box experience on these cloud environments bad because 
> not setting it results in errors whose cause is not obvious.
> In case {{HADOOP_CONF_DIR}} is not set we should check if 
> {{/etc/hadoop/conf}} exits and set {{HADOOP_CONF_DIR}} to that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7477.
---
Resolution: Fixed

Implemented in 0a0f6ed6c3d6cff702e4322293340274bea5e7d9

> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7300) End-to-end tests are instable on Travis

2017-08-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135176#comment-16135176
 ] 

Aljoscha Krettek commented on FLINK-7300:
-

I think you're right, we're logging exceptions that don't indicate a real 
problem. Changing this looks like a bigger task, though, and doesn't help with 
the immediate problem of the unstable end-to-end test.

> End-to-end tests are instable on Travis
> ---
>
> Key: FLINK-7300
> URL: https://issues.apache.org/jira/browse/FLINK-7300
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> It seems like the end-to-end tests are unstable, causing the {{misc}} build 
> profile to fail sporadically.
> Incorrect matched output:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8
> Another failure example of a different cause then the above, also on the 
> end-to-end tests:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-08-21 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7484:

Component/s: CEP

> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> ---
>
> Key: FLINK-7484
> URL: https://issues.apache.org/jira/browse/FLINK-7484
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, DataStream API, Scala API
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
>Reporter: Shashank Agarwal
>
> I am using many CEPs and a Global Window. I am sometimes getting the following 
> error and the application crashes. I have checked and, logically, there is no 
> flaw in the program. Here ItemPojo is a POJO class and we are using 
> java.util.List[ItemPojo]. We are using the Scala DataStream API; please find the 
> attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched 
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>   ... 22 more
> 2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1).
> 2017-08-17 10:04:12,816 INFO  

[jira] [Commented] (FLINK-7485) Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.

2017-08-21 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135172#comment-16135172
 ] 

Jark Wu commented on FLINK-7485:


Hi [~sunjincheng121], this issue is a duplicate of FLINK-7208, so I closed it. 

> Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.
> 
>
> Key: FLINK-7485
> URL: https://issues.apache.org/jira/browse/FLINK-7485
> Project: Flink
>  Issue Type: Improvement
>Reporter: sunjincheng
>
> Currently MIN/MAX use an in-memory {{HashMap}} to store all values; after 
> FLINK-7206 we can improve them by using {{DataView}}.
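
For reference, a rough sketch of the direction (the actual work is tracked in FLINK-7208): assuming the {{MapView}} added by FLINK-7206, a retractable MIN keeps a per-value count in a {{MapView}} instead of a plain heap {{HashMap}}, so the map can be backed by the state backend. The class below is illustrative only, not the actual implementation:

{code}
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

// Illustrative sketch: a simplified retractable MIN over longs whose
// value -> occurrence-count map is a MapView (FLINK-7206) rather than a
// java.util.HashMap held entirely on the heap.
public class MinWithRetractSketch extends AggregateFunction<Long, MinWithRetractSketch.Acc> {

    public static class Acc {
        public Long min;                                          // current minimum, null if empty
        public MapView<Long, Integer> counts = new MapView<>();   // value -> count
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, Long value) throws Exception {
        Integer count = acc.counts.get(value);
        acc.counts.put(value, count == null ? 1 : count + 1);
        if (acc.min == null || value < acc.min) {
            acc.min = value;
        }
    }

    public void retract(Acc acc, Long value) throws Exception {
        Integer count = acc.counts.get(value);
        if (count == null) {
            return;
        }
        if (count > 1) {
            acc.counts.put(value, count - 1);
        } else {
            acc.counts.remove(value);
            if (value.equals(acc.min)) {
                // Recompute the minimum from the remaining keys.
                Long newMin = null;
                for (Long v : acc.counts.keys()) {
                    if (newMin == null || v < newMin) {
                        newMin = v;
                    }
                }
                acc.min = newMin;
            }
        }
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.min;
    }
}
{code}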



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135159#comment-16135159
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134186937
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table 

[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135158#comment-16135158
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134228841
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CsvSQLTableSink.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.types.Row
+
+class CsvSQLTableSink(
--- End diff --

Yes, I should remove this class since I'm not sure whether to add it as a 
built-in sink table whose schema can be declared.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the Table API there is only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support such a feature. See design doc: 
> https://goo.gl/n3phK5



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135156#comment-16135156
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134186403
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

Not only INSERT but also UPDATE/DELETE ...
It'll reject all kinds of sql except SELECT, EXCEPT, INTERSECT, UNION, 
VALUES, ORDER_BY, EXPLICIT_TABLE.



> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the Table API there is only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support such a feature. See design doc: 
> 

[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135155#comment-16135155
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134185829
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
--- End diff --

sounds reasonable.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the Table API there is only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support such a feature. See design doc: 
> https://goo.gl/n3phK5



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135157#comment-16135157
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134184973
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -132,6 +132,44 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
+checkValidTableName(name)
+
+tableSink match {
+  case t @ (_: AppendStreamTableSink[_] | _: UpsertStreamTableSink[_] |
+_: RetractStreamTableSink[_]) =>
+registerTableInternal(name, new TableSinkTable(t))
+  case _ =>
+throw new TableException("BatchTableSink can not be registered in 
StreamTableEnvironment")
--- End diff --

make sense to me.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the Table API there is only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support such a feature. See design doc: 
> https://goo.gl/n3phK5



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134185829
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
--- End diff --

sounds reasonable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134186403
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
--- End diff --

Not only INSERT but also UPDATE/DELETE ...
It'll reject all kinds of sql except SELECT, EXCEPT, INTERSECT, UNION, 
VALUES, ORDER_BY, EXPLICIT_TABLE.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134186937
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }
 
   /**
+* Evaluates a SQL Select query on registered tables and retrieves the 
result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert 
operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, 
INTERSECT, EXCEPT, VALUES, " +
+  "WITH, ORDER_BY, EXPLICIT_TABLE")
+}
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+*/
+  def sqlUpdate(sql: String): Unit = {
+sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this))
+  }
+
+  /**
+* Evaluates a SQL statement which must be an SQL Data Manipulation 
Language (DML) statement,
+* such as INSERT, UPDATE or DELETE; or an SQL statement that returns 
nothing, such as a DDL
+* statement;
+* Currently only support a SQL INSERT statement on registered tables 
and has no return value.
+*
+* All tables referenced by the query must be registered in the 
TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name 
and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   /// register table sink for insertion
+*   tEnv.registerTableSink("target_table", ...
+*   val sourceTable: Table = ...
+*   // sourceTable is not registered to the table environment
+*   tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM 
$sourceTable")
+* }}}
+*
+* @param sql The SQL String to evaluate.
+* @param config The [[QueryConfig]] to use.
+*/
+  def sqlUpdate(sql: String, config: QueryConfig): Unit = {

[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134184973
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -132,6 +132,44 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+* Registers an external [[TableSink]] in this [[TableEnvironment]]'s 
catalog.
+* Registered sink tables can be referenced in SQL DML clause.
+*
+* Examples:
+*
+* - predefine a table sink with schema
+* {{{
+*   val fieldTypes: Array[TypeInformation[_]]  = Array( #TODO )
+*   val fieldNames: Array[String]  = Array("a", "b", "c")
+*   val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, 
Option(fieldNames))
+* }}}
+*
+* -  register an alias for this table sink to catalog
+* {{{
+*   tableEnv.registerTableSink("example_sink_table", tableSink)
+* }}}
+*
+* -  use the registered sink in SQL directly
+* {{{
+*   tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c 
FROM sourceTable")
+* }}}
+*
+* @param name The name under which the [[TableSink]] is registered.
+* @param tableSink The [[TableSink]] to register.
+*/
+  override def registerTableSink(name: String, tableSink: TableSink[_]): 
Unit = {
+checkValidTableName(name)
+
+tableSink match {
+  case t @ (_: AppendStreamTableSink[_] | _: UpsertStreamTableSink[_] |
+_: RetractStreamTableSink[_]) =>
+registerTableInternal(name, new TableSinkTable(t))
+  case _ =>
+throw new TableException("BatchTableSink can not be registered in 
StreamTableEnvironment")
--- End diff --

make sense to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134228841
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CsvSQLTableSink.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.types.Row
+
+class CsvSQLTableSink(
--- End diff --

Yes, I should remove this class since I'm not sure whether to add it as a 
built-in sink table whose schema can be declared.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135152#comment-16135152
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134185390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
--- End diff --

yes, the replacement should be mentioned here.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the Table API there is only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support such a feature. See design doc: 
> https://goo.gl/n3phK5



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-21 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134185390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
--- End diff --

yes, the replacement should be mentioned here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135141#comment-16135141
 ] 

ASF GitHub Bot commented on FLINK-7483:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4568
  
FYI: the test instability of `JobManagerCleanupITCase` will be fixed by 
#4358, the next BLOB PR in line - I don't want to mess up the following PRs 
(again) by integrating the fix separately or here.


> BlobCache cleanup timer not reset after job re-registration
> ---
>
> Key: FLINK-7483
> URL: https://issues.apache.org/jira/browse/FLINK-7483
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and 
> {{releaseJob}} calls where the latter sets a cleanup interval. 
> {{registerJob}}, however, forgets to reset this if the job is re-registered 
> again and so the job's blobs will be cleaned up although it is still used!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs

2017-08-21 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4568
  
FYI: the test instability of `JobManagerCleanupITCase` will be fixed by 
#4358, the next BLOB PR in line - I don't want to mess up the following PRs 
(again) by integrating the fix separately or here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7439) Support variable arguments for UDTF in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135132#comment-16135132
 ] 

ASF GitHub Bot commented on FLINK-7439:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4536
  
@sunjincheng121  @fhueske  It would be great if you could have a look ;-)


> Support variable arguments for UDTF in SQL
> --
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF not. 
> FLINK-5882 supports variable UDTF for Table API only, but missed SQL.
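
For illustration, a minimal sketch of what a variable-argument table function could look like on the user side (class and method names here are purely illustrative, not part of the actual change):

{code}
import scala.annotation.varargs
import org.apache.flink.table.functions.TableFunction

// sketch only: a UDTF whose eval method accepts a variable number of arguments
class SplitTVF extends TableFunction[String] {
  @varargs
  def eval(values: String*): Unit = values.foreach(collect)  // emit each argument as a row
}
{code}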



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...

2017-08-21 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4536
  
@sunjincheng121  @fhueske  It would be great if you could have a look ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-7485) Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.

2017-08-21 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-7485.
--
Resolution: Duplicate

> Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.
> 
>
> Key: FLINK-7485
> URL: https://issues.apache.org/jira/browse/FLINK-7485
> Project: Flink
>  Issue Type: Improvement
>Reporter: sunjincheng
>
> Currently MIN/MAX use an in-memory {{HashMap}} to store all values; after 
> FLINK-7206 we can improve them by using {{DataView}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-08-21 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135101#comment-16135101
 ] 

Shashank Agarwal commented on FLINK-7484:
-

Hi [~yew1eb]
It's a very big code base with a lot of business logic, but I can give you an 
overview and the scenario. We are using Scala streams, but due to a Cassandra 
sink issue we convert the Scala stream to a Java stream, so we also convert the 
internal objects to Java objects.

As per the logs, I guess this is the scenario. We use an ItemPojo class. 


{code:java}
@SerialVersionUID(224567L)
@UDT(keyspace = "cstable", name = "item")
case class ItemPojo(
  @BeanProperty var item_id: String,
  @BeanProperty var product_title: String,
  @BeanProperty var price: String
 ) extends Serializable
{
  def this() {
this(null, null, null)
  }
}
{code}

In a stream object we use java.util.List[ItemPojo]. It didn't create any issue 
until now; we use a lot of CEPs and we also use the list in a global window. But 
after some time, due to a new requirement, we started iterating over that list in 
the global window. Since then we sometimes get this error and the application crashes.


{code:java}
import scala.collection.JavaConverters._

// iterate over the java.util.List[ItemPojo] from the Scala side
for (cItem <- cItemList.asScala) {
  // business logic on each ItemPojo here
}
{code}

I think this may be the issue, because I am getting the error after these changes. 



> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> ---
>
> Key: FLINK-7484
> URL: https://issues.apache.org/jira/browse/FLINK-7484
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Scala API
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
>Reporter: Shashank Agarwal
>
> I am using many CEPs and a global window. I am getting the following error 
> sometimes and the application crashes. I have checked that logically there's no flaw 
> in the program. Here ItemPojo is a POJO class and we are using 
> java.util.List[ItemPojo]. We are using the Scala DataStream API; please find the 
> attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched 
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> 

[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala

2017-08-21 Thread Hai Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135084#comment-16135084
 ] 

Hai Zhou commented on FLINK-7438:
-

Hi [~aljoscha]
I would like to work on this issue.:D

> Some classes are eclipsed by classes in package scala
> -
>
> Key: FLINK-7438
> URL: https://issues.apache.org/jira/browse/FLINK-7438
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> Noticed the following during compilation:
> {code}
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> object OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> [WARNING]  ^
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> class OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> {code}
> We should avoid the warning e.r.t. OutputTag.
> There may be other occurrences of similar warning.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7438) Some classes are eclipsed by classes in package scala

2017-08-21 Thread Hai Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135076#comment-16135076
 ] 

Hai Zhou edited comment on FLINK-7438 at 8/21/17 12:01 PM:
---

I think the reason for this problem is 
[https://issues.scala-lang.org/browse/SI-8808].
The problem:

{panel:title=org.apache.flink.streaming.api.scala.WindowedStream.scala}
package org.apache.flink.streaming.api.scala

import org.apache.flink.util.OutputTag  // permanently hidden warning here

class WindowedStream {

  @PublicEvolving
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
    // this OutputTag class is org.apache.flink.streaming.api.scala.OutputTag.scala
    javaStream.sideOutputLateData(outputTag)
    this
  }
  ...
}
{panel}

{panel:title=org.apache.flink.streaming.api.scala.OutputTag.scala}
package org.apache.flink.streaming.api.scala

import org.apache.flink.util.{OutputTag => JOutputTag}

class OutputTag[T: TypeInformation](id: String)
  extends JOutputTag[T](id, implicitly[TypeInformation[T]])

object OutputTag {
  def apply[T: TypeInformation](id: String): OutputTag[T] = new OutputTag(id)
}
{panel}

The WindowedStream class actually uses 
"org.apache.flink.streaming.api.scala.OutputTag.scala", so if we remove 
"import org.apache.flink.util.OutputTag" from the WindowedStream class, 
the warning will disappear.
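
A minimal sketch of that change (assuming nothing else in WindowedStream.scala still needs the Java OutputTag):

{code}
// WindowedStream.scala -- sketch of the import after the fix
import org.apache.flink.util.Collector          // keep only what is still needed

// org.apache.flink.streaming.api.scala.OutputTag is defined in the same package,
// so it is resolved without an explicit import and nothing is hidden anymore
{code}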


was (Author: yew1eb):
I think the reason for this problem is 
[https://issues.scala-lang.org/browse/SI-8808].
this problem:

{panel:title=org.apache.flink.streaming.api.scala.WindowedStream.scala}
package org.apache.flink.streaming.api.scala

import org.apache.flink.util.OutputTag  // permanently hidden warning here

class WindowedStream {

  @PublicEvolving
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
    // this OutputTag class is org.apache.flink.streaming.api.scala.OutputTag.scala
    javaStream.sideOutputLateData(outputTag)
    this
  }
  ...
}
{panel}

{panel:title=org.apache.flink.streaming.api.scala.OutputTag.scala}
package org.apache.flink.streaming.api.scala

import org.apache.flink.util.{OutputTag => JOutputTag}

class OutputTag[T: TypeInformation](id: String)
  extends JOutputTag[T](id, implicitly[TypeInformation[T]])

object OutputTag {
  def apply[T: TypeInformation](id: String): OutputTag[T] = new OutputTag(id)
}
{panel}

> Some classes are eclipsed by classes in package scala
> -
>
> Key: FLINK-7438
> URL: https://issues.apache.org/jira/browse/FLINK-7438
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> Noticed the following during compilation:
> {code}
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> object OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> [WARNING]  ^
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> class OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> {code}
> We should avoid the warning e.r.t. OutputTag.
> There may be other occurrences of similar warning.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala

2017-08-21 Thread Hai Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135076#comment-16135076
 ] 

Hai Zhou commented on FLINK-7438:
-

I think the reason for this problem is 
[https://issues.scala-lang.org/browse/SI-8808].
this problem:

{panel:title=org.apache.flink.streaming.api.scala.WindowedStream.scala}
package org.apache.flink.streaming.api.scala

import org.apache.flink.util.OutputTag  // permanently hidden warning here

class WindowedStream {

  @PublicEvolving
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
    // this OutputTag class is org.apache.flink.streaming.api.scala.OutputTag.scala
    javaStream.sideOutputLateData(outputTag)
    this
  }
  ...
}
{panel}

{panel:title=org.apache.flink.streaming.api.scala.OutputTag.scala}
package org.apache.flink.streaming.api.scala

import org.apache.flink.util.{OutputTag => JOutputTag}

class OutputTag[T: TypeInformation](id: String)
  extends JOutputTag[T](id, implicitly[TypeInformation[T]])

object OutputTag {
  def apply[T: TypeInformation](id: String): OutputTag[T] = new OutputTag(id)
}
{panel}

> Some classes are eclipsed by classes in package scala
> -
>
> Key: FLINK-7438
> URL: https://issues.apache.org/jira/browse/FLINK-7438
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> Noticed the following during compilation:
> {code}
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> object OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> [WARNING]  ^
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> class OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> {code}
> We should avoid the warning e.r.t. OutputTag.
> There may be other occurrences of similar warning.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

2017-08-21 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4358
  
sorry for the mess, but let me also drag in #4568 and adapt the code in 
here (which is moved from `BlobCache` to `PermanentBlobCache` by this PR)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135066#comment-16135066
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4358
  
sorry for the mess, but let me also drag in #4568 and adapt the code in 
here (which is moved from `BlobCache` to `PermanentBlobCache` by this PR)


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7485) Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.

2017-08-21 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7485:
--

 Summary: Using DataView interface to improve 
(MIN/MAX)WithRetractAggFunction.
 Key: FLINK-7485
 URL: https://issues.apache.org/jira/browse/FLINK-7485
 Project: Flink
  Issue Type: Improvement
Reporter: sunjincheng


Currently MIN/MAX use an in-memory {{HashMap}} to store all values; after 
FLINK-7206 we can improve them by using {{DataView}}.
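
A rough sketch of the idea (hypothetical accumulator, assuming the {{MapView}} type added by FLINK-7206):

{code}
import org.apache.flink.table.api.dataview.MapView

// sketch only: a retractable MAX accumulator that keeps its value counts in a
// state-backed MapView instead of an in-memory HashMap
class MaxWithRetractAccumulator {
  var max: Long = Long.MinValue
  var mapSize: Long = 0L
  var map: MapView[Long, Long] = new MapView[Long, Long]()  // value -> occurrence count
}
{code}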



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-08-21 Thread Hai Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135046#comment-16135046
 ] 

Hai Zhou commented on FLINK-7484:
-

Hi [~shashank734]
Can you upload your code?


> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> ---
>
> Key: FLINK-7484
> URL: https://issues.apache.org/jira/browse/FLINK-7484
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Scala API
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
>Reporter: Shashank Agarwal
>
> I am using many CEPs and a global window. I am getting the following error 
> sometimes and the application crashes. I have checked that logically there's no flaw 
> in the program. Here ItemPojo is a POJO class and we are using 
> java.util.List[ItemPojo]. We are using the Scala DataStream API; please find the 
> attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched 
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>   ... 22 more
> 2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1).
> 

[jira] [Created] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-08-21 Thread Shashank Agarwal (JIRA)
Shashank Agarwal created FLINK-7484:
---

 Summary: com.esotericsoftware.kryo.KryoException: 
java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
 Key: FLINK-7484
 URL: https://issues.apache.org/jira/browse/FLINK-7484
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, Scala API
Affects Versions: 1.3.2
 Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
Reporter: Shashank Agarwal


I am using many CEPs and a global window. I am getting the following error sometimes 
and the application crashes. I have checked that logically there's no flaw in the 
program. Here ItemPojo is a POJO class and we are using 
java.util.List[ItemPojo]. We are using the Scala DataStream API; please find the 
attached logs.


{code}
2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task 
- TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
 co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched from 
RUNNING to FAILED.
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
Index: 7, Size: 5
Serialization trace:
category (co.thirdwatch.pojo.ItemPojo)
underlying (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
at 
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 22 more
2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
 co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1).
2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
 co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 

[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135030#comment-16135030
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
The check failed because the spotbugs plugin found something; this plugin 
isn't run by default when calling `mvn verify`. You can run spotbugs 
locally by adding `-Dspotbugs` to the maven invocation.

The problem it found is in the PythonEnvironmentConfig class, which contains 
public static non-final fields. I propose making these non-static and 
explicitly passing around a config object where needed.
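
For illustration only (hypothetical names, sketched in Scala), the suggested pattern would look roughly like this:

{code}
// an immutable config object replaces the public static non-final fields
case class PythonEnvironmentConfig(pythonPath: String, tmpDir: String)

// every component that needs the settings receives the config explicitly
class EnvironmentPreparer(config: PythonEnvironmentConfig) {
  def prepare(): Unit =
    println(s"preparing Python environment from ${config.pythonPath}, tmp dir ${config.tmpDir}")
}
{code}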


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
The check failed because the spotbugs plugin found something; this plugin 
isn't run by default when calling `mvn verify`. You can run spotbugs 
locally by adding `-Dspotbugs` to the maven invocation.

The problem it found is in the PythonEnvironmentConfig class, which contains 
public static non-final fields. I propose making these non-static and 
explicitly passing around a config object where needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135018#comment-16135018
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
I'm trying to track down the root cause of the check failures, without success 
so far. The given project (flink-libraries/flink-streaming-python) on the master 
branch does pass `mvn verify` successfully in my environment.

Please advise.


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
I'm trying to track down the root cause of the check failures, without success 
so far. The given project (flink-libraries/flink-streaming-python) on the master 
branch does pass `mvn verify` successfully in my environment.

Please advise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4831) Implement a slf4j metric reporter

2017-08-21 Thread Sumit Sarin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135012#comment-16135012
 ] 

Sumit Sarin commented on FLINK-4831:


Hey Chesnay,
After going through the slf4j and log4j documentation and your past work in 
progress, I believe I need to understand metrics in more depth. Should I get 
acquainted with that first, or am I going in the wrong direction? I am 
currently unfamiliar with counters, gauges, meters, histograms, etc.
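
For a first feel of the metric types, a minimal user-code sketch that registers a counter (gauges, meters, and histograms are registered the same way via the metric group):

{code}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class CountingMapper extends RichMapFunction[String, String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // a Counter simply counts events; a reporter periodically ships its value somewhere
    counter = getRuntimeContext.getMetricGroup.counter("records")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}
{code}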

> Implement a slf4j metric reporter
> -
>
> Key: FLINK-4831
> URL: https://issues.apache.org/jira/browse/FLINK-4831
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Sumit Sarin
>Priority: Minor
>  Labels: easyfix, starter
>
> For debugging purposes it would be very useful to have a log4j metric 
> reporter. If you don't want to set up a metrics backend you currently have to 
> rely on JMX, which a) works a bit differently than other reporters (for 
> example it doesn't extend AbstractReporter) and b) makes it a bit tricky to 
> analyze results as metrics are cleaned up once a job finishes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135007#comment-16135007
 ] 

ASF GitHub Bot commented on FLINK-7483:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4568
  
 @tedyu this fixes the two issues you found in #4238. Could you have a 
quick look, also?


> BlobCache cleanup timer not reset after job re-registration
> ---
>
> Key: FLINK-7483
> URL: https://issues.apache.org/jira/browse/FLINK-7483
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and 
> {{releaseJob}} calls where the latter sets a cleanup interval. 
> {{registerJob}}, however, forgets to reset this if the job is re-registered 
> again and so the job's blobs will be cleaned up although it is still used!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs

2017-08-21 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4568
  
 @tedyu this fixes the two issues you found in #4238. Could you have a 
quick look, also?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135000#comment-16135000
 ] 

ASF GitHub Bot commented on FLINK-7483:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4568

[FLINK-7483][blob] prevent cleanup of re-registered jobs

## What is the purpose of the change

Since #4238, when a job is registered but was released before and the ref 
count hit `0`, its cleanup timeout was not reset. This fixes that and adds more 
reference-counter tests.
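
A minimal sketch of the intended behaviour (field and type names below are illustrative, not the actual BlobCache code):

{code}
import scala.collection.mutable

// illustrative types only -- not the actual BlobCache internals
class RefCount { var references: Int = 0; var keepUntil: Long = -1 }

class JobRefHolder {
  private val jobRefCounters = mutable.Map[String, RefCount]()

  // on every (re-)registration the pending cleanup deadline has to be cleared again
  def registerJob(jobId: String): Unit = synchronized {
    val entry = jobRefCounters.getOrElseUpdate(jobId, new RefCount)
    entry.references += 1
    entry.keepUntil = -1  // cancel any cleanup scheduled by an earlier releaseJob()
  }
}
{code}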

## Brief change log

- reset the cleanup timeout after job re-registration
- add a test for verifying the reference counters contain the expected 
values

## Verifying this change

This change added a test for verifying reference counter values: 
`BlobCacheCleanupTest#testJobReferences`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6916-7057-hotfix2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4568


commit a0a5e2ebf999867462ed31257f30fd777f4fb5f4
Author: Nico Kruber 
Date:   2017-08-21T08:36:56Z

[FLINK-7483][blob] prevent cleanup of re-registered jobs

When a job is registered, it may have been released before and we thus need to
reset the cleanup timeout again.




> BlobCache cleanup timer not reset after job re-registration
> ---
>
> Key: FLINK-7483
> URL: https://issues.apache.org/jira/browse/FLINK-7483
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and 
> {{releaseJob}} calls where the latter sets a cleanup interval. 
> {{registerJob}}, however, forgets to reset this if the job is re-registered 
> again and so the job's blobs will be cleaned up although it is still used!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4568: [FLINK-7483][blob] prevent cleanup of re-registere...

2017-08-21 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4568

[FLINK-7483][blob] prevent cleanup of re-registered jobs

## What is the purpose of the change

Since #4238, when a job is registered but was released before and the ref 
count hit `0`, its cleanup timeout was not reset. This fixes that and adds more 
reference-counter tests.

## Brief change log

- reset the cleanup timeout after job re-registration
- add a test for verifying the reference counters contain the expected 
values

## Verifying this change

This change added a test for verifying reference counter values: 
`BlobCacheCleanupTest#testJobReferences`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6916-7057-hotfix2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4568


commit a0a5e2ebf999867462ed31257f30fd777f4fb5f4
Author: Nico Kruber 
Date:   2017-08-21T08:36:56Z

[FLINK-7483][blob] prevent cleanup of re-registered jobs

When a job is registered, it may have been released before and we thus need to
reset the cleanup timeout again.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration

2017-08-21 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-7483:
--

 Summary: BlobCache cleanup timer not reset after job 
re-registration
 Key: FLINK-7483
 URL: https://issues.apache.org/jira/browse/FLINK-7483
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, Network
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber


Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and 
{{releaseJob}} calls where the latter sets a cleanup interval. {{registerJob}}, 
however, forgets to reset this if the job is re-registered again and so the 
job's blobs will be cleaned up although it is still used!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134985#comment-16134985
 ] 

ASF GitHub Bot commented on FLINK-7479:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4563
  
Thanks @dawidwys for the suggestion. I will update the PR accordingly.


> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it's already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}, but there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4563
  
Thanks @dawidwys for the suggestion. I will update the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6938) IterativeCondition should support RichFunction interface

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134982#comment-16134982
 ] 

ASF GitHub Bot commented on FLINK-6938:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
Thanks @dawidwys for the reminder. Yes, you're right and that makes sense to 
me. I will update the PR and remove the ConditionRegistry-related changes.


> IterativeCondition should support RichFunction interface
> 
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to 
> compile the generated code once. We do not want to insert an if condition in 
> the {{filter()}} method. So I suggest making IterativeCondition support the 
> {{RichFunction}} interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...

2017-08-21 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4513
  
Thanks @dawidwys for the reminder. Yes, you're right and that makes sense to 
me. I will update the PR and remove the ConditionRegistry-related changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6938) IterativeCondition should support RichFunction interface

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134963#comment-16134963
 ] 

ASF GitHub Bot commented on FLINK-6938:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4513
  
Hmm, I read back through all the comments on the previous PRs and I think the 
consensus was that we do not want to introduce the ConditionRegistry at that 
time, but start with just FLINK-6938 to enable the SQL integration. 

See comments on that PR: #4172


> IterativeCondition should support RichFunction interface
> 
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to 
> compile the generated code once. We do not want to insert an if condition in 
> the {{filter()}} method. So I suggest making IterativeCondition support the 
> {{RichFunction}} interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...

2017-08-21 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4513
  
Hmm, I read back through all the comments on the previous PRs and I think the 
consensus was that we do not want to introduce the ConditionRegistry at that 
time, but start with just FLINK-6938 to enable the SQL integration. 

See comments on that PR: #4172


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-7227) OR expression with more than 2 predicates is not pushed into a TableSource

2017-08-21 Thread Usman Younas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Usman Younas reassigned FLINK-7227:
---

Assignee: Usman Younas

> OR expression with more than 2 predicates is not pushed into a TableSource
> --
>
> Key: FLINK-7227
> URL: https://issues.apache.org/jira/browse/FLINK-7227
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>
> It seems that {{RexNodeToExpressionConverter}} cannot handle OR expressions 
> with more than 2 predicates. Therefore the expression is not pushed into a 
> {{FilterableTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134933#comment-16134933
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r134160516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
--- End diff --

Please add a `deprecated` comment to the javadoc to tell users which new 
API should be used.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in the Table API there’s only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support such a feature. See the design doc: 
> https://goo.gl/n3phK5



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

