[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 @dawidwys I have updated the PR and it currently only contains changes of the RichFunction. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6938) IterativeCondition should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136338#comment-16136338 ] ASF GitHub Bot commented on FLINK-6938: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 @dawidwys I have updated the PR and it currently only contains changes of the RichFunction. > IterativeCondition should support RichFunction interface > > > Key: FLINK-6938 > URL: https://issues.apache.org/jira/browse/FLINK-6938 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Jark Wu >Assignee: Jark Wu > Fix For: 1.4.0 > > > In FLIP-20, we need IterativeCondition to support an {{open()}} method to > compile the generated code once. We do not want to insert an if condition in > the {{filter()}} method. So I suggest making IterativeCondition support the > {{RichFunction}} interface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
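The motivation for the {{open()}} method can be sketched in plain Java. This is a hypothetical, simplified illustration of the lifecycle pattern, not Flink's actual IterativeCondition or RichFunction API:

```java
// Hypothetical sketch, not the actual Flink API: why an open() lifecycle
// hook avoids a per-record initialization check in filter().

// Without open(): every filter() call pays for a null check.
class LazyCondition {
    private Object compiled; // e.g. generated code, compiled on first use

    boolean filter(String event) {
        if (compiled == null) {          // per-record branch we want to avoid
            compiled = compileOnce();
        }
        return event.startsWith("a");
    }

    private Object compileOnce() { return new Object(); }
}

// With a RichFunction-style lifecycle: compile once, up front.
class RichCondition {
    private Object compiled;

    void open() {                        // called once before any filter()
        compiled = compileOnce();
    }

    boolean filter(String event) {
        return event.startsWith("a");    // hot path stays branch-free
    }

    private Object compileOnce() { return new Object(); }
}
```

The hot path of the second variant contains no initialization branch, which is the point of supporting a rich interface here.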
[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136280#comment-16136280 ] Haohui Mai commented on FLINK-7398: --- +1 on logging trait. I'll submit a PR. Adding a checkstyle rule is also a good idea. > Table API operators/UDFs must not store Logger > -- > > Key: FLINK-7398 > URL: https://issues.apache.org/jira/browse/FLINK-7398 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Haohui Mai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > Attachments: Example.png > > > Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in > an instance field (c.f. > https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39). > This means that the {{Logger}} will be serialised with the UDF and sent to > the cluster. This in itself does not sound right and leads to problems when > the slf4j configuration on the Client is different from the cluster > environment. > This is an example of a user running into that problem: > https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. > Here, they have Logback on the client but the Logback classes are not > available on the cluster and so deserialisation of the UDFs fails with a > {{ClassNotFoundException}}. 
> This is a rough list of the involved classes: > {code} > src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28: > val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double]) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28: > val LOGICAL: Convention = new Convention.Impl("LOGICAL", > classOf[FlinkLogicalRel]) > src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38: val > LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList( > src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > 
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48: > val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66: > val LOG =
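The fix under discussion, keeping the logger out of the serialized object, can be demonstrated with plain Java serialization. The class below is a stand-in (java.util.logging replaces slf4j so the sketch is self-contained): it marks the logger transient and re-creates it lazily after deserialization, so the client's logging backend never travels to the cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.logging.Logger;

// Stand-in for a Table API UDF; not actual Flink code.
class SerializableUdf implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so logging classes from the
    // client classpath are never shipped with the function.
    private transient Logger log;

    Logger logger() {
        if (log == null) {  // re-initialize lazily after deserialization
            log = Logger.getLogger(SerializableUdf.class.getName());
        }
        return log;
    }

    // Serialize and deserialize in memory, mimicking shipping to the cluster.
    static SerializableUdf roundTrip(SerializableUdf udf) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bos);
            out.writeObject(udf);
            out.flush();
            ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
            return (SerializableUdf) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The round trip succeeds even though `Logger` itself is not serializable, because the transient field is simply dropped and rebuilt on first use.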
[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat
[ https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136247#comment-16136247 ] ASF GitHub Bot commented on FLINK-7423: --- Github user XuPingyong commented on the issue: https://github.com/apache/flink/pull/4525 Thanks @StephanEwen, so I think `null` values can be returned from `InputFormat#nextRecord` at any time, even before the end is reached. Can `null` values be passed to `InputFormat#nextRecord`, or should we check for `null` (and create a new object if it is `null`) before passing it? > Always reuse an instance to get elements from the inputFormat > --- > > Key: FLINK-7423 > URL: https://issues.apache.org/jira/browse/FLINK-7423 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > In InputFormatSourceFunction.java: > {code:java} > OUT nextElement = serializer.createInstance(); > while (isRunning) { > format.open(splitIterator.next()); > // for each element we also check if cancel > // was called by checking the isRunning flag > while (isRunning && !format.reachedEnd()) { > nextElement = > format.nextRecord(nextElement); > if (nextElement != null) { > ctx.collect(nextElement); > } else { > break; > } > } > format.close(); > completedSplitsCounter.inc(); > if (isRunning) { > isRunning = splitIterator.hasNext(); > } > } > {code} > the format may return a different element or `null` from nextRecord, which > may cause an exception.
[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...
Github user XuPingyong commented on the issue: https://github.com/apache/flink/pull/4525 Thanks @StephanEwen, so I think `null` values can be returned from `InputFormat#nextRecord` at any time, even before the end is reached. Can `null` values be passed to `InputFormat#nextRecord`, or should we check for `null` (and create a new object if it is `null`) before passing it?
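The "checkNull" idea raised in the comment can be sketched as follows. The reader and its format callback are hypothetical stand-ins, not Flink's actual InputFormat API: a `null` reuse object is replaced with a fresh instance before it is handed back to `nextRecord`.

```java
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the "checkNull" idea: never pass null as the reuse
// object. The UnaryOperator stands in for format.nextRecord(reuse).
class NullSafeReader<T> {
    private final Supplier<T> factory;          // creates a fresh reuse object
    private final UnaryOperator<T> nextRecord;  // stand-in for the format

    NullSafeReader(Supplier<T> factory, UnaryOperator<T> nextRecord) {
        this.factory = factory;
        this.nextRecord = nextRecord;
    }

    T next(T reuse) {
        if (reuse == null) {        // nextRecord may have returned null earlier:
            reuse = factory.get();  // hand it a fresh instance instead
        }
        return nextRecord.apply(reuse);
    }
}
```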
[GitHub] flink pull request #4570: [FLINK-7438][DataStream API]Remove useless import,...
GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/4570 [FLINK-7438][DataStream API]Remove useless import, avoid warnings ## What is the purpose of the change Avoid warnings, details: [ISSUE #FLINK-7438](https://issues.apache.org/jira/browse/FLINK-7438). ## Brief change log Remove useless "import org.apache.flink.util.OutputTag" in DataStream.scala, AllWindowedStream.scala, WindowedStream.scala ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-7438 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4570 commit d28f4b8eca85230faea8c3b8c9b868741fc63886 Author: yew1eb Date: 2017-08-22T03:16:51Z Remove useless import, avoid warnings
[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala
[ https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136217#comment-16136217 ] ASF GitHub Bot commented on FLINK-7438: --- GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/4570 [FLINK-7438][DataStream API]Remove useless import, avoid warnings ## What is the purpose of the change Avoid warnings, details: [ISSUE #FLINK-7438](https://issues.apache.org/jira/browse/FLINK-7438). ## Brief change log Remove useless "import org.apache.flink.util.OutputTag" in DataStream.scala, AllWindowedStream.scala, WindowedStream.scala ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-7438 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4570 commit d28f4b8eca85230faea8c3b8c9b868741fc63886 Author: yew1eb Date: 2017-08-22T03:16:51Z Remove useless import, avoid warnings > Some classes are eclipsed by classes in package scala > - > > Key: FLINK-7438 > URL: https://issues.apache.org/jira/browse/FLINK-7438 > Project: Flink > Issue Type: Bug > Components: Build System, DataStream API >Reporter: Ted Yu >Priority: Minor > > Noticed the following during compilation: > {code} > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > object OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > [WARNING] ^ > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > class OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > {code} > We should avoid the warning w.r.t. OutputTag. > There may be other occurrences of similar warnings.
[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136210#comment-16136210 ] ASF GitHub Bot commented on FLINK-7479: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4563 @dawidwys Have updated the PR according to your comments. > Support to retrieve the past event by physical offset > -- > > Key: FLINK-7479 > URL: https://issues.apache.org/jira/browse/FLINK-7479 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, it is already possible to retrieve events matched to the specified > pattern in {{IterativeCondition.Context}}. But there are also requirements > to retrieve events by a physical offset. The retrieved events may not > match any pattern.
[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4563 @dawidwys Have updated the PR according to your comments.
[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()
[ https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136194#comment-16136194 ] Hai Zhou commented on FLINK-4422: - Hi [~StephanEwen], Is this still an issue? If yes, I would like to work on it. I would use the methods provided by [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/clock/SystemClock.java] to replace the time-measurement code for relative times / intervals. > Convert all time interval measurements to System.nanoTime() > --- > > Key: FLINK-4422 > URL: https://issues.apache.org/jira/browse/FLINK-4422 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Jin Mingjian >Priority: Minor > > In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is > monotonic. To measure delays and time intervals, {{System.nanoTime()}} is > hence reliable, while {{System.currentTimeMillis()}} is not.
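A minimal sketch of the proposed replacement, assuming nothing beyond the JDK: measure intervals by differencing {{System.nanoTime()}}, which is monotonic, instead of {{System.currentTimeMillis()}}, which can jump when the wall clock is adjusted (NTP corrections, manual clock changes).

```java
import java.util.concurrent.TimeUnit;

// Measure an elapsed interval with the monotonic nanoTime() clock.
// nanoTime() values are only meaningful as differences, never as
// absolute timestamps.
final class Stopwatch {
    private final long startNanos = System.nanoTime();

    long elapsedMillis() {
        // Correct even if the wall clock is adjusted while we are timing.
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }
}
```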
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136174#comment-16136174 ] Jark Wu commented on FLINK-7465: Hi [~sunjincheng121], Regarding when to configure the accuracy, I think before the function is registered is better: (1) the accuracy parameter can be checked before runtime; (2) we do not need to check whether the bit array and hash functions are initialized every time the accumulator is called. Regarding the de/serialization: - de/serializing the bitArray on every {{accumulate}} call: it may be too expensive, as I mentioned before. - de/serializing the bitArray on checkpoint: we can't do that, as far as I know. We implement the UDAGG via the {{State}} interface. With the heap backend, deserialization only happens on checkpoint, but with the RocksDB backend, de/serialization happens on every update. - de/serializing the bitArray in open/close: do you mean not using State in the UDAGG? How can we achieve exactly-once then? I'm designing an implementation based on MapView; it acts like a {{long[]}} which can be used as a bit array or bitmap, but has better performance. In this way, we only have to deserialize several longs in every {{accumulate}} call. What do you think? [~fhueske] [~sunjincheng121] > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA, we use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. 
Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I would appreciate it if you could give me some advice. :-)
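The algorithm described above can be sketched directly. This is an illustrative plain-Java Bloom filter (m bits, k hash positions derived from two base hashes), not the proposed Table API aggregate function:

```java
import java.util.BitSet;

// Illustrative Bloom filter as described above: m bits, k hash functions;
// add() sets k positions, mightContain() checks them. False positives are
// possible, false negatives are not.
class BloomFilterSketch {
    private final BitSet bits;
    private final int m;   // number of bits
    private final int k;   // number of hash functions

    BloomFilterSketch(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // Derive k positions from two base hashes (double-hashing trick).
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = h1 >>> 16 | h1 << 16;  // cheap second hash (16-bit rotation)
        return Math.floorMod(h1 + i * h2, m);
    }

    void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;   // definitely not in the set
            }
        }
        return true;            // in the set, or a false positive
    }
}
```

A production variant would size m and k from the expected element count and target false-positive rate, as the description notes.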
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136167#comment-16136167 ] ASF GitHub Bot commented on FLINK-7367: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai any more feedback? We have a ticket at my company for this task, and I'd like to mark it as finished if possible :) > Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, > MaxConnections, RequestTimeout, etc) > --- > > Key: FLINK-7367 > URL: https://issues.apache.org/jira/browse/FLINK-7367 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Right now, FlinkKinesisProducer only exposes two configs for the underlying > KinesisProducer: > - AGGREGATION_MAX_COUNT > - COLLECTION_MAX_COUNT > According to the [AWS > doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] > and [their sample on > github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], > developers can set more of them to make the most of KinesisProducer, and make it > fault-tolerant (e.g. by increasing timeout). > I select a few more configs that we need when using Flink with Kinesis: > - MAX_CONNECTIONS > - RATE_LIMIT > - RECORD_MAX_BUFFERED_TIME > - RECORD_TIME_TO_LIVE > - REQUEST_TIMEOUT > Flink is using KPL's default values. They make Flink write too fast to > Kinesis, which fails Flink jobs too frequently. We need to parameterize > FlinkKinesisProducer to pass in the above params, in order to slow down > Flink's write rate to Kinesis.
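The parameterization pattern under discussion amounts to reading optional keys from the user's Properties and falling back to the library default when a key is absent. A minimal sketch; the key name used in the test is illustrative only, not an actual FlinkKinesisProducer configuration key:

```java
import java.util.Properties;

// Sketch of the config-parameterization pattern: an optional user-supplied
// key overrides the library default; absent keys leave the default in place.
class ProducerConfigSketch {
    static long getLong(Properties props, String key, long defaultValue) {
        String value = props.getProperty(key);
        return value != null ? Long.parseLong(value) : defaultValue;
    }
}
```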
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai any more feedback? We have a ticket at my company for this task, and I'd like to mark it as finished if possible :)
[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136166#comment-16136166 ] ASF GitHub Bot commented on FLINK-7366: --- GitHub user bowenli86 reopened a pull request: https://github.com/apache/flink/pull/4522 [FLINK-7366][kinesis connector] Upgrade kinesis producer library in flink-connector-kinesis ## What is the purpose of the change We need to upgrade KPL and KCL to pick up the enhanced performance and stability for Flink to work better with Kinesis. Upgrading KPL is especially necessary, because the KPL version Flink uses is old, and doesn't have good retry and error handling logic. **Upgrade KPL:** flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which was released in Nov 2015 by AWS. It's old. It's only the fourth release, and thus problematic. It doesn't even have good retry logic, therefore Flink fails really frequently (about every 10 mins as we observed) when Flink writes too fast to Kinesis and receives RateLimitExceededException. Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56, "With the newer version of the KPL it uses the AWS C++ SDK which should offer additional retries." (Oct 2016). 0.12.5, the version we are upgrading to, was released in May 2017 and should have the enhanced retry logic. **Upgrade KCL:** Upgrade KCL from 1.6.2 to 1.8.1 **Upgrade AWS SDK:** from 1.10.71 to 1.11.171 ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: There shouldn't be any impact outside flink-connector-kinesis, because 1) KPL and KCL are only used in flink-connector-kinesis, and 2) the AWS SDK in flink-connector-kinesis is shaded ## Documentation - Does this pull request introduce a new feature? 
(no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7366 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4522.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4522 commit 7486878631e7238283eabfdd575387d32b210f91 Author: Bowen Li Date: 2017-08-10T22:09:56Z FLINK-7366 Upgrade kinesis-producer-library in flink-connector-kinesis from 0.10.2 to 0.12.5 commit ba9fb5f1e189e31f8085bd384edbb682a69aae7f Author: Bowen Li Date: 2017-08-10T23:46:59Z upgrade KCL and AWS SDK commit 88a16efcaf4c7e2addcf978a98f8ffbea4281639 Author: Bowen Li Date: 2017-08-13T05:49:26Z revert changes to KCL > Upgrade kinesis producer library in flink-connector-kinesis > --- > > Key: FLINK-7366 > URL: https://issues.apache.org/jira/browse/FLINK-7366 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > We need to upgrade KPL and KCL to pick up the enhanced performance and > stability for Flink to work better with Kinesis. Upgrading KPL is especially > necessary, because the KPL version Flink uses is old, and doesn't have good > retry and error handling logic. > *KPL:* > flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which > is released in Nov 2015 by AWS. It's old. It's the fourth release, and thus > problematic. It doesn't even have good retry logic, therefore Flink fails > really frequently (about every 10 mins as we observed) when Flink writes too > fast to Kinesis and receives RateLimitExceededException, > Quotes from https://github.com/awslabs/amazon-kinesis-producer/issues/56, > "*With the newer version of the KPL it uses the AWS C++ SDK which should > offer additional retries.*" on Oct 2016. 
0.12.5, the version we are upgrading > to, is released in May 2017 and should have the enhanced retry logic. > *KCL:* > Upgrade KCL from 1.6.2 to 1.8.1 > *AWS SDK* > from 1.10.71 to 1.11.171
[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136163#comment-16136163 ] ASF GitHub Bot commented on FLINK-7366: --- Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4522 > Upgrade kinesis producer library in flink-connector-kinesis > --- > > Key: FLINK-7366 > URL: https://issues.apache.org/jira/browse/FLINK-7366 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > We need to upgrade KPL and KCL to pick up the enhanced performance and > stability for Flink to work better with Kinesis. Upgrading KPL is especially > necessary, because the KPL version Flink uses is old, and doesn't have good > retry and error handling logic. > *KPL:* > flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which > is released in Nov 2015 by AWS. It's old. It's the fourth release, and thus > problematic. It doesn't even have good retry logic, therefore Flink fails > really frequently (about every 10 mins as we observed) when Flink writes too > fast to Kinesis and receives RateLimitExceededException, > Quotes from https://github.com/awslabs/amazon-kinesis-producer/issues/56, > "*With the newer version of the KPL it uses the AWS C++ SDK which should > offer additional retries.*" on Oct 2016. 0.12.5, the version we are upgrading > to, is released in May 2017 and should have the enhanced retry logic. > *KCL:* > Upgrade KCL from 1.6.2 to 1.8.1 > *AWS SDK* > from 1.10.71 to 1.11.171
[GitHub] flink pull request #4522: [FLINK-7366][kinesis connector] Upgrade kinesis pr...
GitHub user bowenli86 reopened a pull request: https://github.com/apache/flink/pull/4522 [FLINK-7366][kinesis connector] Upgrade kinesis producer library in flink-connector-kinesis ## What is the purpose of the change We need to upgrade KPL and KCL to pick up the enhanced performance and stability for Flink to work better with Kinesis. Upgrading KPL is especially necessary, because the KPL version Flink uses is old, and doesn't have good retry and error handling logic. **Upgrade KPL:** flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which was released in Nov 2015 by AWS. It's old. It's only the fourth release, and thus problematic. It doesn't even have good retry logic, therefore Flink fails really frequently (about every 10 mins as we observed) when Flink writes too fast to Kinesis and receives RateLimitExceededException. Quoting https://github.com/awslabs/amazon-kinesis-producer/issues/56, "With the newer version of the KPL it uses the AWS C++ SDK which should offer additional retries." (Oct 2016). 0.12.5, the version we are upgrading to, was released in May 2017 and should have the enhanced retry logic. **Upgrade KCL:** Upgrade KCL from 1.6.2 to 1.8.1 **Upgrade AWS SDK:** from 1.10.71 to 1.11.171 ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: There shouldn't be any impact outside flink-connector-kinesis, because 1) KPL and KCL are only used in flink-connector-kinesis, and 2) the AWS SDK in flink-connector-kinesis is shaded ## Documentation - Does this pull request introduce a new feature? 
(no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7366 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4522.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4522 commit 7486878631e7238283eabfdd575387d32b210f91 Author: Bowen Li Date: 2017-08-10T22:09:56Z FLINK-7366 Upgrade kinesis-producer-library in flink-connector-kinesis from 0.10.2 to 0.12.5 commit ba9fb5f1e189e31f8085bd384edbb682a69aae7f Author: Bowen Li Date: 2017-08-10T23:46:59Z upgrade KCL and AWS SDK commit 88a16efcaf4c7e2addcf978a98f8ffbea4281639 Author: Bowen Li Date: 2017-08-13T05:49:26Z revert changes to KCL
[jira] [Commented] (FLINK-7366) Upgrade kinesis producer library in flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136162#comment-16136162 ] ASF GitHub Bot commented on FLINK-7366: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4522 @tzulitai any more feedback? We have a ticket at my company for this task, and I'd like to mark it as finished if possible :) > Upgrade kinesis producer library in flink-connector-kinesis > --- > > Key: FLINK-7366 > URL: https://issues.apache.org/jira/browse/FLINK-7366 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > We need to upgrade KPL and KCL to pick up the enhanced performance and > stability for Flink to work better with Kinesis. Upgrading KPL is especially > necessary, because the KPL version Flink uses is old, and doesn't have good > retry and error handling logic. > *KPL:* > flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which > is released in Nov 2015 by AWS. It's old. It's the fourth release, and thus > problematic. It doesn't even have good retry logic, therefore Flink fails > really frequently (about every 10 mins as we observed) when Flink writes too > fast to Kinesis and receives RateLimitExceededException, > Quotes from https://github.com/awslabs/amazon-kinesis-producer/issues/56, > "*With the newer version of the KPL it uses the AWS C++ SDK which should > offer additional retries.*" on Oct 2016. 0.12.5, the version we are upgrading > to, is released in May 2017 and should have the enhanced retry logic. > *KCL:* > Upgrade KCL from 1.6.2 to 1.8.1 > *AWS SDK* > from 1.10.71 to 1.11.171
[GitHub] flink pull request #4522: [FLINK-7366][kinesis connector] Upgrade kinesis pr...
Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4522
[GitHub] flink issue #4522: [FLINK-7366][kinesis connector] Upgrade kinesis producer ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4522 @tzulitai any more feedback? We have a ticket at my company for this task, and I'd like to mark it as finished if possible :)
[jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136140#comment-16136140 ] Jark Wu edited comment on FLINK-7398 at 8/22/17 1:47 AM: - +1 for the Logging trait. BTW, I think avoiding misuse of {{LOG}} by inheriting the {{Logging}} trait is not a complete solution. What if a user does not inherit the Logging trait? I would like to add a checkstyle rule to prevent using the slf4j Logger. was (Author: jark): +1 for the Logging trait. BTW, I think avoiding misuse of {{LOG}} by inheriting the {{Logging}} trait is not a complete solution. What if a user does not inherit the Logging trait? I would like to add a checkstyle rule to prevent using the slf4j Logger directly. > Table API operators/UDFs must not store Logger > -- > > Key: FLINK-7398 > URL: https://issues.apache.org/jira/browse/FLINK-7398 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Haohui Mai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > Attachments: Example.png > > > Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in > an instance field (c.f. > https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39). > This means that the {{Logger}} will be serialised with the UDF and sent to > the cluster. This in itself does not sound right and leads to problems when > the slf4j configuration on the Client is different from the cluster > environment. > This is an example of a user running into that problem: > https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. > Here, they have Logback on the client but the Logback classes are not > available on the cluster and so deserialisation of the UDFs fails with a > {{ClassNotFoundException}}. 
> This is a rough list of the involved classes: > {code} > src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28: > val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double]) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28: > val LOGICAL: Convention = new Convention.Impl("LOGICAL", > classOf[FlinkLogicalRel]) > src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38: val > LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList( > src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > 
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52: > val LOG = LoggerFactory.getLogger(this.getClass)
[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136140#comment-16136140 ] Jark Wu commented on FLINK-7398: +1 for the Logging trait. BTW, I think avoiding misuse of {{LOG}} by inheriting the {{Logging}} trait is not a complete solution. What if a user does not inherit the Logging trait? I would like to add a checkstyle rule to prevent using the slf4j Logger directly.
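The Logging-trait proposal discussed above boils down to one idea: keep the logger out of the serialized state of the UDF, and re-create it lazily after deserialization. Below is a minimal, self-contained Java sketch of that pattern (Scala would use a `@transient lazy val` in a trait, as Spark does). The class and method names are hypothetical, and `java.util.logging` stands in for slf4j only to keep the example dependency-free:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.logging.Logger;

public class SerializableUdf implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: the Logger is dropped during serialization, so logging
    // implementation classes never travel with the UDF to the cluster
    private transient Logger log;

    // lazily re-created after deserialization, mirroring Scala's
    // "@transient lazy val" pattern from a Logging trait
    protected Logger log() {
        if (log == null) {
            log = Logger.getLogger(getClass().getName());
        }
        return log;
    }

    public String apply(String value) {
        log().fine("processing " + value);
        return value.toUpperCase();
    }

    // round-trips the UDF through Java serialization to show that the
    // pattern survives shipping the function to a remote task
    public static SerializableUdf roundTrip(SerializableUdf udf) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(udf);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableUdf) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Because the field is transient, a client configured with Logback can serialize this function and a cluster without Logback on the classpath can still deserialize it; only the abstract logging API is touched at use time.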
[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
[ https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7488: -- Description: {code} compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest) Time elapsed: 0.239 sec <<< FAILURE! org.junit.ComparisonFailure: Different network buffer memory sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600> at org.junit.Assert.assertEquals(Assert.java:115) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81) compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest) Time elapsed: 0.16 sec <<< FAILURE! 
org.junit.ComparisonFailure: Different heap sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000> at org.junit.Assert.assertEquals(Assert.java:115) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110){code} $HADOOP_CONF_DIR was not set prior to running the test. 
was: {code} TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava:110->compareHeapSizeJavaVsScript:275 Different heap sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000> TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava:81->compareNetworkBufJavaVsScript:235 Different network buffer memory sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath:
[jira] [Created] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
Ted Yu created FLINK-7488: - Summary: TaskManagerHeapSizeCalculationJavaBashTest sometimes fails Key: FLINK-7488 URL: https://issues.apache.org/jira/browse/FLINK-7488 Project: Flink Issue Type: Test Reporter: Ted Yu Priority: Minor {code} TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava:110->compareHeapSizeJavaVsScript:275 Different heap sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000> TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava:81->compareNetworkBufJavaVsScript:235 Different network buffer memory sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600> {code} $HADOOP_CONF_DIR was not set prior to running the test. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7465) Add built-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136062#comment-16136062 ] sunjincheng commented on FLINK-7465: [~fhueske] I want to add accuracy and maxKeyCount as function parameters; the function signature looks like: {code} count-bf(accuracy:Double, maxKeyCount, col:Any) {code} And we will use the following formula to calculate the bit-array size (bsize): {code} (-maxKeyCount * Math.log(accuracy) / (Math.log(2) * Math.log(2))) {code} And we will use the following formula to calculate the count of hash functions: {code} Math.max(1, Math.round(bsize.asInstanceOf[Double] / maxKeyCount * Math.log(2))) {code} The formulas are the same as in the reference of the JIRA description. That means we configure the accuracy when the function is used. Does this make sense to you? [~fhueske] I think {{count-min}} is very useful in certain cases, and so is {{HyperLogLog}} (cardinality counting). After we complete this JIRA, we can discuss those implementations. [~jark] The de/serialization of the bitArray is very important in the implementation. I think the best way is to do the de/serialization at checkpoint time or in the {{open/close}} methods, but currently we cannot access the {{RuntimeContext}} from the {{FunctionContext}}, so we need to make some changes. Or we could use a DataView. Currently, in my mind, we have the following choices: * de/serialize the bitArray on every call to {{accumulate}} (bitArray as a member of ACC) * de/serialize the bitArray at checkpoint time (bitArray as a member of AGG) * de/serialize the bitArray in {{open/close}} (bitArray as a member of AGG) What do you think? [~jark] [~fhueske] > Add built-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA, use BloomFilter to implement counting functions. 
> BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch is as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I would appreciate it if you could give me some advice. :-)
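The sizing formulas quoted in the comment above, together with the add/query procedure from the algorithm description, can be sketched as a minimal Bloom filter. This is an illustrative Java sketch, not the proposed Flink implementation; the class name and the double-hashing scheme used to derive the k positions are assumptions:

```java
import java.util.BitSet;

// Minimal Bloom filter sketch using the sizing formulas from the comment:
//   bit-array size m = ceil(-n * ln(p) / (ln 2)^2)
//   hash count    k = max(1, round(m / n * ln 2))
public class BloomFilterSketch {
    private final BitSet bits;
    private final int bitSize;
    private final int numHashes;

    public BloomFilterSketch(double accuracy, int maxKeyCount) {
        this.bitSize = (int) Math.ceil(
                -maxKeyCount * Math.log(accuracy) / (Math.log(2) * Math.log(2)));
        this.numHashes = Math.max(1,
                (int) Math.round((double) bitSize / maxKeyCount * Math.log(2)));
        this.bits = new BitSet(bitSize);
    }

    // derive the i-th array position via double hashing (an assumed scheme;
    // real implementations typically use Murmur3 like Hive's BloomFilter)
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, bitSize);
    }

    // set the bits at all k positions to 1
    public void add(Object element) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(element, i));
        }
    }

    // any 0 bit means "definitely absent"; all 1s means "possibly present"
    public boolean mightContain(Object element) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(element, i))) {
                return false;
            }
        }
        return true;
    }

    public int getBitSize() { return bitSize; }

    public int getNumHashes() { return numHashes; }
}
```

For example, accuracy 0.01 with 100 expected keys yields a 959-bit array and 7 hash functions, matching the formulas above. Serializing the `BitSet` per checkpoint versus per `accumulate` call is exactly the trade-off the comment enumerates.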
[jira] [Closed] (FLINK-7482) StringWriter to support compression
[ https://issues.apache.org/jira/browse/FLINK-7482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-7482. - Resolution: Not A Bug Hi [~felixcheung]. This is a great question for the dev or user [mailing lists|https://flink.apache.org/community.html#mailing-lists]. > StringWriter to support compression > --- > > Key: FLINK-7482 > URL: https://issues.apache.org/jira/browse/FLINK-7482 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.3.2 >Reporter: Felix Cheung > > Is it possible to have StringWriter support compression like > AvroKeyValueSinkWriter or SequenceFileWriter?
[jira] [Commented] (FLINK-7454) update 'Monitoring Current Event Time' section of Flink doc
[ https://issues.apache.org/jira/browse/FLINK-7454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136025#comment-16136025 ] ASF GitHub Bot commented on FLINK-7454: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4547 any feedback? > update 'Monitoring Current Event Time' section of Flink doc > --- > > Key: FLINK-7454 > URL: https://issues.apache.org/jira/browse/FLINK-7454 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > Since FLINK-3427 is done, there's no need to have the following doc in > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html#monitoring-current-event-time > "There are plans (see FLINK-3427) to show the current low watermark for each > operator in the Flink web interface. > Until this feature is implemented the current low watermark for each task can > be accessed through the metrics system." > We can replace it with something like "Low watermarks of each task can be > accessed either from Flink web interface or Flink metric system."
[GitHub] flink issue #4547: [FLINK-7454][docs] update 'Monitoring Current Event Time'...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4547 any feedback?
[jira] [Updated] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Park updated FLINK-7398: -- Attachment: Example.png
[jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135534#comment-16135534 ]

Jacob Park edited comment on FLINK-7398 at 8/21/17 6:28 PM:
------------------------------------------------------------

[~wheat9] Why not follow Apache Spark's example for this problem? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

It provides a Logging trait with a transient, lazily initialized LOG. If you want safe logging, you inherit the trait; the existing field named LOG then produces a compile-time conflict, which points you to the bad logging to fix.

Edit: Refer to my Example.png. !https://issues.apache.org/jira/secure/attachment/12882940/Example.png!

I can take on this task if you want.

was (Author: jparkie):
[~wheat9] Why not follow Apache Spark's example for this problem? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/Logging.scala

By having a Logging trait with a transient lazy LOG. If you want safe logging, you inherit the trait, and the current existing code will produce a compile-time error by the conflict, which you can use to fix the bad logging.

Edit: Refer to my Example.png. !https://issues.apache.org/jira/secure/attachment/12882932/Example.png!

I can take on this task if you want.

> Table API operators/UDFs must not store Logger
> ----------------------------------------------
>
>                 Key: FLINK-7398
>                 URL: https://issues.apache.org/jira/browse/FLINK-7398
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Aljoscha Krettek
>            Assignee: Haohui Mai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>         Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in an instance field (c.f. https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
> This means that the {{Logger}} will be serialised with the UDF and sent to the cluster. This in itself does not sound right and leads to problems when the slf4j configuration on the Client is different from the cluster environment.
> This is an example of a user running into that problem: https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. Here, they have Logback on the client but the Logback classes are not available on the cluster and so deserialisation of the UDFs fails with a {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43:  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:  private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:  private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:  private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  val LOGICAL: Convention = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:  val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:  val LOG = LoggerFactory.getLogger(this.getClass)
>
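For reference, a minimal, self-contained sketch of the transient lazy logger pattern the comment describes, written in plain Java with java.util.logging so it has no dependencies (Flink's actual code uses slf4j and a Scala trait; the class names here are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.logging.Logger;

// Hypothetical UDF-like class illustrating the pattern: the logger is
// transient, so it is never serialised with the function instance.
class SafeFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Excluded from serialization; rebuilt lazily on whichever JVM runs it.
    private transient Logger log;

    protected Logger log() {
        if (log == null) {
            log = Logger.getLogger(getClass().getName());
        }
        return log;
    }

    public String apply(String value) {
        log().fine("processing " + value);
        return value.toUpperCase();
    }
}

public class Demo {
    // Round-trip an object through Java serialization, analogous to a UDF
    // being shipped from the client to the cluster.
    static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        SafeFunction f = new SafeFunction();
        f.log(); // initialise the logger on the "client" side
        SafeFunction shipped = roundTrip(f);
        // The transient logger did not travel and is rebuilt on first use,
        // so the client's logging backend never needs to exist remotely.
        System.out.println(shipped.apply("ok")); // prints "OK"
    }
}
```

Because the logger field never enters the serialized form, a client-only backend such as Logback cannot cause a {{ClassNotFoundException}} on deserialisation.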
[jira] [Updated] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jacob Park updated FLINK-7398:
------------------------------
    Attachment: (was: Example.png)
[jira] [Comment Edited] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135534#comment-16135534 ]

Jacob Park edited comment on FLINK-7398 at 8/21/17 6:17 PM.
[jira] [Updated] (FLINK-7398) Table API operators/UDFs must not store Logger
[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jacob Park updated FLINK-7398:
------------------------------
    Attachment: Example.png
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135429#comment-16135429 ]

ASF GitHub Bot commented on FLINK-5886:
---------------------------------------

Github user zohar-mizrahi commented on the issue:

    https://github.com/apache/flink/pull/3838

    The thing is that I use the ```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver information from the ```PythonStreamBinder``` to a class that is called from the Python script. How would you suggest doing this otherwise?

> Python API for streaming applications
> -------------------------------------
>
>                 Key: FLINK-5886
>                 URL: https://issues.apache.org/jira/browse/FLINK-5886
>             Project: Flink
>          Issue Type: New Feature
>          Components: Python API
>            Reporter: Zohar Mizrahi
>            Assignee: Zohar Mizrahi
>
> A work in progress to provide a Python interface for the Flink streaming APIs. The core technology is based on Jython and thus imposes two limitations: a. user defined functions cannot use Python extensions; b. the Python version is 2.x.
> The branch is based on Flink release 1.2.0, as can be found here: https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, one can use the IntelliJ IDE. Assuming IntelliJ was set up properly (see: https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, which in turn will execute all the tests under {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue:

    https://github.com/apache/flink/pull/3838

    The thing is that I use the ```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver information from the ```PythonStreamBinder``` to a class that is called from the Python script. How would you suggest doing this otherwise?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
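The design question in this comment — delivering a value through a mutable static attribute versus passing it explicitly to the consumer — can be sketched in plain Java. All names here are hypothetical and this is not Flink's actual API; it only illustrates the two wiring styles under discussion:

```java
// Style 1 (as described in the comment): a mutable static attribute that the
// binder writes and downstream code reads implicitly. The coupling between
// writer and reader is invisible at the call sites.
class PythonEnvConfigSketch {
    static String pythonTmpCachePath;
}

// Style 2: the value is handed over explicitly, so the dependency shows up
// in the constructor signature and the class is easy to test in isolation.
class CacheConsumer {
    private final String tmpCachePath;

    CacheConsumer(String tmpCachePath) {
        this.tmpCachePath = tmpCachePath;
    }

    String describe() {
        return "cache at " + tmpCachePath;
    }
}

public class BinderSketch {
    public static void main(String[] args) {
        // Static style: some other component must have set this first.
        PythonEnvConfigSketch.pythonTmpCachePath = "/tmp/py-cache";

        // Explicit style: the same information flows through a parameter.
        CacheConsumer consumer =
            new CacheConsumer(PythonEnvConfigSketch.pythonTmpCachePath);
        System.out.println(consumer.describe()); // prints "cache at /tmp/py-cache"
    }
}
```

The explicit form avoids hidden ordering requirements (the static field must be written before any reader runs), which is typically what reviewers object to in the static-attribute approach.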
[jira] [Created] (FLINK-7487) test instability in ClassLoaderITCase (no resources available)
Nico Kruber created FLINK-7487:
----------------------------------

             Summary: test instability in ClassLoaderITCase (no resources available)
                 Key: FLINK-7487
                 URL: https://issues.apache.org/jira/browse/FLINK-7487
             Project: Flink
          Issue Type: Bug
          Components: Tests
    Affects Versions: 1.4.0
            Reporter: Nico Kruber

This is the stack trace from https://travis-ci.org/NicoK/flink/jobs/266772103, a build which contains quite a few changes, but the error itself should be unrelated:

{code}
testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)  Time elapsed: 0.604 sec  <<< ERROR!
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
	at org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #0 (Map (Map at main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with groupID < 704e8c44f1c3edc91e03431408eb561d > in sharing group < SlotSharingGroup [727c589bfbe7c65aa4ffc75585a1e7e7, f82d7994fbfdd0aecab2c7f54e58f0c1, 62039db00aa28f9de4fa3df3b89fbc7d, 704e8c44f1c3edc91e03431408eb561d, 208a859a78f987562b4e8dcad6e90582, 9b9f002f990306532d6f153b38835b6f, 30f3d92eacc3068d3545693fe084a6b8, 74da3f65164120b4781de360723e60c0] >. Resources available to scheduler: Number of instances=2, total number of slots=4, available slots=0
	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
	at org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
	at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
	at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
	at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

It seems that the job started in `testDisposeSavepointWithCustomKvState` is not properly shut down after the test method exits, and (parts of) it remain and block resources for the following tests. Copying the relevant parts of the log here:

{code}
13:46:30,887 INFO  org.apache.flink.test.classloading.ClassLoaderITCase - Test
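For context on the exception above: the scheduler reports 2 TaskManager instances with 4 total slots, all occupied. The two remedies the message names correspond to standard Flink configuration keys; a rough flink-conf.yaml sketch (the values are illustrative, not a recommendation for this particular test setup):

```yaml
# Give each TaskManager more slots so more parallel subtasks can be scheduled...
taskmanager.numberOfTaskSlots: 4

# ...or lower the default parallelism so a job requests fewer slots.
parallelism.default: 2
```

In the test case itself, though, the real fix discussed below is shutting down the job left over from the previous test rather than adding resources.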
[jira] [Updated] (FLINK-7487) test instability in ClassLoaderITCase (no resources available)
[ https://issues.apache.org/jira/browse/FLINK-7487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Kruber updated FLINK-7487:
-------------------------------
    Labels: test-stability  (was: )
[jira] [Closed] (FLINK-5671) Test ClassLoaderITCase#testJobsWithCustomClassLoader fails
[ https://issues.apache.org/jira/browse/FLINK-5671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Kruber closed FLINK-5671.
------------------------------
    Resolution: Auto Closed

Closing this now since there is not enough information to track this down; the test class has also evolved and may exhibit different behaviour now. Feel free to re-open or create a new report.

> Test ClassLoaderITCase#testJobsWithCustomClassLoader fails
> ----------------------------------------------------------
>
>                 Key: FLINK-5671
>                 URL: https://issues.apache.org/jira/browse/FLINK-5671
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>         Environment: Ubuntu 16.04
>            Reporter: Anton Solovev
>              Labels: test-stability
>
> {code}
> testJobsWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)  Time elapsed: 41.75 sec  <<< FAILURE!
> java.lang.AssertionError: The program execution failed: Job execution failed.
> 	at org.junit.Assert.fail(Assert.java:88)
> 	at org.apache.flink.test.classloading.ClassLoaderITCase.testJobsWithCustomClassLoader(ClassLoaderITCase.java:221)
> {code}
[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134248050 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala --- @@ -19,9 +19,21 @@ package org.apache.flink.table.api import _root_.java.io.Serializable + import org.apache.flink.api.common.time.Time -class QueryConfig private[table] extends Serializable {} +class QueryConfig private[table] extends Serializable { +} + +object QueryConfig { + def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig = { --- End diff -- I'm not sure if we should move the `queryConfig` method to TableEnvironment or leave it as a utility method. What do you think? @fhueske --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135246#comment-16135246 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134248050 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala --- @@ -19,9 +19,21 @@ package org.apache.flink.table.api import _root_.java.io.Serializable + import org.apache.flink.api.common.time.Time -class QueryConfig private[table] extends Serializable {} +class QueryConfig private[table] extends Serializable { +} + +object QueryConfig { + def getQueryConfigFromTableEnv(tableEnv: TableEnvironment): QueryConfig = { --- End diff -- I'm not sure if we should move the `queryConfig` method to TableEnvironment or leave it as a utility method. What do you think? @fhueske > Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in > SQL > --- > > Key: FLINK-6442 > URL: https://issues.apache.org/jira/browse/FLINK-6442 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently the TableAPI only has a registration method for source tables; > when we write a streaming job in SQL, we have to add an additional part for > the sink, like the TableAPI does: > {code} > val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3" > val t = StreamTestData.getSmall3TupleDataStream(env) > tEnv.registerDataStream("MyTable", t) > // one way: invoke tableAPI’s writeToSink method directly > val result = tEnv.sql(sqlQuery) > result.writeToSink(new YourStreamSink) > // another way: convert to datastream first and then invoke addSink > val result = tEnv.sql(sqlQuery).toDataStream[Row] > result.addSink(new StreamITCase.StringSink) > {code} > From the API we can see the sink table is always a derived table, because its > 'schema' is inferred from
the result type of the upstream query. > Compared to a traditional RDBMS, which supports DML syntax, a query with a target > output could be written like this: > {code} > insert into table target_table_name > [(column_name [ ,...n ])] > query > {code} > The equivalent form of the example above is as follows: > {code} > tEnv.registerTableSink("targetTable", new YourSink) > val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" > val result = tEnv.sql(sql) > {code} > It is supported by Calcite’s grammar: > {code} > insert:( INSERT | UPSERT ) INTO tablePrimary > [ '(' column [, column ]* ')' ] > query > {code} > I'd like to extend the Flink TableAPI to support such a feature. See the design doc: > https://goo.gl/n3phK5
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135244#comment-16135244 ] ASF GitHub Bot commented on FLINK-7040: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4569 @tillrohrmann @kl0u Would be great if you could take a look. > Flip-6 client-cluster communication > --- > > Key: FLINK-7040 > URL: https://issues.apache.org/jira/browse/FLINK-7040 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Mesos >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: flip-6 > > With the new Flip-6 architecture, the client will communicate with the > cluster in a RESTful manner. > The cluster shall support the following REST calls: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Lookup job leader (GET): Gets the JM leader for the given job > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Cancel job (PUT): Cancel the given job > * Stop job (PUT): Stops the given job > * Take savepoint (POST): Take savepoint for given job (How to return the > path under which the savepoint was stored? Maybe by always having to > specify a path) > * Get KV state (GET): Gets the KV state for the given job and key (Queryable > state) > * Poll/subscribe to notifications for job (GET, WebSocket): Polls new > notifications from the execution of the given job/Opens WebSocket to receive > notifications > The first four REST calls will be served by the REST endpoint running in the > application master/cluster entrypoint. The other calls will be served by a > REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons > can be found in this document: > https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing > The implementation will most likely be Netty based.
[GitHub] flink issue #4569: [FLINK-7040] [REST] Add basics for REST communication
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4569 @tillrohrmann @kl0u Would be great if you could take a look.
[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication
[ https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135238#comment-16135238 ] ASF GitHub Bot commented on FLINK-7040: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/4569 [FLINK-7040] [REST] Add basics for REST communication ## What is the purpose of the change This PR implements the fundamentals for generic Client/Server REST communication that will be used for the client/cluster communication, the WebRuntimeMonitor and queryable state. Endpoints `Endpoints` are the main runtime component that have to be started before any communication can happen. Their primary purpose is setting up the underlying netty stack. The `RestClientEndpoint` is a fully-functional class that provides an asynchronous API for sending requests/receiving responses based around `CompletableFuture`s. Requests are sent in a synchronous fashion; a new request is only sent out after a response was received for the previous request (for simplicity). The `RestServerEndpoint` is an abstract class that is very similar to the `WebRuntimeMonitor`. Implementations have to implement a single method `abstract Collection> initializeHandlers();` that returns a collection of handlers that should be registered. Messages To send a request the client accepts 3 arguments: * a `MessageHeaders` * a `RequestBody` * a `ParameterMapper` `RequestBodies` represent the HTTP message body, and will be converted to JSON using jackson-databind. `ParameterMappers` are used to assemble the final URL, including query and path parameters. `MessageHeaders` are stateless objects that define a link between requests and responses as well as provide meta-data about the communication for this particular pair. Headers have generic type arguments for requests and responses and provide some level of type safety; if the client and server use the same headers class the request/response type, URL, HTTP status codes etc.
are all well-defined. Essentially, headers provide a tight coupling between handlers, clients, requests and responses (provided that implementations don't define arbitrary headers on the fly). For example, to define the communication for submitting a job one would define a `JobSubmitRequestBody` that contains the serialized job graph, a `JobSubmitResponseBody` containing the URL to track the job status and a `JobSubmitHeaders` class that defines the HTTP method (POST), the URL (e.g. `/submit`), and the response HTTP status code (ACCEPTED). Handlers Someone has to deal with requests and send out responses, which is where the `AbstractRestHandler` comes into play. This is an abstract class that manages all the netty stuff, and, just like the `MessageHeaders`, has generic type arguments for a `RequestBody` and `ResponseBody`. Only a single method must be implemented, which accepts a request and returns a response: `abstract CompletableFuture handleRequest(@Nonnull HandlerRequest request);` A `HandlerRequest` contains a `RequestBody` and maps for path/query parameters. A `HandlerResponse` contains either a `ResponseBody`, or an `ErrorResponse`. ## Brief change log - Add `MessageHeaders`, `Request-/ResponseBody` for modeling messages - Add `AbstractRestHandler`, `HandlerRequest/-Response` for message processing - Add `RestClient-/-ServerEndpoint` that set up netty ## Verifying this change This change added tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink rest_client_final Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4569.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4569 commit 6085639ad5438228ff56d66a4988cf52cbe850b2 Author: zentol Date: 2017-08-16T13:17:45Z [FLINK-7040] [rest] Add basics for REST communication commit
[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/4569 [FLINK-7040] [REST] Add basics for REST communication ## What is the purpose of the change This PR implements the fundamentals for generic Client/Server REST communication that will be used for the client/cluster communication, the WebRuntimeMonitor and queryable state. Endpoints `Endpoints` are the main runtime component that have to be started before any communication can happen. Their primary purpose is setting up the underlying netty stack. The `RestClientEndpoint` is a fully-functional class that provides an asynchronous API for sending requests/receiving responses based around `CompletableFuture`s. Requests are sent in a synchronous fashion; a new request is only sent out after a response was received for the previous request (for simplicity). The `RestServerEndpoint` is an abstract class that is very similar to the `WebRuntimeMonitor`. Implementations have to implement a single method `abstract Collection> initializeHandlers();` that returns a collection of handlers that should be registered. Messages To send a request the client accepts 3 arguments: * a `MessageHeaders` * a `RequestBody` * a `ParameterMapper` `RequestBodies` represent the HTTP message body, and will be converted to JSON using jackson-databind. `ParameterMappers` are used to assemble the final URL, including query and path parameters. `MessageHeaders` are stateless objects that define a link between requests and responses as well as provide meta-data about the communication for this particular pair. Headers have generic type arguments for requests and responses and provide some level of type safety; if the client and server use the same headers class the request/response type, URL, HTTP status codes etc. are all well-defined. Essentially, headers provide a tight coupling between handlers, clients, requests and responses (provided that implementations don't define arbitrary headers on the fly).
For example, to define the communication for submitting a job one would define a `JobSubmitRequestBody` that contains the serialized job graph, a `JobSubmitResponseBody` containing the URL to track the job status and a `JobSubmitHeaders` class that defines the HTTP method (POST), the URL (e.g. `/submit`), and the response HTTP status code (ACCEPTED). Handlers Someone has to deal with requests and send out responses, which is where the `AbstractRestHandler` comes into play. This is an abstract class that manages all the netty stuff, and, just like the `MessageHeaders`, has generic type arguments for a `RequestBody` and `ResponseBody`. Only a single method must be implemented, which accepts a request and returns a response: `abstract CompletableFuture handleRequest(@Nonnull HandlerRequest request);` A `HandlerRequest` contains a `RequestBody` and maps for path/query parameters. A `HandlerResponse` contains either a `ResponseBody`, or an `ErrorResponse`. ## Brief change log - Add `MessageHeaders`, `Request-/ResponseBody` for modeling messages - Add `AbstractRestHandler`, `HandlerRequest/-Response` for message processing - Add `RestClient-/-ServerEndpoint` that set up netty ## Verifying this change This change added tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink rest_client_final Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4569.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4569 commit 6085639ad5438228ff56d66a4988cf52cbe850b2 Author: zentol Date: 2017-08-16T13:17:45Z [FLINK-7040] [rest] Add basics for REST communication commit af95d0729595d1c707ef3041b8fbdbc21c0d0d4a Author: zentol Date: 2017-08-21T09:53:13Z Add better error message for get requests with a body commit 5a7a8c2877f2b25cc6281462007a5ff03de40781 Author: zentol Date:
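The typed coupling described in this PR can be sketched in plain Java, independent of Flink; every class name below is an illustrative stand-in, not the PR's actual signatures:

```java
// Hypothetical sketch of the typed request/response coupling described in
// the PR. Names mirror the PR's concepts but are NOT Flink's real API.
interface RequestBody {}
interface ResponseBody {}

// Headers tie a request type, a response type, a URL and an HTTP method together.
interface MessageHeaders<Q extends RequestBody, P extends ResponseBody> {
    String url();
    String method();
}

final class JobSubmitRequest implements RequestBody {
    final byte[] serializedJobGraph;
    JobSubmitRequest(byte[] g) { this.serializedJobGraph = g; }
}

final class JobSubmitResponse implements ResponseBody {
    final String trackingUrl;
    JobSubmitResponse(String u) { this.trackingUrl = u; }
}

final class JobSubmitHeaders implements MessageHeaders<JobSubmitRequest, JobSubmitResponse> {
    public String url() { return "/submit"; }
    public String method() { return "POST"; }
}

// A handler is parameterized the same way, so using the same headers class on
// client and server makes the request/response pair well-defined at compile time.
abstract class RestHandler<Q extends RequestBody, P extends ResponseBody> {
    abstract P handleRequest(Q request);
}

class JobSubmitHandler extends RestHandler<JobSubmitRequest, JobSubmitResponse> {
    @Override
    JobSubmitResponse handleRequest(JobSubmitRequest request) {
        // Illustrative tracking URL; a real handler would submit the job graph.
        return new JobSubmitResponse("/jobs/42");
    }
}
```

A mismatched request/response pair then fails at compile time rather than at runtime, which is the point of routing everything through a shared headers class.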
[jira] [Created] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints
Bhumika Bayani created FLINK-7486: - Summary: flink-mesos: Support for adding unique attribute / group_by attribute constraints Key: FLINK-7486 URL: https://issues.apache.org/jira/browse/FLINK-7486 Project: Flink Issue Type: Improvement Components: Mesos Affects Versions: 1.3.2 Reporter: Bhumika Bayani In our setup, we have multiple mesos-workers. In spite of this, the Flink application master most of the time ends up spawning all task managers on the same mesos-worker. We intend to ensure HA of task managers. We would like to make sure each task manager runs on a different mesos-worker, and on a mesos-worker that does not share the AZ attribute with earlier task manager instances. Netflix-fenzo supports adding UniqueHostAttribute and BalancedHostAttribute constraints. Flink-mesos should also enable us to add these kinds of constraints.
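The requested constraint can be sketched as a simple placement predicate (a hypothetical stand-in, not Fenzo's actual UniqueHostAttribute API):

```java
// Illustrative sketch of a "unique attribute" placement constraint as
// described in the report: reject a candidate worker whose attribute value
// (e.g. availability zone) is already used by a running task manager.
// This is NOT Fenzo's or Flink's real constraint API.
import java.util.Map;
import java.util.Set;

class UniqueAttributeConstraint {
    private final String attribute;

    UniqueAttributeConstraint(String attribute) {
        this.attribute = attribute;
    }

    // candidateAttrs: attributes of the offered mesos-worker;
    // valuesInUse: attribute values of workers already hosting task managers.
    boolean accepts(Map<String, String> candidateAttrs, Set<String> valuesInUse) {
        String v = candidateAttrs.get(attribute);
        return v != null && !valuesInUse.contains(v);
    }
}
```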
[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration
[ https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135209#comment-16135209 ] ASF GitHub Bot commented on FLINK-7483: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4568 lgtm > BlobCache cleanup timer not reset after job re-registration > --- > > Key: FLINK-7483 > URL: https://issues.apache.org/jira/browse/FLINK-7483 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and > {{releaseJob}} calls where the latter sets a cleanup interval. > {{registerJob}}, however, forgets to reset this if the job is re-registered, > and so the job's blobs will be cleaned up although they are still in use!
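The bug and its fix can be sketched with a simplified ref-count/deadline model (an assumption for illustration; the real BlobCache code differs):

```java
// Simplified sketch of the FLINK-7483 bug: registerJob must clear any
// cleanup deadline set by an earlier releaseJob, otherwise the blobs of a
// re-registered job are still deleted. Not the actual BlobCache code.
import java.util.HashMap;
import java.util.Map;

class BlobRefCounter {
    private static final class Entry {
        int refs;
        long cleanupAt = -1; // -1 means no cleanup scheduled
    }

    private final Map<String, Entry> jobs = new HashMap<>();

    void registerJob(String jobId) {
        Entry e = jobs.computeIfAbsent(jobId, k -> new Entry());
        e.refs++;
        e.cleanupAt = -1; // the fix: reset the cleanup deadline on re-registration
    }

    void releaseJob(String jobId, long now, long ttlMillis) {
        Entry e = jobs.get(jobId);
        if (e != null && --e.refs == 0) {
            e.cleanupAt = now + ttlMillis; // schedule deferred cleanup
        }
    }

    boolean isEligibleForCleanup(String jobId, long now) {
        Entry e = jobs.get(jobId);
        return e != null && e.refs == 0 && e.cleanupAt >= 0 && e.cleanupAt <= now;
    }
}
```

Without the `e.cleanupAt = -1` line in `registerJob`, a release followed by a re-registration would leave the stale deadline in place, which is exactly the reported behaviour.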
[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/4568 lgtm
[jira] [Closed] (FLINK-1180) Add support for Hadoop MapReduce.* API Mappers and Reducers
[ https://issues.apache.org/jira/browse/FLINK-1180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-1180. --- Resolution: Won't Do > Add support for Hadoop MapReduce.* API Mappers and Reducers > --- > > Key: FLINK-1180 > URL: https://issues.apache.org/jira/browse/FLINK-1180 > Project: Flink > Issue Type: Task > Components: DataSet API >Affects Versions: 0.7.0-incubating >Reporter: Mohitdeep Singh >Assignee: Mohitdeep Singh >Priority: Minor > Labels: hadoop > > Flink currently supports Hadoop's mapred-API Mapper and Reducer functions, but > not the mapreduce API. > Reference: email exchange on the flink mailing list. > "...Another option would be to extend the Hadoop Compatibility Layer. Right > now, we have wrappers for Hadoop's mapred-API function (Mapper, Reducer), but > not for the mapreduce-API functions [2]. Having wrappers for mapreduce-API > functions would also be cool. There is no JIRA for this issue yet. "
[jira] [Closed] (FLINK-838) Implement full Hadoop Compatibility for Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-838. -- Resolution: Won't Do > Implement full Hadoop Compatibility for Apache Flink > > > Key: FLINK-838 > URL: https://issues.apache.org/jira/browse/FLINK-838 > Project: Flink > Issue Type: Improvement > Components: Batch Connectors and Input/Output Formats, DataSet API >Reporter: GitHub Import >Assignee: Artem Tsikiridis > Labels: github-import > > This is a meta issue for tracking @atsikiridis' progress with implementing a > full Hadoop Compatibility Layer for Stratosphere. > Some documentation can be found in the Wiki: > https://github.com/stratosphere/stratosphere/wiki/%5BGSoC-14%5D-A-Hadoop-abstraction-layer-for-Stratosphere-(Project-Map-and-Notes) > As well as the project proposal: > https://github.com/stratosphere/stratosphere/wiki/GSoC-2014-Project-Proposal-Draft-by-Artem-Tsikiridis > Most importantly, there is the following **schedule**: > *19 May - 27 June (Midterm)* > 1) Work on the Hadoop tasks, their Context and the mapping of Hadoop's > Configuration to the one of Stratosphere. By successfully bridging the Hadoop > tasks with Stratosphere, we already cover the most basic Hadoop Jobs. This > can be determined by running some popular Hadoop examples on Stratosphere > (e.g. WordCount, k-means, join) (4 - 5 weeks) > 2) Understand how the running of these jobs works (e.g. command line > interface) for the wrapper. Implement how the user will run them. (1 - 2 > weeks). > *27 June - 11 August* > 1) Continue wrapping more "advanced" Hadoop Interfaces (Comparators, > Partitioners, Distributed Cache etc.) There are quite a few interfaces and it > will be a challenge to support all of them. (5 full weeks) > 2) Profiling of the application and optimizations (if applicable) > *11 August - 18 August* > Write documentation on code, write a README with care and add more > unit-tests.
(1 week) > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/838 > Created by: [rmetzger|https://github.com/rmetzger] > Labels: core, enhancement, parent-for-major-feature, > Milestone: Release 0.7 (unplanned) > Created at: Tue May 20 10:11:34 CEST 2014 > State: open
[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135189#comment-16135189 ] ASF GitHub Bot commented on FLINK-7477: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4566 > Use "hadoop classpath" to augment classpath when available > -- > > Key: FLINK-7477 > URL: https://issues.apache.org/jira/browse/FLINK-7477 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Currently, some cloud environments don't properly put the Hadoop jars into > {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should > check in {{config.sh}} if the {{hadoop}} binary is on the path and augment > our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in > our scripts. > This will improve the out-of-box experience of users that otherwise have to > manually set {{HADOOP_CLASSPATH}}.
[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4566
[jira] [Closed] (FLINK-7480) Set HADOOP_CONF_DIR to sane default if not set
[ https://issues.apache.org/jira/browse/FLINK-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7480. --- Resolution: Fixed Implemented in a3143bcb0dd2895025bd6f693f4240604a5f1840 > Set HADOOP_CONF_DIR to sane default if not set > -- > > Key: FLINK-7480 > URL: https://issues.apache.org/jira/browse/FLINK-7480 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Currently, both AWS and GCE don't have a {{HADOOP_CONF_DIR}} set by default. > This makes the out-of-box experience on these cloud environments bad because > not setting it results in errors whose cause is not obvious. > In case {{HADOOP_CONF_DIR}} is not set we should check if > {{/etc/hadoop/conf}} exists and set {{HADOOP_CONF_DIR}} to that.
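The proposed fallback could look roughly like this in `config.sh` (a sketch; the committed change may differ):

```sh
# Sketch of the FLINK-7480 fallback, not the exact committed code:
# default HADOOP_CONF_DIR to /etc/hadoop/conf when it is unset and
# that directory exists.
if [ -z "${HADOOP_CONF_DIR:-}" ] && [ -d "/etc/hadoop/conf" ]; then
    HADOOP_CONF_DIR="/etc/hadoop/conf"
    export HADOOP_CONF_DIR
fi
```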
[jira] [Closed] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7477. --- Resolution: Fixed Implemented in 0a0f6ed6c3d6cff702e4322293340274bea5e7d9 > Use "hadoop classpath" to augment classpath when available > -- > > Key: FLINK-7477 > URL: https://issues.apache.org/jira/browse/FLINK-7477 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Currently, some cloud environments don't properly put the Hadoop jars into > {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should > check in {{config.sh}} if the {{hadoop}} binary is on the path and augment > our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in > our scripts. > This will improve the out-of-box experience of users that otherwise have to > manually set {{HADOOP_CLASSPATH}}.
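The described `config.sh` check could be sketched as follows, under the assumption that `INTERNAL_HADOOP_CLASSPATHS` starts from `HADOOP_CLASSPATH` (the committed change may differ):

```sh
# Sketch of the FLINK-7477 change, not the exact committed code:
# if a `hadoop` binary is on the PATH, append the output of
# `hadoop classpath` to INTERNAL_HADOOP_CLASSPATHS.
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH:-}"

if command -v hadoop >/dev/null 2>&1; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:$(hadoop classpath)"
fi
```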
[jira] [Commented] (FLINK-7300) End-to-end tests are instable on Travis
[ https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135176#comment-16135176 ] Aljoscha Krettek commented on FLINK-7300: - I think you're right, we're logging exceptions that don't indicate a real problem. Changing this looks like a bigger task, though, and doesn't help with the immediate problem of the unstable end-to-end test. > End-to-end tests are instable on Travis > --- > > Key: FLINK-7300 > URL: https://issues.apache.org/jira/browse/FLINK-7300 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Aljoscha Krettek > Labels: test-stability > Fix For: 1.4.0 > > > It seems like the end-to-end tests are instable, causing the {{misc}} build > profile to sporadically fail. > Incorrect matched output: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8 > Another failure example of a different cause than the above, also on the > end-to-end tests: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726
[jira] [Updated] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
[ https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7484: Component/s: CEP > com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: > Index: 7, Size: 5 > --- > > Key: FLINK-7484 > URL: https://issues.apache.org/jira/browse/FLINK-7484 > Project: Flink > Issue Type: Bug > Components: CEP, DataStream API, Scala API >Affects Versions: 1.3.2 > Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend >Reporter: Shashank Agarwal > > I am using many CEPs and a Global Window. I am getting the following error > sometimes and the application crashes. I have checked and logically there's no flaw > in the program. Here ItemPojo is a Pojo class and we are using > java.util.List[ItemPojo]. We are using the Scala DataStream API; please find > the logs attached. > {code} > 2017-08-17 10:04:12,814 INFO org.apache.flink.runtime.taskmanager.Task > - TriggerWindow(GlobalWindows(), > ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, > co.thirdwatch.trigger.TransactionTrigger@5707c1cb, > WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: > Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched > from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: > Index: 7, Size: 5 > Serialization trace: > category (co.thirdwatch.pojo.ItemPojo) > underlying (scala.collection.convert.Wrappers$SeqWrapper) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43) > at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74) > at > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) > at > org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) > at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) > ... 22 more > 2017-08-17 10:04:12,816 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for TriggerWindow(GlobalWindows(), > ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, > co.thirdwatch.trigger.TransactionTrigger@5707c1cb, > WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: > Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1). > 2017-08-17 10:04:12,816 INFO
[jira] [Commented] (FLINK-7485) Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.
[ https://issues.apache.org/jira/browse/FLINK-7485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135172#comment-16135172 ] Jark Wu commented on FLINK-7485: Hi [~sunjincheng121] this issue is a duplicate of FLINK-7208, so I have closed it. > Using DataView interface to improve (MIN/MAX)WithRetractAggFunction. > > > Key: FLINK-7485 > URL: https://issues.apache.org/jira/browse/FLINK-7485 > Project: Flink > Issue Type: Improvement >Reporter: sunjincheng > > Currently MIN/MAX use an in-memory {{HashMap}} to store all values; > after FLINK-7206 we can improve them by using {{DataView}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
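The bookkeeping behind a retractable MIN/MAX can be sketched without any Flink dependencies: keep a count per distinct value in an ordered map, so retracting a row undoes its contribution and the max can fall back to the next-largest value. The class and method names below are hypothetical illustration only; the actual (MIN/MAX)WithRetractAggFunction and the proposed DataView-backed version differ in API and state handling.

```java
import java.util.TreeMap;

/** Hypothetical sketch of MAX-with-retraction: a count per distinct value. */
class MaxWithRetractAcc {
    // TreeMap keeps keys ordered, so the current max is the last key.
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    void accumulate(long v) {
        counts.merge(v, 1L, Long::sum);
    }

    void retract(long v) {
        Long c = counts.get(v);
        if (c == null) return;            // retraction of an unseen value: ignore
        if (c <= 1L) counts.remove(v);    // last copy gone: value no longer a candidate
        else counts.put(v, c - 1L);
    }

    Long getMax() {
        return counts.isEmpty() ? null : counts.lastKey();
    }
}
```

The per-value counts are exactly what grows unboundedly in heap memory today; backing them by a state view instead of a `HashMap` is the improvement the issue (and FLINK-7208) is after.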
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135159#comment-16135159 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134186937 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) { * tEnv.sql(s"SELECT * FROM $table") * }}} * -* @param query The SQL query to evaluate. +* @param sql The SQL string to evaluate. * @return The result of the query as Table. */ - def sql(query: String): Table = { + @deprecated + def sql(sql: String): Table = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query -val parsed = planner.parse(query) +val parsed = planner.parse(sql) // validate the sql query val validated = planner.validate(parsed) // transform to a relational tree val relational = planner.rel(validated) - new Table(this, LogicalRelNode(relational.rel)) } /** +* Evaluates a SQL Select query on registered tables and retrieves the result as a +* [[Table]]. +* +* All tables referenced by the query must be registered in the TableEnvironment. But +* [[Table.toString]] will automatically register an unique table name and return the +* table name. So it allows to call SQL directly on tables like this: +* +* {{{ +* val table: Table = ... +* // the table is not registered to the table environment +* tEnv.sqlSelect(s"SELECT * FROM $table") +* }}} +* +* @param sql The SQL string to evaluate. +* @return The result of the query as Table or null of the DML insert operation. 
+*/ + def sqlQuery(sql: String): Table = { +val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) +// parse the sql query +val parsed = planner.parse(sql) +if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) { + // validate the sql query + val validated = planner.validate(parsed) + // transform to a relational tree + val relational = planner.rel(validated) + new Table(this, LogicalRelNode(relational.rel)) +} else { + throw new TableException( +"Unsupported sql query! sqlQuery Only accept SELECT, UNION, INTERSECT, EXCEPT, VALUES, " + + "WITH, ORDER_BY, EXPLICIT_TABLE") +} + } + + /** +* Evaluates a SQL statement which must be an SQL Data Manipulation Language (DML) statement, +* such as INSERT, UPDATE or DELETE; or an SQL statement that returns nothing, such as a DDL +* statement; +* Currently only support a SQL INSERT statement on registered tables and has no return value. +* +* All tables referenced by the query must be registered in the TableEnvironment. But +* [[Table.toString]] will automatically register an unique table name and return the +* table name. So it allows to call SQL directly on tables like this: +* +* {{{ +* /// register table sink for insertion +* tEnv.registerTableSink("target_table", ... +* val sourceTable: Table = ... +* // sourceTable is not registered to the table environment +* tEnv.sqlInsert(s"INSERT INTO target_table SELECT * FROM $sourceTable") +* }}} +* +* @param sql The SQL String to evaluate. +*/ + def sqlUpdate(sql: String): Unit = { +sqlUpdate(sql, QueryConfig.getQueryConfigFromTableEnv(this)) + } + + /** +* Evaluates a SQL statement which must be an SQL Data Manipulation Language (DML) statement, +* such as INSERT, UPDATE or DELETE; or an SQL statement that returns nothing, such as a DDL +* statement; +* Currently only support a SQL INSERT statement on registered tables and has no return value. +* +* All tables referenced by the query must be registered in the TableEnvironment. 
But +* [[Table.toString]] will automatically register an unique table name and return the +* table name. So it allows to call SQL directly on tables like this: +* +* {{{ +* /// register table sink for insertion +* tEnv.registerTableSink("target_table", ... +* val sourceTable: Table = ... +* // sourceTable is not registered to the table environment +* tEnv.sqlInsert(s"INSERT INTO target_table
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135158#comment-16135158 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134228841 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CsvSQLTableSink.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.core.fs.FileSystem.WriteMode +import org.apache.flink.table.sinks.CsvTableSink +import org.apache.flink.types.Row + +class CsvSQLTableSink( --- End diff -- Yes, I should remove this class since I'm not sure wether to add it as a built-in sink table that schema can be declared. 
> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in > SQL > --- > > Key: FLINK-6442 > URL: https://issues.apache.org/jira/browse/FLINK-6442 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently the TableAPI only offers a registration method for source tables; when we > write a streaming job in SQL, we have to add a separate step for the sink, as the > TableAPI does: > {code} > val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3" > val t = StreamTestData.getSmall3TupleDataStream(env) > tEnv.registerDataStream("MyTable", t) > // one way: invoke tableAPI’s writeToSink method directly > val result = tEnv.sql(sqlQuery) > result.writeToSink(new YourStreamSink) > // another way: convert to datastream first and then invoke addSink > val result = tEnv.sql(sqlQuery).toDataStream[Row] > result.addSink(new StreamITCase.StringSink) > {code} > From the API we can see that the sink table is always a derived table, because its > 'schema' is inferred from the result type of the upstream query. > Compared to a traditional RDBMS, which supports DML syntax, a query with a target > output could be written like this: > {code} > insert into table target_table_name > [(column_name [ ,...n ])] > query > {code} > The equivalent form of the example above is as follows: > {code} > tEnv.registerTableSink("targetTable", new YourSink) > val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" > val result = tEnv.sql(sql) > {code} > It is supported by Calcite’s grammar: > {code} > insert:( INSERT | UPSERT ) INTO tablePrimary > [ '(' column [, column ]* ')' ] > query > {code} > I'd like to extend the Flink TableAPI to support this feature; see the design doc: > https://goo.gl/n3phK5 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
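The split discussed in this PR — a query entry point that returns a Table and an update entry point that returns nothing — hinges on classifying the statement kind after parsing. The dependency-free sketch below only illustrates that routing; `SqlRouter`, `classify`, and the keyword list are hypothetical, whereas the real code inspects Calcite's `SqlKind` on the parsed tree rather than the raw string.

```java
/** Hypothetical sketch: route a SQL string to the query or update path
 *  by its leading keyword, mirroring the SqlKind.QUERY vs. DML split. */
final class SqlRouter {
    enum Kind { QUERY, UPDATE }

    static Kind classify(String sql) {
        // First whitespace-delimited token, upper-cased for comparison.
        String kw = sql.trim().split("\\s+")[0].toUpperCase();
        switch (kw) {
            case "SELECT":
            case "VALUES":
            case "WITH":
                return Kind.QUERY;   // handled by a sqlQuery-style method
            case "INSERT":
            case "UPSERT":
                return Kind.UPDATE;  // handled by a sqlUpdate-style method
            default:
                throw new IllegalArgumentException("Unsupported statement kind: " + kw);
        }
    }
}
```

Rejecting everything outside the two whitelists up front is the same design choice the reviewed `sqlQuery` makes with its `TableException` for non-QUERY kinds.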
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135156#comment-16135156 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134186403 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) { * tEnv.sql(s"SELECT * FROM $table") * }}} * -* @param query The SQL query to evaluate. +* @param sql The SQL string to evaluate. * @return The result of the query as Table. */ - def sql(query: String): Table = { + @deprecated + def sql(sql: String): Table = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query -val parsed = planner.parse(query) +val parsed = planner.parse(sql) // validate the sql query val validated = planner.validate(parsed) // transform to a relational tree val relational = planner.rel(validated) - new Table(this, LogicalRelNode(relational.rel)) } /** +* Evaluates a SQL Select query on registered tables and retrieves the result as a +* [[Table]]. +* +* All tables referenced by the query must be registered in the TableEnvironment. But +* [[Table.toString]] will automatically register an unique table name and return the +* table name. So it allows to call SQL directly on tables like this: +* +* {{{ +* val table: Table = ... +* // the table is not registered to the table environment +* tEnv.sqlSelect(s"SELECT * FROM $table") +* }}} +* +* @param sql The SQL string to evaluate. +* @return The result of the query as Table or null of the DML insert operation. 
+*/ + def sqlQuery(sql: String): Table = { +val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) +// parse the sql query +val parsed = planner.parse(sql) +if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) { + // validate the sql query + val validated = planner.validate(parsed) + // transform to a relational tree + val relational = planner.rel(validated) + new Table(this, LogicalRelNode(relational.rel)) +} else { + throw new TableException( +"Unsupported sql query! sqlQuery Only accept SELECT, UNION, INTERSECT, EXCEPT, VALUES, " + --- End diff -- Not only INSERT but also UPDATE/DELETE ... It'll reject all kinds of sql except SELECT, EXCEPT, INTERSECT, UNION, VALUES, ORDER_BY, EXPLICIT_TABLE.
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135155#comment-16135155 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134185829 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) { * tEnv.sql(s"SELECT * FROM $table") * }}} * -* @param query The SQL query to evaluate. +* @param sql The SQL string to evaluate. * @return The result of the query as Table. */ - def sql(query: String): Table = { + @deprecated + def sql(sql: String): Table = { --- End diff -- sounds reasonable.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135157#comment-16135157 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134184973 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -132,6 +132,44 @@ abstract class StreamTableEnvironment( } /** +* Registers an external [[TableSink]] in this [[TableEnvironment]]'s catalog. +* Registered sink tables can be referenced in SQL DML clause. +* +* Examples: +* +* - predefine a table sink with schema +* {{{ +* val fieldTypes: Array[TypeInformation[_]] = Array( #TODO ) +* val fieldNames: Array[String] = Array("a", "b", "c") +* val tableSink: TableSink = new YourTableSinkImpl(fieldTypes, Option(fieldNames)) +* }}} +* +* - register an alias for this table sink to catalog +* {{{ +* tableEnv.registerTableSink("example_sink_table", tableSink) +* }}} +* +* - use the registered sink in SQL directly +* {{{ +* tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c FROM sourceTable") +* }}} +* +* @param name The name under which the [[TableSink]] is registered. +* @param tableSink The [[TableSink]] to register. +*/ + override def registerTableSink(name: String, tableSink: TableSink[_]): Unit = { +checkValidTableName(name) + +tableSink match { + case t @ (_: AppendStreamTableSink[_] | _: UpsertStreamTableSink[_] | +_: RetractStreamTableSink[_]) => +registerTableInternal(name, new TableSinkTable(t)) + case _ => +throw new TableException("BatchTableSink can not be registered in StreamTableEnvironment") --- End diff -- make sense to me. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135152#comment-16135152 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134185390 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) { * tEnv.sql(s"SELECT * FROM $table") * }}} * -* @param query The SQL query to evaluate. +* @param sql The SQL string to evaluate. * @return The result of the query as Table. */ - def sql(query: String): Table = { + @deprecated --- End diff -- yes, the replacement should be mentioned here. > Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in > SQL > --- > > Key: FLINK-6442 > URL: https://issues.apache.org/jira/browse/FLINK-6442 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently in TableAPI there’s only registration method for source table, > when we use SQL writing a streaming job, we should add additional part for > the sink, like TableAPI does: > {code} > val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3" > val t = StreamTestData.getSmall3TupleDataStream(env) > tEnv.registerDataStream("MyTable", t) > // one way: invoke tableAPI’s writeToSink method directly > val result = tEnv.sql(sqlQuery) > result.writeToSink(new YourStreamSink) > // another way: convert to datastream first and then invoke addSink > val result = tEnv.sql(sqlQuery).toDataStream[Row] > result.addSink(new StreamITCase.StringSink) > {code} > From the api we can see the sink table always be a derived table because its > 'schema' is inferred from the result type of upstream query. 
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target > output could be written like this: > {code} > insert into table target_table_name > [(column_name [ ,...n ])] > query > {code} > The equivalent form of the example above is as follows: > {code} > tEnv.registerTableSink("targetTable", new YourSink) > val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" > val result = tEnv.sql(sql) > {code} > It is supported by Calcite’s grammar: > {code} > insert:( INSERT | UPSERT ) INTO tablePrimary > [ '(' column [, column ]* ')' ] > query > {code} > I'd like to extend the Flink TableAPI to support such a feature. See design doc: > https://goo.gl/n3phK5
[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134185390 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) { * tEnv.sql(s"SELECT * FROM $table") * }}} * -* @param query The SQL query to evaluate. +* @param sql The SQL string to evaluate. * @return The result of the query as Table. */ - def sql(query: String): Table = { + @deprecated --- End diff -- yes, the replacement should be mentioned here.
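The review comment above asks that the `@deprecated` marker name its replacement. A minimal Java sketch of that convention, where the deprecated method's documentation points at and delegates to its successor (class and method names here are illustrative, not Flink's actual `TableEnvironment` API):

```java
/** Hypothetical query facade illustrating the deprecation-with-replacement pattern. */
class QueryEnv {
    /**
     * Evaluates a SQL query.
     *
     * @deprecated Use {@link #sqlQuery(String)} instead.
     */
    @Deprecated
    String sql(String query) {
        // Delegate so both entry points stay behaviorally identical.
        return sqlQuery(query);
    }

    /** Replacement method; the deprecated overload delegates here. */
    String sqlQuery(String query) {
        return "RESULT:" + query;
    }
}
```

Delegating (rather than duplicating the body) keeps the old and new entry points from drifting apart during the deprecation window.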
[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration
[ https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135141#comment-16135141 ] ASF GitHub Bot commented on FLINK-7483: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4568 FYI: the test instability of `JobManagerCleanupITCase` will be fixed by #4358, the next BLOB-PR in line - I don't want to mess up the following PRs anymore (again) by integrating it separately or here > BlobCache cleanup timer not reset after job re-registration > --- > > Key: FLINK-7483 > URL: https://issues.apache.org/jira/browse/FLINK-7483 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and > {{releaseJob}} calls where the latter sets a cleanup interval. > {{registerJob}}, however, forgets to reset this if the job is re-registered > again and so the job's blobs will be cleaned up although it is still used!
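The bug described in FLINK-7483 — `registerJob` not resetting the cleanup deadline that a previous `releaseJob` set — can be sketched with a self-contained reference-counting model. The class and method shapes below are illustrative only, not Flink's actual `BlobCache` internals:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of per-job blob ref-counting with deferred cleanup. */
class JobBlobTracker {

    /** Ref count plus the time after which an unreferenced job may be cleaned up. */
    static class Entry {
        int refCount;
        long keepUntil = Long.MAX_VALUE; // no cleanup scheduled while referenced
    }

    private final Map<String, Entry> jobs = new HashMap<>();
    private final long cleanupIntervalMillis;

    JobBlobTracker(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    /** Re-registration must cancel any pending cleanup (the fix FLINK-7483 calls for). */
    void registerJob(String jobId) {
        Entry e = jobs.computeIfAbsent(jobId, k -> new Entry());
        e.refCount++;
        e.keepUntil = Long.MAX_VALUE; // reset the deadline set by an earlier releaseJob
    }

    /** When the last reference goes away, schedule cleanup after the interval. */
    void releaseJob(String jobId, long nowMillis) {
        Entry e = jobs.get(jobId);
        if (e == null || e.refCount == 0) {
            return;
        }
        e.refCount--;
        if (e.refCount == 0) {
            e.keepUntil = nowMillis + cleanupIntervalMillis;
        }
    }

    boolean wouldBeCleanedUp(String jobId, long nowMillis) {
        Entry e = jobs.get(jobId);
        return e != null && e.refCount == 0 && nowMillis >= e.keepUntil;
    }
}
```

Without the `keepUntil` reset in `registerJob`, a release-then-register sequence would leave the stale deadline in place, and a still-used job's blobs would be deleted — exactly the failure mode the issue describes.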
[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4568 FYI: the test instability of `JobManagerCleanupITCase` will be fixed by #4358, the next BLOB-PR in line - I don't want to mess up the following PRs anymore (again) by integrating it separately or here
[jira] [Commented] (FLINK-7439) Support variable arguments for UDTF in SQL
[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135132#comment-16135132 ] ASF GitHub Bot commented on FLINK-7439: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/4536 @sunjincheng121 @fhueske It would be great if you could have a look ;-) > Support variable arguments for UDTF in SQL > -- > > Key: FLINK-7439 > URL: https://issues.apache.org/jira/browse/FLINK-7439 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Currently, both UDF and UDAF support variable parameters, but UDTF does not. > FLINK-5882 added variable-argument UDTF support for the Table API only, but missed SQL.
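The variable-argument support discussed in FLINK-7439 comes down to exposing a varargs `eval` method to the SQL planner, so calls with any number of arguments resolve to one function. A self-contained Java sketch of that varargs pattern (the class is an illustrative stand-in, not Flink's actual `TableFunction` API, and `collect()`-style row emission is simplified to an internal list):

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for a UDTF whose eval accepts a variable argument list. */
class SplitFunction {
    private final List<String> collected = new ArrayList<>();

    /** Varargs signature: calls like split(a), split(a, b, c) all resolve here. */
    void eval(String... values) {
        for (String v : values) {
            for (String part : v.split("#")) {
                collected.add(part); // a real UDTF would emit each part as a row
            }
        }
    }

    List<String> results() {
        return collected;
    }
}
```

With such a signature, the planner only needs to recognize the varargs parameter and pass the full SQL argument list through, which is what the Table API side already did after FLINK-5882.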
[GitHub] flink issue #4536: [FLINK-7439] [table] Support variable arguments for UDTF ...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/4536 @sunjincheng121 @fhueske It would be great if you could have a look ;-)
[jira] [Closed] (FLINK-7485) Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.
[ https://issues.apache.org/jira/browse/FLINK-7485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-7485. -- Resolution: Duplicate > Using DataView interface to improve (MIN/MAX)WithRetractAggFunction. > > > Key: FLINK-7485 > URL: https://issues.apache.org/jira/browse/FLINK-7485 > Project: Flink > Issue Type: Improvement >Reporter: sunjincheng > > Currently MIN/MAX use an in-memory {{HashMap}} to store all values; > after FLINK-7206 we can improve them by using {{DataView}}.
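The MIN/MAX-with-retract functions mentioned above must keep a multiset of the values seen so far, so that retracting the current extreme can fall back to the next-best value. A minimal self-contained Java sketch of that bookkeeping, using a `TreeMap` as a value-to-count multiset (Flink's actual implementation would back this state with a `DataView`/`MapView` rather than an on-heap map):

```java
import java.util.TreeMap;

/** Max aggregate that supports retraction by counting occurrences of each value. */
class RetractableMax {
    // value -> number of times it is currently part of the aggregate
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    void accumulate(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    void retract(int value) {
        Integer c = counts.get(value);
        if (c == null) {
            return; // retracting a value that was never accumulated: ignore
        }
        if (c == 1) {
            counts.remove(value); // last occurrence gone; max may change
        } else {
            counts.put(value, c - 1);
        }
    }

    /** Current max, or null when the aggregate is empty. */
    Integer max() {
        return counts.isEmpty() ? null : counts.lastKey();
    }
}
```

The ordered map makes `max()` O(log n) after any retraction; a plain `HashMap` of all values would need a full scan to find the new maximum.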
[jira] [Commented] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
[ https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135101#comment-16135101 ] Shashank Agarwal commented on FLINK-7484: - Hi [~yew1eb] It's a very big code base with a lot of business logic, but I can give you an overview of the scenario. We are using Scala streams, but due to a Cassandra sink issue we convert the Scala stream to a Java stream, so we also convert the internal objects to Java objects. Judging from the logs, this is the scenario. We use an ItemPojo class. {code:java} @SerialVersionUID(224567L) @UDT(keyspace = "cstable", name = "item") case class ItemPojo( @BeanProperty var item_id: String, @BeanProperty var product_title: String, @BeanProperty var price: String ) extends Serializable { def this() { this(null, null, null) } } {code} In a stream object we use java.util.List[ItemPojo]. It did not cause any issue while we were only using lots of CEP and a global window, but at some point we needed to iterate over that list in the global window. Since then we sometimes get this error and the application crashes. {code:java} for (cItem <- cItemList) { // some logic here } {code} I think this may be the cause, since I started getting the error after these changes. > com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: > Index: 7, Size: 5 > --- > > Key: FLINK-7484 > URL: https://issues.apache.org/jira/browse/FLINK-7484 > Project: Flink > Issue Type: Bug > Components: DataStream API, Scala API >Affects Versions: 1.3.2 > Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend >Reporter: Shashank Agarwal > > I am using many CEPs and a Global Window. I am getting the following error > sometimes and the application crashes. I have checked that logically there is no flaw > in the program. Here ItemPojo is a POJO class and we are using > java.util.List[ItemPojo]. We are using the Scala DataStream API; please find the > attached logs. 
> {code} > 2017-08-17 10:04:12,814 INFO org.apache.flink.runtime.taskmanager.Task > - TriggerWindow(GlobalWindows(), > ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, > co.thirdwatch.trigger.TransactionTrigger@5707c1cb, > WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: > Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched > from RUNNING to FAILED. > com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: > Index: 7, Size: 5 > Serialization trace: > category (co.thirdwatch.pojo.ItemPojo) > underlying (scala.collection.convert.Wrappers$SeqWrapper) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43) > at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74) > at > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) > at > 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) > at > org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at >
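The serialization trace above names `scala.collection.convert.Wrappers$SeqWrapper`, i.e. a Scala-to-Java wrapper collection ending up inside Kryo-serialized state. A common defensive workaround — an assumption about this particular failure, not a confirmed fix — is to copy any wrapped list into a plain `java.util.ArrayList` before it enters window state, so serializers only ever see a standard collection:

```java
import java.util.ArrayList;
import java.util.List;

class CollectionCopies {
    /**
     * Defensive copy: replaces wrapper collections (such as the SeqWrapper that
     * scala.collection.JavaConverters produces) with a plain ArrayList that
     * general-purpose serializers like Kryo handle reliably.
     */
    static <T> List<T> toPlainList(List<T> maybeWrapped) {
        return new ArrayList<>(maybeWrapped);
    }
}
```

Applying such a copy where the Scala stream is converted to a Java stream (the conversion point the reporter describes) would keep wrapper classes out of the `ListState` elements.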
[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala
[ https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135084#comment-16135084 ] Hai Zhou commented on FLINK-7438: - Hi [~aljoscha] I would like to work on this issue.:D > Some classes are eclipsed by classes in package scala > - > > Key: FLINK-7438 > URL: https://issues.apache.org/jira/browse/FLINK-7438 > Project: Flink > Issue Type: Bug > Components: Build System, DataStream API >Reporter: Ted Yu >Priority: Minor > > Noticed the following during compilation: > {code} > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > object OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > [WARNING] ^ > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > class OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > {code} > We should avoid the warning e.r.t. OutputTag. > There may be other occurrences of similar warning. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7438) Some classes are eclipsed by classes in package scala
[ https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135076#comment-16135076 ] Hai Zhou edited comment on FLINK-7438 at 8/21/17 12:01 PM: --- I think the reason for this problem is [https://issues.scala-lang.org/browse/SI-8808]. this problem: {panel:title=org.apache.flink.streaming.api.scala.WindowedStream.scala} package org.apache.flink.streaming.api.scala import org.apache.flink.util.OutputTag // permanently hidden warning here class WindowedStream { @PublicEvolving def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = { // this OutputTag class is org.apache.flink.streaming.api.scala.OutputTag.scala javaStream.sideOutputLateData(outputTag) this } .. } {panel} {panel:title=org.apache.flink.streaming.api.scala.OutputTag.scala} package org.apache.flink.streaming.api.scala import org.apache.flink.util.{OutputTag => JOutputTag} class OutputTag[T: TypeInformation](id: String) extends JOutputTag[T](id, implicitly[TypeInformation[T]]) object OutputTag { def apply[T: TypeInformation](id: String): OutputTag[T] = new OutputTag(id) } {panel} Actually used "org.apache.flink.streaming.api.scala.OutputTag.scala" in WindowedStream class. So we remove "import org.apache.flink.util.OutputTag" in WindowedStream class, the warnning will disappear. was (Author: yew1eb): I think the reason for this problem is [https://issues.scala-lang.org/browse/SI-8808]. this problem: {panel:title=org.apache.flink.streaming.api.scala.WindowedStream.scala} package org.apache.flink.streaming.api.scala import org.apache.flink.util.OutputTag // permanently hidden warning here class WindowedStream { @PublicEvolving def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = { // this OutputTag class is org.apache.flink.streaming.api.scala.OutputTag.scala javaStream.sideOutputLateData(outputTag) this } .. 
} {panel} {panel:title=org.apache.flink.streaming.api.scala.OutputTag.scala} package org.apache.flink.streaming.api.scala import org.apache.flink.util.{OutputTag => JOutputTag} class OutputTag[T: TypeInformation](id: String) extends JOutputTag[T](id, implicitly[TypeInformation[T]]) object OutputTag { def apply[T: TypeInformation](id: String): OutputTag[T] = new OutputTag(id) } {panel} > Some classes are eclipsed by classes in package scala > - > > Key: FLINK-7438 > URL: https://issues.apache.org/jira/browse/FLINK-7438 > Project: Flink > Issue Type: Bug > Components: Build System, DataStream API >Reporter: Ted Yu >Priority: Minor > > Noticed the following during compilation: > {code} > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > object OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > [WARNING] ^ > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > class OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > {code} > We should avoid the warning e.r.t. OutputTag. > There may be other occurrences of similar warning. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7438) Some classes are eclipsed by classes in package scala
[ https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135076#comment-16135076 ] Hai Zhou commented on FLINK-7438: - I think the reason for this problem is [https://issues.scala-lang.org/browse/SI-8808]. this problem: {panel:title=org.apache.flink.streaming.api.scala.WindowedStream.scala} package org.apache.flink.streaming.api.scala import org.apache.flink.util.OutputTag // permanently hidden warning here class WindowedStream { @PublicEvolving def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = { // this OutputTag class is org.apache.flink.streaming.api.scala.OutputTag.scala javaStream.sideOutputLateData(outputTag) this } .. } {panel} {panel:title=org.apache.flink.streaming.api.scala.OutputTag.scala} package org.apache.flink.streaming.api.scala import org.apache.flink.util.{OutputTag => JOutputTag} class OutputTag[T: TypeInformation](id: String) extends JOutputTag[T](id, implicitly[TypeInformation[T]]) object OutputTag { def apply[T: TypeInformation](id: String): OutputTag[T] = new OutputTag(id) } {panel} > Some classes are eclipsed by classes in package scala > - > > Key: FLINK-7438 > URL: https://issues.apache.org/jira/browse/FLINK-7438 > Project: Flink > Issue Type: Bug > Components: Build System, DataStream API >Reporter: Ted Yu >Priority: Minor > > Noticed the following during compilation: > {code} > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > object OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, OutputTag} > [WARNING] ^ > [WARNING] > /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33: > warning: imported `OutputTag' is permanently hidden by definition of > class OutputTag in package scala > [WARNING] import org.apache.flink.util.{Collector, 
OutputTag} > {code} > We should avoid the warning w.r.t. OutputTag. > There may be other occurrences of similar warnings.
[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4358 sorry for the mess, but let me also drag in #4568 and adapt the code in here (which is moved from `BlobCache` to `PermanentBlobCache` by this PR)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135066#comment-16135066 ] ASF GitHub Bot commented on FLINK-7068: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4358 sorry for the mess, but let me also drag in #4568 and adapt the code in here (which is moved from `BlobCache` to `PermanentBlobCache` by this PR) > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should cover use cases for BLOBs that are > permanently stored for a job's lifetime (HA and non-HA). > A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., > which does not even have to be backed by files.
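The permanent/transient split described in FLINK-7068 can be sketched as two small stores behind a shared read interface. This is a rough model only — the real Flink classes carry far more concerns (HA storage, file handles, job scoping) — and the read-once semantics for transient blobs below is an illustrative simplification, not a claim about the actual API:

```java
import java.util.HashMap;
import java.util.Map;

/** Shared read path for both blob lifecycles. */
interface BlobReader {
    byte[] get(String key);
}

/** Permanent BLOBs live for a job's lifetime and survive repeated reads. */
class PermanentBlobs implements BlobReader {
    private final Map<String, byte[]> store = new HashMap<>();

    void put(String key, byte[] data) { store.put(key, data); }

    @Override
    public byte[] get(String key) { return store.get(key); }
}

/** Transient BLOBs (logs, RPC payloads) may be consumed once and then dropped. */
class TransientBlobs implements BlobReader {
    private final Map<String, byte[]> store = new HashMap<>();

    void put(String key, byte[] data) { store.put(key, data); }

    @Override
    public byte[] get(String key) { return store.remove(key); } // read-once model
}
```

Separating the two lifecycles into distinct types lets callers state their retention requirement in the type system instead of via flags on one shared cache.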
[jira] [Created] (FLINK-7485) Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.
sunjincheng created FLINK-7485: -- Summary: Using DataView interface to improve (MIN/MAX)WithRetractAggFunction. Key: FLINK-7485 URL: https://issues.apache.org/jira/browse/FLINK-7485 Project: Flink Issue Type: Improvement Reporter: sunjincheng Currently MIN/MAX use an in-memory {{HashMap}} to store all values; after FLINK-7206 we can improve them by using {{DataView}}.
[jira] [Commented] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
[ https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135046#comment-16135046 ] Hai Zhou commented on FLINK-7484: - Hi [~shashank734] Can you upload your code? > com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: > Index: 7, Size: 5 > --- > > Key: FLINK-7484 > URL: https://issues.apache.org/jira/browse/FLINK-7484 > Project: Flink > Issue Type: Bug > Components: DataStream API, Scala API >Affects Versions: 1.3.2 > Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend >Reporter: Shashank Agarwal > > I am using many CEPs and a Global Window. I am getting the following error > sometimes and the application crashes. I have checked that logically there is no flaw > in the program. Here ItemPojo is a POJO class and we are using > java.util.List[ItemPojo]. We are using the Scala DataStream API; please find the > attached logs. > {code} > 2017-08-17 10:04:12,814 INFO org.apache.flink.runtime.taskmanager.Task > - TriggerWindow(GlobalWindows(), > ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, > co.thirdwatch.trigger.TransactionTrigger@5707c1cb, > WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: > Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched > from RUNNING to FAILED. 
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: > Index: 7, Size: 5 > Serialization trace: > category (co.thirdwatch.pojo.ItemPojo) > underlying (scala.collection.convert.Wrappers$SeqWrapper) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43) > at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74) > at > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) > at > org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) > at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) > ... 22 more > 2017-08-17 10:04:12,816 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for TriggerWindow(GlobalWindows(), > ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, > co.thirdwatch.trigger.TransactionTrigger@5707c1cb, > WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: > Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1). >
[jira] [Created] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
Shashank Agarwal created FLINK-7484: --- Summary: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5 Key: FLINK-7484 URL: https://issues.apache.org/jira/browse/FLINK-7484 Project: Flink Issue Type: Bug Components: DataStream API, Scala API Affects Versions: 1.3.2 Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend Reporter: Shashank Agarwal I am using many CEPs and a Global Window. I am getting the following error sometimes and the application crashes. I have checked that logically there is no flaw in the program. Here ItemPojo is a POJO class and we are using java.util.List[ItemPojo]. We are using the Scala DataStream API; please find the attached logs. {code} 2017-08-17 10:04:12,814 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, co.thirdwatch.trigger.TransactionTrigger@5707c1cb, WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched from RUNNING to FAILED. 
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5 Serialization trace: category (co.thirdwatch.pojo.ItemPojo) underlying (scala.collection.convert.Wrappers$SeqWrapper) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43) at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74) at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.get(ArrayList.java:429) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) ... 22 more 2017-08-17 10:04:12,816 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, co.thirdwatch.trigger.TransactionTrigger@5707c1cb, WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1). 2017-08-17 10:04:12,816 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c}, co.thirdwatch.trigger.TransactionTrigger@5707c1cb,
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135030#comment-16135030 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 The check failed because the spotbugs plugin found something; this plugin isn't run by default when calling `mvn verify`. You can run the spotbugs locally by adding `-Dspotbugs` to the maven invocation. The found problem is the PythonEnvironmentConfig class, which contains public static non-final fields. I propose making these non-static and explicitly pass around a config object where needed. > Python API for streaming applications > - > > Key: FLINK-5886 > URL: https://issues.apache.org/jira/browse/FLINK-5886 > Project: Flink > Issue Type: New Feature > Components: Python API >Reporter: Zohar Mizrahi >Assignee: Zohar Mizrahi > > A work in progress to provide python interface for Flink streaming APIs. The > core technology is based on jython and thus imposes two limitations: a. user > defined functions cannot use python extensions. b. the python version is 2.x > The branch is based on Flink release 1.2.0, as can be found here: > https://github.com/zohar-pm/flink/tree/python-streaming > In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was > setup properly (see: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), > one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, > which in return will execute all the tests under > {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 The check failed because the spotbugs plugin found something; this plugin isn't run by default when calling `mvn verify`. You can run spotbugs locally by adding `-Dspotbugs` to the maven invocation. The problem it found is in the PythonEnvironmentConfig class, which contains public static non-final fields. I propose making these non-static and explicitly passing a config object around where needed.
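The refactoring zentol proposes above — replacing mutable public static fields with an immutable config instance that is passed explicitly — can be sketched as follows. The field names are invented for illustration; they are not the actual PythonEnvironmentConfig fields:

```java
/**
 * Instead of:  class PythonEnvironmentConfig { public static String tmpDir; ... }
 * each component receives its own immutable config instance.
 */
final class EnvironmentConfig {
    private final String tmpDir;       // hypothetical field
    private final boolean cacheFiles;  // hypothetical field

    EnvironmentConfig(String tmpDir, boolean cacheFiles) {
        this.tmpDir = tmpDir;
        this.cacheFiles = cacheFiles;
    }

    String tmpDir() { return tmpDir; }
    boolean cacheFiles() { return cacheFiles; }
}

/** A component that used to read the static fields now takes the config explicitly. */
class EnvironmentPreparer {
    private final EnvironmentConfig config;

    EnvironmentPreparer(EnvironmentConfig config) {
        this.config = config;
    }

    String describe() {
        return config.tmpDir() + ":" + config.cacheFiles();
    }
}
```

Beyond silencing spotbugs, this removes shared mutable global state, so two jobs (or two tests) with different settings can no longer clobber each other.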
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135018#comment-16135018 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 I'm trying to track down the root cause of the check failures without success. Obviously, the given project (flink-libraries/flink-streaming-python) on the master branch passes `verify` successfully in my environment. Please advise. > Python API for streaming applications > - > > Key: FLINK-5886 > URL: https://issues.apache.org/jira/browse/FLINK-5886 > Project: Flink > Issue Type: New Feature > Components: Python API >Reporter: Zohar Mizrahi >Assignee: Zohar Mizrahi > > A work in progress to provide a Python interface for the Flink streaming APIs. The > core technology is based on Jython and thus imposes two limitations: a. user > defined functions cannot use Python extensions; b. the Python version is 2.x. > The branch is based on Flink release 1.2.0, as can be found here: > https://github.com/zohar-pm/flink/tree/python-streaming > In order to test it, one can use the IntelliJ IDE. Assuming IntelliJ was > set up properly (see: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), > one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, > which in turn will execute all the tests under > {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838

I'm trying to track down the root cause of the check failures, without success. Notably, the given project (flink-libraries/flink-streaming-python) on the master branch passes `verify` successfully in my environment. Please advise.
[jira] [Commented] (FLINK-4831) Implement a slf4j metric reporter
[ https://issues.apache.org/jira/browse/FLINK-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135012#comment-16135012 ]

Sumit Sarin commented on FLINK-4831:

Hey Chesnay, after going through the slf4j and log4j documentation and your past work in progress, I believe that I need an in-depth understanding of metrics. Should I get acquainted with that, or am I going in the wrong direction? I am currently unfamiliar with counters, gauges, meters, histograms, etc.

> Implement a slf4j metric reporter
> ---------------------------------
>
> Key: FLINK-4831
> URL: https://issues.apache.org/jira/browse/FLINK-4831
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.2
> Reporter: Chesnay Schepler
> Assignee: Sumit Sarin
> Priority: Minor
> Labels: easyfix, starter
>
> For debugging purposes it would be very useful to have an slf4j metric
> reporter. If you don't want to set up a metrics backend, you currently have to
> rely on JMX, which a) works a bit differently than other reporters (for
> example, it doesn't extend AbstractReporter) and b) makes it a bit tricky to
> analyze results, as metrics are cleaned up once a job finishes.
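For readers in the same position, the four metric types mentioned in the comment can be modeled in a few lines. These are simplified stand-ins for illustration, not Flink's actual `org.apache.flink.metrics` interfaces, which are richer:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified models of the basic metric types; each comment states what
// the real counterpart measures.
final class Counter {            // an adjustable running count
    private long count;
    void inc() { count++; }
    void dec() { count--; }
    long getCount() { return count; }
}

interface Gauge<T> {             // reports a current value on demand
    T getValue();
}

final class Meter {              // tracks a rate of events over time
    private long events;
    void markEvent() { events++; }
    double getRatePerSecond(double windowSeconds) {
        return events / windowSeconds;
    }
}

final class Histogram {          // records a distribution of values
    private final List<Long> values = new ArrayList<>();
    void update(long value) { values.add(value); }
    long getCount() { return values.size(); }
}
```

A reporter, the subject of FLINK-4831, would periodically iterate over the registered metrics of these kinds and write their current values to the log.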
[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration
[ https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135007#comment-16135007 ]

ASF GitHub Bot commented on FLINK-7483:
---

Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4568

@tedyu this fixes the two issues you found in #4238. Could you have a quick look, too?

> BlobCache cleanup timer not reset after job re-registration
> -----------------------------------------------------------
>
> Key: FLINK-7483
> URL: https://issues.apache.org/jira/browse/FLINK-7483
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and
> {{releaseJob}} calls, where the latter sets a cleanup interval.
> {{registerJob}}, however, forgets to reset this if the job is re-registered,
> and so the job's blobs will be cleaned up even though the job is still in use!
[GitHub] flink issue #4568: [FLINK-7483][blob] prevent cleanup of re-registered jobs
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4568

@tedyu this fixes the two issues you found in #4238. Could you have a quick look, too?
[jira] [Commented] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration
[ https://issues.apache.org/jira/browse/FLINK-7483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135000#comment-16135000 ]

ASF GitHub Bot commented on FLINK-7483:
---

GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4568

[FLINK-7483][blob] prevent cleanup of re-registered jobs

## What is the purpose of the change

Since #4238, when a job is registered but was released before and the ref count hit `0`, its cleanup timeout was not reset. This fixes that and adds more reference-counter tests.

## Brief change log

- reset the cleanup timeout after job re-registration
- add a test verifying that the reference counters contain the expected values

## Verifying this change

This change added a test verifying reference-counter values: `BlobCacheCleanupTest#testJobReferences`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6916-7057-hotfix2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4568.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4568

commit a0a5e2ebf999867462ed31257f30fd777f4fb5f4
Author: Nico Kruber
Date: 2017-08-21T08:36:56Z

[FLINK-7483][blob] prevent cleanup of re-registered jobs

When a job is registered, it may have been released before, and we thus need to reset the cleanup timeout again.

> BlobCache cleanup timer not reset after job re-registration
> -----------------------------------------------------------
>
> Key: FLINK-7483
> URL: https://issues.apache.org/jira/browse/FLINK-7483
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and
> {{releaseJob}} calls, where the latter sets a cleanup interval.
> {{registerJob}}, however, forgets to reset this if the job is re-registered,
> and so the job's blobs will be cleaned up even though the job is still in use!
[GitHub] flink pull request #4568: [FLINK-7483][blob] prevent cleanup of re-registere...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4568

[FLINK-7483][blob] prevent cleanup of re-registered jobs

## What is the purpose of the change

Since #4238, when a job is registered but was released before and the ref count hit `0`, its cleanup timeout was not reset. This fixes that and adds more reference-counter tests.

## Brief change log

- reset the cleanup timeout after job re-registration
- add a test verifying that the reference counters contain the expected values

## Verifying this change

This change added a test verifying reference-counter values: `BlobCacheCleanupTest#testJobReferences`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6916-7057-hotfix2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4568.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4568

commit a0a5e2ebf999867462ed31257f30fd777f4fb5f4
Author: Nico Kruber
Date: 2017-08-21T08:36:56Z

[FLINK-7483][blob] prevent cleanup of re-registered jobs

When a job is registered, it may have been released before, and we thus need to reset the cleanup timeout again.
[jira] [Created] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration
Nico Kruber created FLINK-7483:
-------------------------------

Summary: BlobCache cleanup timer not reset after job re-registration
Key: FLINK-7483
URL: https://issues.apache.org/jira/browse/FLINK-7483
Project: Flink
Issue Type: Bug
Components: Distributed Coordination, Network
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber

Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and {{releaseJob}} calls, where the latter sets a cleanup interval. {{registerJob}}, however, forgets to reset this if the job is re-registered, and so the job's blobs will be cleaned up even though the job is still in use!
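The bug and its fix can be condensed into a small reference-counting sketch. This is a hypothetical simplification for illustration, not BlobCache's actual code; the cleanup check here fires on the deadline alone, which mirrors why a missing reset removed blobs of live jobs.

```java
// Sketch of the FLINK-7483 bug: when a job whose ref count had dropped
// to 0 is registered again, the pending cleanup deadline must be
// cleared, otherwise the job's blobs are deleted while still in use.
final class JobBlobEntry {
    private int references;
    private long cleanupDeadline = -1; // -1 means "no cleanup scheduled"

    void registerJob() {
        references++;
        cleanupDeadline = -1;          // the fix: reset the timer on re-registration
    }

    void releaseJob(long nowMillis, long cleanupIntervalMillis) {
        references--;
        if (references == 0) {
            cleanupDeadline = nowMillis + cleanupIntervalMillis;
        }
    }

    // The (simplified) cleanup task only looks at the deadline; without
    // the reset in registerJob() it would delete a re-registered job.
    boolean shouldCleanUp(long nowMillis) {
        return cleanupDeadline >= 0 && nowMillis >= cleanupDeadline;
    }
}
```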
[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134985#comment-16134985 ]

ASF GitHub Bot commented on FLINK-7479:
---

Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4563

Thanks @dawidwys for the suggestion. I will update the PR accordingly.

> Support to retrieve the past event by physical offset
> -----------------------------------------------------
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API & SQL
> Reporter: Dian Fu
> Assignee: Dian Fu
>
> Currently, it is already possible to retrieve events matched to the specified
> pattern in {{IterativeCondition.Context}}. However, there are also requirements
> to retrieve events by a physical offset. The retrieved events may not match any pattern.
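The feature described in FLINK-7479 — looking up past events by physical position rather than by pattern match — can be illustrated with a bounded buffer. This is a hypothetical sketch, not the API of PR #4563:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical sketch: keep the last N events seen (matched or not) and
// look them up backwards from the most recent one by physical offset.
final class EventBuffer<T> {
    private final Deque<T> events = new ArrayDeque<>();
    private final int capacity;

    EventBuffer(int capacity) {
        this.capacity = capacity;
    }

    void add(T event) {
        if (events.size() == capacity) {
            events.removeFirst();   // evict the oldest buffered event
        }
        events.addLast(event);
    }

    /** offset 0 = most recent event, 1 = the one before it, and so on. */
    T getEventByOffset(int offset) {
        int i = 0;
        for (Iterator<T> it = events.descendingIterator(); it.hasNext(); ) {
            T event = it.next();
            if (i++ == offset) {
                return event;
            }
        }
        return null;                // offset reaches past the buffered history
    }
}
```

The key property matching the JIRA description is that `add` is called for every event, so `getEventByOffset` can return events that never matched any pattern.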
[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4563

Thanks @dawidwys for the suggestion. I will update the PR accordingly.
[jira] [Commented] (FLINK-6938) IterativeCondition should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134982#comment-16134982 ]

ASF GitHub Bot commented on FLINK-6938:
---

Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513

Thanks @dawidwys for the reminder. Yes, you're right, and that makes sense to me. I will update the PR and remove the ConditionRegistry-related changes.

> IterativeCondition should support RichFunction interface
> --------------------------------------------------------
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.4.0
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to
> compile the generated code once. We do not want to insert an if condition in
> the {{filter()}} method, so I suggest making IterativeCondition support the
> {{RichFunction}} interface.
[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513

Thanks @dawidwys for the reminder. Yes, you're right, and that makes sense to me. I will update the PR and remove the ConditionRegistry-related changes.
[jira] [Commented] (FLINK-6938) IterativeCondition should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134963#comment-16134963 ]

ASF GitHub Bot commented on FLINK-6938:
---

Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4513

Hmm, I read back through all the comments on the previous PRs, and I think the consensus was that we did not want to introduce the ConditionRegistry at that time, but to start with just FLINK-6938 to enable the SQL integration. See the comments on that PR: #4172

> IterativeCondition should support RichFunction interface
> --------------------------------------------------------
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.4.0
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to
> compile the generated code once. We do not want to insert an if condition in
> the {{filter()}} method, so I suggest making IterativeCondition support the
> {{RichFunction}} interface.
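The motivation quoted from the JIRA — doing one-time setup in `open()` instead of guarding `filter()` with an if — can be sketched like this. The class names are illustrative; Flink's actual rich condition API may differ:

```java
// Illustrative sketch: with a rich lifecycle, expensive one-time work
// (e.g. compiling generated code) happens in open(), and the per-event
// filter() stays free of "already initialized?" branches.
abstract class RichCondition<T> {
    /** Called once before any events are filtered. */
    abstract void open();

    /** Called for every event; open() is guaranteed to have run first. */
    abstract boolean filter(T value);
}

final class ThresholdCondition extends RichCondition<Integer> {
    private int threshold;

    @Override
    void open() {
        // stands in for one-time code generation / compilation
        threshold = 10;
    }

    @Override
    boolean filter(Integer value) {
        return value > threshold;  // hot path: no initialization check
    }
}
```

Without the lifecycle hook, every `filter()` call would need an `if (!initialized) { ... }` guard, which is exactly what the JIRA description wants to avoid on the per-record path.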
[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4513

Hmm, I read back through all the comments on the previous PRs, and I think the consensus was that we did not want to introduce the ConditionRegistry at that time, but to start with just FLINK-6938 to enable the SQL integration. See the comments on that PR: #4172
[jira] [Assigned] (FLINK-7227) OR expression with more than 2 predicates is not pushed into a TableSource
[ https://issues.apache.org/jira/browse/FLINK-7227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Usman Younas reassigned FLINK-7227:
---
Assignee: Usman Younas

> OR expression with more than 2 predicates is not pushed into a TableSource
> --------------------------------------------------------------------------
>
> Key: FLINK-7227
> URL: https://issues.apache.org/jira/browse/FLINK-7227
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Usman Younas
> Assignee: Usman Younas
>
> It seems that {{RexNodeToExpressionConverter}} cannot handle OR expressions
> with more than 2 predicates. Therefore the expression is not pushed into a
> {{FilterableTableSource}}.
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134933#comment-16134933 ]

ASF GitHub Bot commented on FLINK-6442:
---

Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r134160516

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---

    @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) {
       * tEnv.sql(s"SELECT * FROM $table")
       * }}}
       *
    -  * @param query The SQL query to evaluate.
    +  * @param sql The SQL string to evaluate.
       * @return The result of the query as Table.
       */
    -  def sql(query: String): Table = {
    +  @deprecated

--- End diff --

Please add a `deprecated` comment to the javadoc to tell users which new API should be used.

> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
> -------------------------------------------------------------------------------
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: lincoln.lee
> Assignee: lincoln.lee
> Priority: Minor
>
> Currently in the Table API there is only a registration method for source tables;
> when we use SQL to write a streaming job, we have to add an additional part for
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke the Table API's writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to a DataStream first and then invoke addSink
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table, because its
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite's grammar:
> {code}
> insert:( INSERT | UPSERT ) INTO tablePrimary
> [ '(' column [, column ]* ')' ]
> query
> {code}
> I'd like to extend the Flink Table API to support such a feature. See the design doc:
> https://goo.gl/n3phK5