[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-28 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r431837728



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an 
additional dependency.
+A corresponding format needs to be specified for reading and writing rows from 
and to a file system.
+
+The file system connector allows for reading and writing from a local or 
distributed filesystem. A filesystem can be defined as:
+
+
+
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify the connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
+                                        -- partition column value is null/empty string
+
+  -- optional: whether to shuffle data by dynamic partition fields in the sink phase;
+  -- this can greatly reduce the number of files for the filesystem sink,
+  -- but may lead to data skew; disabled by default
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+
+
+
+Attention Make sure to include [Flink 
File System specific dependencies]({{ site.baseurl 
}}/ops/filesystems/index.html).
+
+Attention File system sources for streaming are still under development. In the future, the community will add support for common streaming use cases, e.g., partition and directory monitoring.
+
+## Partition Files
+
+Flink's file system partition support uses the standard Hive format. However, 
it does not require partitions to be pre-registered with a table catalog. 
Partitions are discovered and inferred based on directory structure. For 
example, a table partitioned based on the directory below would be inferred to 
contain `datetime` and `hour` partitions.
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports both partition inserting and overwrite 
inserting. See [INSERT Statement]({{ site.baseurl 
}}/dev/table/sql/insert.html). When you insert overwrite into a partitioned 
table, only the corresponding partition will be overwritten, not the entire 
table.
+
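For example, a minimal sketch of a partition overwrite against the table defined above (the source table `SourceTable` is hypothetical):

{% highlight sql %}
-- Overwrites only the (part_name1=1, part_name2='A') partition of MyUserTable;
-- all other partitions are left untouched.
INSERT OVERWRITE MyUserTable PARTITION (part_name1=1, part_name2='A')
SELECT column_name1, column_name2 FROM SourceTable;
{% endhighlight %}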
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file but uncompressed [newline-delimited JSON](http://jsonlines.org/).
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression via the `avro.codec` option (see the sketch after this list).
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
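As an illustration, a table that writes compressed Avro files might look like the sketch below; the column names and the `snappy` codec value are illustrative assumptions, not taken from the PR:

{% highlight sql %}
CREATE TABLE AvroUserTable (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/avro-data',
  'format' = 'avro',
  'avro.codec' = 'snappy'  -- assumption: snappy is one of the codecs Apache Avro supports
);
{% endhighlight %}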
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's
+[Streaming File Sink]({{ site.baseurl }}/dev/connectors/streamfile_sink.html),
+to write records to files. Row-encoded formats are csv and json; bulk-encoded
+formats are parquet, orc and avro.
+
+### Rolling Policy
+
+Data within the partition directories is split into part files. Each partition
+will contain at least one part file for each subtask of the sink that has
+received data for that partition. The in-progress part file will be closed and
+an additional part file will be created according to the configurable rolling
+policy. The policy rolls part files based on size and on a timeout that
+specifies the maximum duration for which a file can stay open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Default</th>
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>sink.rolling-policy.file-size</td>
+      <td>128MB</td>
+      <td>MemorySize</td>
+      <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+      <td>sink.rolling-policy.time-interval</td>
+      <td>30 m</td>
+      <td>Duration</td>
+      <td>The maximum duration a part file can stay open before rolling (30 min by default, to avoid too many small files).</td>
+    </tr>
+  </tbody>
+</table>
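For instance, both rolling options can be set in the WITH clause; the value formats below (a MemorySize string and a Duration string) are assumptions based on the types listed above:

{% highlight sql %}
CREATE TABLE RollingSinkTable (
  log_line STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/output',
  'format' = 'csv',
  'sink.rolling-policy.file-size' = '128MB',      -- roll once a part file reaches 128 MB
  'sink.rolling-policy.time-interval' = '30 min'  -- or once it has been open for 30 minutes
);
{% endhighlight %}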
+**NOTE:** For bulk formats (parquet, orc, 

[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-28 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r431836042



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an 
additional dependency.
+A corresponding format needs to be specified for reading and writing rows from 
and to a file system.
+
+The file system connector allows for reading and writing from a local or 
distributed filesystem. A filesystem can be defined as:
+
+
+
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify the connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
+                                        -- partition column value is null/empty string
+
+  -- optional: whether to shuffle data by dynamic partition fields in the sink phase;
+  -- this can greatly reduce the number of files for the filesystem sink,
+  -- but may lead to data skew; disabled by default
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+
+
+
+Attention Make sure to include [Flink 
File System specific dependencies]({{ site.baseurl 
}}/ops/filesystems/index.html).
+
+Attention File system sources for streaming are still under development. In the future, the community will add support for common streaming use cases, e.g., partition and directory monitoring.
+
+## Partition Files
+
+Flink's file system partition support uses the standard Hive format. However, 
it does not require partitions to be pre-registered with a table catalog. 
Partitions are discovered and inferred based on directory structure. For 
example, a table partitioned based on the directory below would be inferred to 
contain `datetime` and `hour` partitions.
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports both partition inserting and overwrite 
inserting. See [INSERT Statement]({{ site.baseurl 
}}/dev/table/sql/insert.html). When you insert overwrite into a partitioned 
table, only the corresponding partition will be overwritten, not the entire 
table.
+
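For illustration, a sketch of a static partition insert into the table defined above (`SourceTable` is hypothetical):

{% highlight sql %}
-- Writes all rows into the fixed (part_name1=1, part_name2='A') partition,
-- i.e. into the part_name1=1/part_name2=A directory under the table path.
INSERT INTO MyUserTable PARTITION (part_name1=1, part_name2='A')
SELECT column_name1, column_name2 FROM SourceTable;
{% endhighlight %}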
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file but uncompressed [newline-delimited JSON](http://jsonlines.org/).
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression via the `avro.codec` option.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's
+[Streaming File Sink]({{ site.baseurl }}/dev/connectors/streamfile_sink.html),
+to write records to files. Row-encoded formats are csv and json; bulk-encoded
+formats are parquet, orc and avro.
+
+### Rolling Policy
+
+Data within the partition directories is split into part files. Each partition
+will contain at least one part file for each subtask of the sink that has
+received data for that partition. The in-progress part file will be closed and
+an additional part file will be created according to the configurable rolling
+policy. The policy rolls part files based on size and on a timeout that
+specifies the maximum duration for which a file can stay open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Default</th>
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>sink.rolling-policy.file-size</td>
+      <td>128MB</td>
+      <td>MemorySize</td>

Review comment:
   I think it is a legacy reason, and it is used for disk too.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-28 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r431835345



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an 
additional dependency.
+A corresponding format needs to be specified for reading and writing rows from 
and to a file system.
+
+The file system connector allows for reading and writing from a local or 
distributed filesystem. A filesystem can be defined as:
+
+
+
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify the connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
+                                        -- partition column value is null/empty string
+
+  -- optional: whether to shuffle data by dynamic partition fields in the sink phase;
+  -- this can greatly reduce the number of files for the filesystem sink,
+  -- but may lead to data skew; disabled by default
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+
+
+
+Attention Make sure to include [Flink 
File System specific dependencies]({{ site.baseurl 
}}/ops/filesystems/index.html).
+
+Attention File system sources for streaming are still under development. In the future, the community will add support for common streaming use cases, e.g., partition and directory monitoring.
+
+## Partition Files
+
+Flink's file system partition support uses the standard Hive format. However, 
it does not require partitions to be pre-registered with a table catalog. 
Partitions are discovered and inferred based on directory structure. For 
example, a table partitioned based on the directory below would be inferred to 
contain `datetime` and `hour` partitions.
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports both partition inserting and overwrite 
inserting. See [INSERT Statement]({{ site.baseurl 
}}/dev/table/sql/insert.html). When you insert overwrite into a partitioned 
table, only the corresponding partition will be overwritten, not the entire 
table.
+
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file but uncompressed [newline-delimited JSON](http://jsonlines.org/).
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression via the `avro.codec` option.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's
+[Streaming File Sink]({{ site.baseurl }}/dev/connectors/streamfile_sink.html),
+to write records to files. Row-encoded formats are csv and json; bulk-encoded
+formats are parquet, orc and avro.
+
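As a sketch, a row-encoded streaming sink table could be declared as follows; the schema and path are illustrative, not taken from the PR:

{% highlight sql %}
CREATE TABLE StreamingJsonSink (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/streaming-output',
  'format' = 'json'  -- a row-encoded format, per the list above
);
{% endhighlight %}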
+### Rolling Policy
+
+Data within the partition directories is split into part files. Each partition
+will contain at least one part file for each subtask of the sink that has
+received data for that partition. The in-progress part file will be closed and
+an additional part file will be created according to the configurable rolling
+policy. The policy rolls part files based on size and on a timeout that
+specifies the maximum duration for which a file can stay open.

Review comment:
   yes









[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-25 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r429905623



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,340 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors
+nav-pos: 5
+---
+
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an 
additional dependency.
+A corresponding format needs to be specified for reading and writing rows from 
and to a file system.
+
+The file system connector allows for reading and writing from a local or 
distributed filesystem. A filesystem can be defined as:
+
+
+
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify the connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
+                                        -- partition column value is null/empty string
+
+  -- optional: whether to shuffle data by dynamic partition fields in the sink phase;
+  -- this can greatly reduce the number of files for the filesystem sink,
+  -- but may lead to data skew; disabled by default
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+
+
+
+Attention Make sure to include [Flink 
File System specific dependencies]({{ site.baseurl 
}}/ops/filesystems/index.html).
+
+Attention File system sources for streaming are still under development. In the future, the community will add support for common streaming use cases, e.g., partition and directory monitoring.
+
+## Partition Files
+
+Flink's file system partition support uses the standard Hive format. However, 
it does not require partitions to be pre-registered with a table catalog. 
Partitions are discovered and inferred based on directory structure. For 
example, a table partitioned based on the directory below would be inferred to 
contain `datetime` and `hour` partitions.
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports both partition inserting and overwrite 
inserting. See [INSERT Statement]({{ site.baseurl }}/table/sql/insert.html). 
When you insert overwrite into a partitioned table, only the corresponding 
partition will be overwritten, not the entire table.
+
+## File Formats
+
+The file system connector supports multiple formats:
+
 - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed (see the sketch after this list).
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file but uncompressed [newline-delimited JSON](http://jsonlines.org/).
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression via the `avro.codec` option.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
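For example, a CSV table might be declared as in the sketch below; the `csv.field-delimiter` option name is an assumption following the format-prefixed option convention, not taken from the PR:

{% highlight sql %}
CREATE TABLE CsvUserTable (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/csv-data',
  'format' = 'csv',
  'csv.field-delimiter' = ';'  -- assumption: format options are prefixed with the format name
);
{% endhighlight %}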
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's
+[Streaming File Sink]({{ site.baseurl }}/connectors/streamfile_sink.html),
+to write records to files. Row-encoded formats are csv and json; bulk-encoded
+formats are parquet, orc and avro.
+
+### Rolling Policy
+
+Data within the partition directories is split into part files. Each partition
+will contain at least one part file for each subtask of the sink that has
+received data for that partition. The in-progress part file will be closed and
+an additional part file will be created according to the configurable rolling
+policy. The policy rolls part files based on size and on a timeout that
+specifies the maximum duration for which a file can stay open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Default</th>
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>sink.rolling-policy.file-size</td>
+      <td>1024L * 1024L * 128L</td>
+      <td>Long</td>
+      <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+      <td>sink.rolling-policy.time-interval</td>
+      <td>30 m</td>
+      <td>Duration</td>
+      <td>The maximum duration a part file can stay open before rolling (30 min by default, to avoid too many small files).</td>
+    </tr>
+  </tbody>
+</table>
+
+**NOTE:** For bulk formats (parquet, orc, avro), 

[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-25 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r429900328



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,352 @@
+---
+title: "Hadoop FileSystem Connector"
+nav-title: Hadoop FileSystem
+nav-parent_id: connectors
+nav-pos: 5
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+
+The file system connector itself is included in Flink and does not require an 
additional dependency.
+A corresponding format needs to be specified for reading and writing rows from 
and to a file system.
+
+The file system connector allows for reading and writing from a local or 
distributed filesystem. A filesystem can be defined as:
+
+
+
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify the connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
+                                        -- partition column value is null/empty string
+
+  -- optional: whether to shuffle data by dynamic partition fields in the sink phase;
+  -- this can greatly reduce the number of files for the filesystem sink,
+  -- but may lead to data skew; disabled by default
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+
+
+
+Attention Make sure to include [Flink 
File System specific dependencies]({{ site.baseurl 
}}/internals/filesystems.html).
+
+Attention File system sources for streaming are only experimental. In the future, we will support actual streaming use cases, e.g., partition and directory monitoring.
+
+## Partition files
+
+Partitioning in the file system connector is similar to Hive. But unlike Hive,
+which manages partitions through its catalog, the file system table manages
+partitions according to the directory structure of the file system. The file
+system connector discovers and infers partitions automatically. For example,
+a table partitioned by datetime and hour has the following structure in the
+file system path:
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
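Given this layout, a query that filters on the inferred partition columns can be restricted to the matching directories (a sketch; the table name is hypothetical):

{% highlight sql %}
-- Only files under datetime=2019-08-25/hour=11/ need to be read.
SELECT * FROM MyUserTable
WHERE datetime = '2019-08-25' AND hour = '11';
{% endhighlight %}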
+The file system table supports partition inserting and overwrite inserting. See
+[INSERT Statement]({{ site.baseurl }}/table/sql/insert.html).
+
+**NOTE:** When you insert overwrite into a partitioned table, only the
+corresponding partition will be overwritten, not the entire table.
+
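A dynamic partition insert, where the partition values come from the query itself, can be sketched as follows (`SourceTable` is hypothetical):

{% highlight sql %}
-- The trailing part_name1/part_name2 values of each row decide its partition
-- directory; null/empty values fall back to 'partition.default-name'.
INSERT INTO MyUserTable
SELECT column_name1, column_name2, part_name1, part_name2 FROM SourceTable;
{% endhighlight %}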
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file; it is uncompressed [newline-delimited JSON](http://jsonlines.org/).
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression via the `avro.codec` option.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming sink
+
+The file system connector supports a streaming sink; it uses the [Streaming File
+Sink]({{ site.baseurl }}/connectors/streamfile_sink.html)
+to write records to files. Row-encoded formats are csv and json; bulk-encoded
+formats are parquet, orc and avro.
+
+### Rolling policy
+
+Data within the partition directories is split into part files. Each partition
+will contain at least one part file for each subtask of the sink that has
+received data for that partition. The in-progress part file will be closed and
+an additional part file will be created according to the configurable rolling
+policy. The policy rolls part files based on size and on a timeout that
+specifies the maximum duration for which a file can stay open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Default</th>
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>sink.rolling-policy.file-size</td>
+      <td>1024L * 1024L * 128L</td>
+      <td>Long</td>
+      <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+      <td>sink.rolling-policy.time-interval</td>
+      <td>30 m</td>
+      <td>Duration</td>
+      <td>The maximum duration a part file can stay open before rolling (30 min by default, to avoid too many small files).</td>
+    </tr>
+  </tbody>
+</table>
+
+**NOTE:** For bulk formats 

[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-25 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r429795438



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,352 @@
+---
+title: "Hadoop FileSystem Connector"
+nav-title: Hadoop FileSystem
+nav-parent_id: connectors
+nav-pos: 5
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+
+The file system connector itself is included in Flink and does not require an 
additional dependency.
+A corresponding format needs to be specified for reading and writing rows from 
and to a file system.
+
+The file system connector allows for reading and writing from a local or 
distributed filesystem. A filesystem can be defined as:
+
+
+
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify the connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
+                                        -- partition column value is null/empty string
+
+  -- optional: whether to shuffle data by dynamic partition fields in the sink phase;
+  -- this can greatly reduce the number of files for the filesystem sink,
+  -- but may lead to data skew; disabled by default
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+
+
+
+Attention Make sure to include [Flink 
File System specific dependencies]({{ site.baseurl 
}}/internals/filesystems.html).
+
+Attention File system sources for streaming are only experimental. In the future, we will support actual streaming use cases, e.g., partition and directory monitoring.
+
+## Partition files
+
+Partitioning in the file system connector is similar to Hive. But unlike Hive,
+which manages partitions through its catalog, the file system table manages
+partitions according to the directory structure of the file system. The file
+system connector discovers and infers partitions automatically. For example,
+a table partitioned by datetime and hour has the following structure in the
+file system path:
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports partition inserting and overwrite inserting. See
+[INSERT Statement]({{ site.baseurl }}/table/sql/insert.html).
+
+**NOTE:** When you insert overwrite into a partitioned table, only the
+corresponding partition will be overwritten, not the entire table.
+
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file; it is uncompressed [newline-delimited JSON](http://jsonlines.org/).
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression via the `avro.codec` option.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming sink
+
+The file system connector supports a streaming sink; it uses the [Streaming File
+Sink]({{ site.baseurl }}/connectors/streamfile_sink.html)
+to write records to files. Row-encoded formats are csv and json; bulk-encoded
+formats are parquet, orc and avro.
+
+### Rolling policy
+
+Data within the partition directories is split into part files. Each partition
+will contain at least one part file for each subtask of the sink that has
+received data for that partition. The in-progress part file will be closed and
+an additional part file will be created according to the configurable rolling
+policy. The policy rolls part files based on size and on a timeout that
+specifies the maximum duration for which a file can stay open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Default</th>
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>sink.rolling-policy.file-size</td>
+      <td>1024L * 1024L * 128L</td>
+      <td>Long</td>
+      <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+      <td>sink.rolling-policy.time-interval</td>
+      <td>30 m</td>
+      <td>Duration</td>
+      <td>The maximum duration a part file can stay open before rolling (30 min by default, to avoid too many small files).</td>
+    </tr>
+  </tbody>
+</table>
+
+**NOTE:** For bulk formats 

[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-23 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r429594503



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,352 @@
+---
+title: "Hadoop FileSystem Connector"
+nav-title: Hadoop FileSystem

Review comment:
   It is for the Flink FS; I forgot to modify the title.









[GitHub] [flink] JingsongLi commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

2020-05-22 Thread GitBox


JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r429503816



##
File path: docs/dev/table/connectors/filesystem.md
##
@@ -0,0 +1,352 @@
+---
+title: "Hadoop FileSystem Connector"
+nav-title: Hadoop FileSystem
+nav-parent_id: connectors
+nav-pos: 5
+---
+
+
+* This will be replaced by the TOC
+{:toc}
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
+
+The file system connector itself is included in Flink and does not require an 
additional dependency.
+A corresponding format needs to be specified for reading and writing rows from 
and to a file system.
+
+The file system connector allows for reading and writing from a local or 
distributed filesystem. A filesystem can be defined as:
+
+
+
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify the connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
+                                        -- partition column value is null/empty string
+
+  -- optional: whether to shuffle data by dynamic partition fields in the sink phase;
+  -- this can greatly reduce the number of files for the filesystem sink,
+  -- but may lead to data skew; disabled by default
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+
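For example, the optional options above might be filled in as follows; the values are illustrative assumptions, not defaults from the PR:

{% highlight sql %}
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'parquet',
  'partition.default-name' = '_DEFAULT_PARTITION_',  -- hypothetical name used for null/empty partition values
  'sink.shuffle-by-partition.enable' = 'true'
);
{% endhighlight %}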
+
+
+Attention Make sure to include [Flink 
File System specific dependencies]({{ site.baseurl 
}}/internals/filesystems.html).
+
+Attention File system sources for streaming are only experimental. In the future, we will support actual streaming use cases, e.g., partition and directory monitoring.
+
+## Partition files
+
+Partitioning in the file system connector is similar to Hive. But unlike Hive,
+which manages partitions through its catalog, the file system table manages
+partitions according to the directory structure of the file system. The file
+system connector discovers and infers partitions automatically. For example,
+a table partitioned by datetime and hour has the following structure in the
+file system path:
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports partition inserting and overwrite inserting. See
+[INSERT Statement]({{ site.baseurl }}/table/sql/insert.html).
+
+**NOTE:** When you insert overwrite into a partitioned table, only the
+corresponding partition will be overwritten, not the entire table.
+
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file; it is uncompressed [newline-delimited JSON](http://jsonlines.org/).
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression via the `avro.codec` option.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming sink
+
+The file system connector supports a streaming sink; it uses the [Streaming File
+Sink]({{ site.baseurl }}/connectors/streamfile_sink.html)
+to write records to files. Row-encoded formats are csv and json; bulk-encoded
+formats are parquet, orc and avro.
+
+### Rolling policy
+
+Data within the partition directories is split into part files. Each partition
+will contain at least one part file for each subtask of the sink that has
+received data for that partition. The in-progress part file will be closed and
+an additional part file will be created according to the configurable rolling
+policy. The policy rolls part files based on size and on a timeout that
+specifies the maximum duration for which a file can stay open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Default</th>
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>sink.rolling-policy.file-size</td>
+      <td>1024L * 1024L * 128L</td>
+      <td>Long</td>
+      <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+      <td>sink.rolling-policy.time-interval</td>
+      <td>30 m</td>
+      <td>Duration</td>
+      <td>The maximum duration a part file can stay open before rolling (30 min by default, to avoid too many small files).</td>
+    </tr>
+  </tbody>
+</table>
+
+**NOTE:** For bulk formats 
