Xikui Wang created ASTERIXDB-2747:
-------------------------------------

             Summary: Feed with SELECT * Function fails to inline subplan
                 Key: ASTERIXDB-2747
                 URL: https://issues.apache.org/jira/browse/ASTERIXDB-2747
             Project: Apache AsterixDB
          Issue Type: Bug
            Reporter: Xikui Wang


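In the commented-out variant, whose function body is a `SELECT tweet.*, ...` query, the function is not inlined into the feed pipeline.
Rewriting the same function with `object_merge` produces a more efficient plan.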

{code:sql}
// Start the repro from a clean dataverse.
DROP DATAVERSE dhs IF EXISTS;
CREATE DATAVERSE dhs;
USE dhs;

// Record type for incoming tweets. Only tid, uid and text are declared here.
// EnrichTweet also reads tweet.created_at and tweet.coordinates, so this repro
// relies on those undeclared fields being accepted (open type, which is the
// AsterixDB default). NOTE(review): confirm the type is meant to be open.
// tid and uid both use bigint (the SQL++ alias of int64) for consistency.
CREATE TYPE Tweet AS {
  tid: bigint,
  uid: bigint,
  text: string
};

// Target dataset of the feed, keyed by tweet id.
CREATE DATASET Tweets(Tweet) PRIMARY KEY tid;

// Feed that uses the HTTP adapter on 127.0.0.1:10011 (IP address type) and
// parses records as ADM values of type Tweet.
CREATE FEED TweetFeed WITH {
  "adapter-name" : "http_adapter",
  "addresses" : "127.0.0.1:10011",
  "address-type" : "IP",
  "type-name" : "Tweet",
  "format" : "adm"
};

// Variant that triggers the bug (kept disabled): the same enrichment written
// as a SELECT tweet.* query. The function then returns a collection, and the
// plan in this report shows it evaluated in a SUBPLAN (listify) followed by
// an UNNEST (scan-collection) instead of being inlined.
// To reproduce, uncomment these lines and comment out the object_merge
// version, since both define EnrichTweet and connect the same feed.
//USE dhs;
//CREATE FUNCTION EnrichTweet(tweet) {
//    SELECT tweet.*, datetime_from_unix_time_in_ms(tweet.created_at) as timestamp,
//        create_point(tweet.coordinates[0], tweet.coordinates[1]) as location
//};
//CONNECT FEED TweetFeed to DATASET Tweets APPLY FUNCTION EnrichTweet;
//START FEED TweetFeed;

// Active workaround: build the enriched record with object_merge rather than
// a SELECT tweet.* query. Per this report, this form compiles to a more
// efficient plan than the SELECT-based function.
USE dhs;

// Returns `tweet` merged with two derived fields:
//   timestamp: datetime built from tweet.created_at (Unix time in ms)
//   location:  point built from tweet.coordinates[0] and tweet.coordinates[1]
CREATE FUNCTION EnrichTweet(tweet) {
    object_merge(
        tweet,
        {
            "timestamp": datetime_from_unix_time_in_ms(tweet.created_at),
            "location": create_point(tweet.coordinates[0], tweet.coordinates[1])
        }
    )
};

// Route feed records through EnrichTweet into Tweets, then start the feed.
CONNECT FEED TweetFeed TO DATASET Tweets APPLY FUNCTION EnrichTweet;
START FEED TweetFeed;
{code}

Here is the query plan for the `SELECT tweet.*` variant:

commit
-- COMMIT  |PARTITIONED|
  project ([$$27])
  -- STREAM_PROJECT  |PARTITIONED|
    exchange
    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
      upsert into dhs.Tweets from record: $$32 partitioned by [$$27] out: ([record-before-upsert:$$29])
      -- INSERT_DELETE  |PARTITIONED|
        exchange
        -- HASH_PARTITION_EXCHANGE [$$27]  |PARTITIONED|
          assign [$$27] <- [$$32.getField(0)]
          -- ASSIGN  |PARTITIONED|
            project ([$$32])
            -- STREAM_PROJECT  |PARTITIONED|
              assign [$$32] <- [check-unknown(cast($$26))]
              -- ASSIGN  |PARTITIONED|
                project ([$$26])
                -- STREAM_PROJECT  |PARTITIONED|
                  unnest $$26 <- scan-collection($$24)
                  -- UNNEST  |PARTITIONED|
                    project ([$$24])
                    -- STREAM_PROJECT  |PARTITIONED|
                      subplan {
                                aggregate [$$24] <- [listify($$23)]
                                -- AGGREGATE  |LOCAL|
                                  assign [$$23] <- [object-concat-strict(if-missing-or-null(to-object($$TweetFeed), cast({})), {"timestamp": datetime-from-unix-time-in-ms($$33), "location": create-point(get-item($$31, 0), get-item($$31, 1))})]
                                  -- ASSIGN  |LOCAL|
                                    nested tuple source
                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
                             }
                      -- SUBPLAN  |PARTITIONED|
                        assign [$$33, $$31] <- [$$TweetFeed.getField("created_at"), $$TweetFeed.getField("coordinates")]
                        -- ASSIGN  |PARTITIONED|
                          exchange
                          -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
                            data-scan []<-[$$TweetFeed] <- dhs.TweetFeed
                            -- DATASOURCE_SCAN  |PARTITIONED|
                              exchange
                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                empty-tuple-source
                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to