-- HI [text lost to encoding corruption] 6?7 [...] flink1.12 [...] Sql [...] GroupWindowAggregate [...] Sql [...] GroupWindowAggregate [...]
-- Source table backed by the 'datagen' connector.
-- rowTime carries the watermark, which trails it by 10 seconds.
CREATE TEMPORARY TABLE RawSource (
    `key`     STRING,        -- quoted: KEY is a reserved word
    accessNum INT,
    status    STRING,
    rowTime   TIMESTAMP(3),
    WATERMARK FOR rowTime AS rowTime - INTERVAL '10' SECOND
) WITH ('connector' = 'datagen');

-- Sink table backed by the 'blackhole' connector.
-- Each row is a (tag, key, value) triple; `key` and `value` are quoted
-- because they are reserved words.
CREATE TEMPORARY TABLE TrashSink (
    `tag`   STRING,
    `key`   STRING,
    `value` BIGINT
) WITH ('connector' = 'blackhole');

-- Per-key counts over 60-second tumbling windows on rowTime.
--   accAll   : every row in the window
--   accError : only rows whose status is 'error'
-- The window bounds are not projected, so output rows carry no window
-- start/end columns.
CREATE TEMPORARY VIEW AccView AS
SELECT
    COUNT(*)                                   AS accAll,
    COUNT(*) FILTER (WHERE status = 'error')   AS accError,
    `key`
FROM RawSource
GROUP BY
    TUMBLE(rowTime, INTERVAL '60' SECOND),
    `key`;

-- Unpivot AccView into TrashSink: each view row yields two sink rows,
-- one tagged 'accAll' carrying accAll and one tagged 'accErr' carrying
-- accError.
-- Every projected column is aliased to the sink column it feeds, instead
-- of relying on SELECT * over an anonymous derived table; the sink is
-- still matched by position, so the column order below must stay
-- (tag, key, value).
INSERT INTO TrashSink
SELECT
    'accAll' AS `tag`,
    `key`,
    accAll   AS `value`
FROM AccView
UNION ALL
SELECT
    'accErr' AS `tag`,
    `key`,
    accError AS `value`
FROM AccView;

-- 回复 (Reply)