[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-08 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1287877080


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -97,25 +114,29 @@ public void storeCatalog(String catalogName, 
CatalogDescriptor catalog)
 throws CatalogException {
 checkOpenState();
 
-Path filePath = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = filePath.toFile();
-if (file.exists()) {
+FileSystem fs = catalogPath.getFileSystem();
+
+if (fs.exists(catalogPath)) {
 throw new CatalogException(
 String.format(
 "Catalog %s's store file %s is already exist.",
-catalogName, filePath));
+catalogName, catalogPath));
+}
+
+try (FSDataOutputStream os = fs.create(catalogPath, 
WriteMode.NO_OVERWRITE)) {
+YAML_MAPPER.writeValue(os, catalog.getConfiguration().toMap());
 }
-// create a new file
-file.createNewFile();
-String yamlString = 
yaml.dumpAsMap(catalog.getConfiguration().toMap());
-FileUtils.writeFile(file, yamlString, charset);
-LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, filePath);
-} catch (Throwable e) {
+
+LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, catalogPath);
+} catch (CatalogException e) {
+throw e;
+} catch (Exception e) {

Review Comment:
   For example, the code above only needs to catch IOException, 
StreamWriteException and DatabindException here, convert them to a 
CatalogException, and rethrow that.
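
A minimal sketch of the suggestion, not the PR's code: it uses `java.nio.file` in place of Flink's `FileSystem` and a hypothetical stand-in `CatalogException`, so the class and method names below are assumptions for illustration only. The existence check sits outside the `try`, and only `IOException` is caught and converted, so no `catch (CatalogException e) { throw e; }` clause is needed.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class NarrowCatchSketch {

    /** Hypothetical stand-in for Flink's CatalogException (unchecked). */
    static class CatalogException extends RuntimeException {
        CatalogException(String message) {
            super(message);
        }

        CatalogException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static void storeCatalog(Path storeDir, String catalogName, String yaml) {
        Path catalogPath = storeDir.resolve(catalogName + ".yaml");

        // Thrown outside the try block, so it can never reach the catch below.
        if (Files.exists(catalogPath)) {
            throw new CatalogException(
                    String.format(
                            "Catalog %s's store file %s already exists.",
                            catalogName, catalogPath));
        }

        try {
            // CREATE_NEW fails if the file appears between the check and the write.
            Files.write(
                    catalogPath,
                    yaml.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE_NEW,
                    StandardOpenOption.WRITE);
        } catch (IOException e) {
            // Only the I/O failure is converted; nothing else is swallowed or re-wrapped.
            throw new CatalogException(
                    String.format("Failed to store catalog %s to %s.", catalogName, catalogPath),
                    e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("catalog-store");
        storeCatalog(dir, "my_catalog", "type: generic_in_memory\n");
        try {
            storeCatalog(dir, "my_catalog", "type: generic_in_memory\n");
        } catch (CatalogException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the Jackson-based writer in the PR, the `catch` would list the exception types that `writeValue` declares instead of the single `IOException` used in this sketch.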



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-08 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1286651330


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -97,25 +114,29 @@ public void storeCatalog(String catalogName, 
CatalogDescriptor catalog)
 throws CatalogException {
 checkOpenState();
 
-Path filePath = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = filePath.toFile();
-if (file.exists()) {
+FileSystem fs = catalogPath.getFileSystem();
+
+if (fs.exists(catalogPath)) {
 throw new CatalogException(
 String.format(
 "Catalog %s's store file %s is already exist.",
-catalogName, filePath));
+catalogName, catalogPath));
+}
+
+try (FSDataOutputStream os = fs.create(catalogPath, 
WriteMode.NO_OVERWRITE)) {
+YAML_MAPPER.writeValue(os, catalog.getConfiguration().toMap());
 }
-// create a new file
-file.createNewFile();
-String yamlString = 
yaml.dumpAsMap(catalog.getConfiguration().toMap());
-FileUtils.writeFile(file, yamlString, charset);
-LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, filePath);
-} catch (Throwable e) {
+
+LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, catalogPath);
+} catch (CatalogException e) {
+throw e;
+} catch (Exception e) {

Review Comment:
   Can we catch more specific exception types here, so that the 
catch-and-rethrow of `CatalogException` above is not needed?






[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283909486


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java:
##
@@ -20,42 +20,39 @@
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.factories.CatalogStoreFactory;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.FactoryHelper;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.CHARSET;
 import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.IDENTIFIER;
 import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.PATH;
 import static 
org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
 
-/** CatalogStore factory for {@link FileCatalogStore}. */
+/** Catalog store factory for {@link FileCatalogStore}. */
 public class FileCatalogStoreFactory implements CatalogStoreFactory {
 
 private String path;
 
-private String charset;
-
 @Override
 public CatalogStore createCatalogStore() {
-return new FileCatalogStore(path, charset);
+return new FileCatalogStore(path);
 }
 
 @Override
-public void open(Context context) throws CatalogException {
-FactoryUtil.FactoryHelper factoryHelper = 
createCatalogStoreFactoryHelper(this, context);
+public void open(Context context) {
+FactoryHelper factoryHelper =
+createCatalogStoreFactoryHelper(this, context);
 factoryHelper.validate();
-ReadableConfig options = factoryHelper.getOptions();
 
+ReadableConfig options = factoryHelper.getOptions();
 path = options.get(PATH);
-charset = options.get(CHARSET);
 }
 
 @Override
-public void close() throws CatalogException {}
+public void close() {}

Review Comment:
   Call super.close()






[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283908619


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -217,32 +253,10 @@ public Set listCatalogs() throws CatalogException 
{
 public boolean contains(String catalogName) throws CatalogException {
 checkOpenState();
 
-return listAllCatalogFiles().containsKey(catalogName);
-}
-
-private Map listAllCatalogFiles() throws CatalogException {
-Map files = new HashMap<>();
-File directoryFile = new File(catalogStoreDirectory);
-if (!directoryFile.isDirectory()) {
-throw new CatalogException("File catalog store only support local 
directory");
-}
-
-try {
-Files.list(directoryFile.toPath())
-.filter(file -> 
file.getFileName().toString().endsWith(FILE_EXTENSION))
-.filter(Files::isRegularFile)
-.forEach(
-p ->
-files.put(
-
p.getFileName().toString().replace(FILE_EXTENSION, ""),
-p));
-} catch (Throwable t) {
-throw new CatalogException("Failed to list file catalog store 
directory", t);
-}
-return files;
+return listCatalogs().contains(catalogName);

Review Comment:
   Can we directly check whether the catalog file exists here? I think listing 
all catalogs may not be an efficient operation.
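
A rough sketch of the difference, using `java.nio.file` rather than Flink's `FileSystem` API; the class and method names are made up for illustration. The listing variant reads every entry in the directory before answering, while the direct variant checks only the one path the catalog would be stored under.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ContainsSketch {

    static final String FILE_EXTENSION = ".yaml";

    /** Lists the whole directory, then searches the result: work grows with the file count. */
    static boolean containsByListing(Path storeDir, String catalogName) throws IOException {
        try (Stream<Path> files = Files.list(storeDir)) {
            Set<String> names =
                    files.filter(Files::isRegularFile)
                            .map(p -> p.getFileName().toString())
                            .filter(n -> n.endsWith(FILE_EXTENSION))
                            .map(n -> n.substring(0, n.length() - FILE_EXTENSION.length()))
                            .collect(Collectors.toSet());
            return names.contains(catalogName);
        }
    }

    /** Checks only the single path where the catalog file would live. */
    static boolean containsByPath(Path storeDir, String catalogName) {
        return Files.isRegularFile(storeDir.resolve(catalogName + FILE_EXTENSION));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("catalog-store");
        Files.createFile(dir.resolve("my_catalog" + FILE_EXTENSION));
        System.out.println(containsByListing(dir, "my_catalog"));
        System.out.println(containsByPath(dir, "my_catalog"));
    }
}
```

On a remote file system a directory listing can be considerably more expensive than a single existence check, which is the concern raised here.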






[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283907327


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -201,8 +222,23 @@ public Optional getCatalog(String 
catalogName) throws Catalog
 @Override
 public Set<String> listCatalogs() throws CatalogException {
 checkOpenState();
+try {
+FileStatus[] statusArr = 
catalogStorePath.getFileSystem().listStatus(catalogStorePath);
 
-return Collections.unmodifiableSet(listAllCatalogFiles().keySet());
+return Arrays.stream(statusArr)
+.filter(status -> !status.isDir())
+.map(FileStatus::getPath)
+.map(Path::getName)
+.map(filename -> filename.replace(FILE_EXTENSION, ""))
+.collect(Collectors.toSet());
+} catch (CatalogException e) {
+throw e;

Review Comment:
   The same as above






[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283906870


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -172,22 +185,30 @@ public void removeCatalog(String catalogName, boolean 
ignoreIfNotExists)
 @Override
 public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
 checkOpenState();
-
-Path path = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = path.toFile();
-if (!file.exists()) {
-LOG.warn("Catalog {}'s store file %s does not exist.", 
catalogName, path);
+FileSystem fs = catalogPath.getFileSystem();
+
+if (!fs.exists(catalogPath)) {
 return Optional.empty();
 }
-String content = FileUtils.readFile(file, charset);
-Map options = yaml.load(content);
-return Optional.of(CatalogDescriptor.of(catalogName, 
Configuration.fromMap(options)));
-} catch (Throwable t) {
+
+try (FSDataInputStream is = fs.open(catalogPath)) {
+Map<String, String> configMap =
+YAML_MAPPER.readValue(is, new TypeReference<Map<String, String>>() {});
+
+CatalogDescriptor catalog =
+CatalogDescriptor.of(catalogName, 
Configuration.fromMap(configMap));
+
+return Optional.of(catalog);
+}
+} catch (CatalogException e) {
+throw e;

Review Comment:
   Same as above






[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283906615


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -132,31 +153,23 @@ public void storeCatalog(String catalogName, 
CatalogDescriptor catalog)
 public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
 throws CatalogException {
 checkOpenState();
-
-Path path = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = path.toFile();
-if (file.exists()) {
-if (!file.isFile()) {
-throw new CatalogException(
-String.format(
-"Catalog %s's store file %s is not a 
regular file",
-catalogName, path.getFileName()));
-}
-Files.deleteIfExists(path);
-} else {
-if (!ignoreIfNotExists) {
-throw new CatalogException(
-String.format(
-"Catalog %s's store file %s is not exist", 
catalogName, path));
-}
+FileSystem fs = catalogPath.getFileSystem();
+
+if (fs.exists(catalogPath)) {
+fs.delete(catalogPath, false);
+} else if (!ignoreIfNotExists) {
+throw new CatalogException(
+String.format(
+"Catalog %s's store file %s does not exist.",
+catalogName, catalogPath));
 }
-} catch (Throwable e) {
+} catch (CatalogException e) {
+throw e;

Review Comment:
   The same as above






[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283906508


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -97,25 +114,29 @@ public void storeCatalog(String catalogName, 
CatalogDescriptor catalog)
 throws CatalogException {
 checkOpenState();
 
-Path filePath = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = filePath.toFile();
-if (file.exists()) {
+FileSystem fs = catalogPath.getFileSystem();
+
+if (fs.exists(catalogPath)) {
 throw new CatalogException(
 String.format(
 "Catalog %s's store file %s is already exist.",
-catalogName, filePath));
+catalogName, catalogPath));
 }
-// create a new file
-file.createNewFile();
-String yamlString = 
yaml.dumpAsMap(catalog.getConfiguration().toMap());
-FileUtils.writeFile(file, yamlString, charset);
-LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, filePath);
-} catch (Throwable e) {
+
+try (FSDataOutputStream os = fs.create(catalogPath, 
WriteMode.NO_OVERWRITE)) {
+YAML_MAPPER.writeValue(os, catalog.getConfiguration().toMap());
+}
+
+LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, catalogPath);
+} catch (CatalogException e) {
+throw e;

Review Comment:
   The handling here is a bit strange. Under what circumstances will a 
CatalogException be caught?
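
One possible answer, sketched with a hypothetical stand-in `CatalogException` and made-up method names: the "already exists" exception is thrown inside the `try`, so a broad `catch (Exception e)` would catch it and wrap it a second time unless it is rethrown first. This only illustrates the control flow; it is not the PR's code.

```java
class RethrowSketch {

    /** Hypothetical stand-in for Flink's CatalogException (unchecked). */
    static class CatalogException extends RuntimeException {
        CatalogException(String message) {
            super(message);
        }

        CatalogException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Broad catch only: the exception thrown inside the try is wrapped again. */
    static void storeWithoutRethrow(boolean fileExists) {
        try {
            if (fileExists) {
                throw new CatalogException("Store file already exists.");
            }
        } catch (Exception e) {
            throw new CatalogException("Failed to store catalog.", e);
        }
    }

    /** Rethrow clause first: the original exception propagates unchanged. */
    static void storeWithRethrow(boolean fileExists) {
        try {
            if (fileExists) {
                throw new CatalogException("Store file already exists.");
            }
        } catch (CatalogException e) {
            throw e;
        } catch (Exception e) {
            throw new CatalogException("Failed to store catalog.", e);
        }
    }

    public static void main(String[] args) {
        try {
            storeWithoutRethrow(true);
        } catch (CatalogException e) {
            System.out.println(e.getMessage() + " cause=" + e.getCause());
        }
        try {
            storeWithRethrow(true);
        } catch (CatalogException e) {
            System.out.println(e.getMessage() + " cause=" + e.getCause());
        }
    }
}
```

Moving the `throw` out of the `try`, or catching only the specific I/O exceptions, removes the need for the rethrow clause altogether.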






[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283903488


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -46,42 +49,56 @@ public class FileCatalogStore extends AbstractCatalogStore {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(FileCatalogStore.class);
 
-private static final String FILE_EXTENSION = ".yaml";
+static final String FILE_EXTENSION = ".yaml";
 
-/** The directory path where catalog configurations will be stored. */
-private final String catalogStoreDirectory;
-
-/** The character set to use when reading and writing catalog files. */
-private final String charset;
+/** The YAML mapper to use when reading and writing catalog files. */
+private static final YAMLMapper YAML_MAPPER =
+new 
YAMLMapper().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER);
 
-/** The YAML parser to use when reading and writing catalog files. */
-private final Yaml yaml = new Yaml();
+/** The directory path where catalog configurations will be stored. */
+private final Path catalogStorePath;
 
 /**
  * Creates a new {@link FileCatalogStore} instance with the specified 
directory path.
  *
- * @param catalogStoreDirectory the directory path where catalog 
configurations will be stored
+ * @param catalogStorePath the directory path where catalog configurations 
will be stored
  */
-public FileCatalogStore(String catalogStoreDirectory, String charset) {
-this.catalogStoreDirectory = catalogStoreDirectory;
-this.charset = charset;
+public FileCatalogStore(String catalogStorePath) {
+this.catalogStorePath = new Path(catalogStorePath);
 }
 
 /**
  * Opens the catalog store and initializes the catalog file map.
  *
- * @throws CatalogException if the catalog store directory does not exist 
or if there is an
- * error reading the directory
+ * @throws CatalogException if the catalog store directory does not exist, 
not a directory, or
+ * if there is an error reading the directory
  */
 @Override
 public void open() throws CatalogException {
-super.open();
-
 try {
+FileSystem fs = catalogStorePath.getFileSystem();
+if (!fs.exists(catalogStorePath)) {
+throw new CatalogException(
+String.format(
+"Failed to open catalog store. The catalog 
store directory %s does not exist.",
+catalogStorePath));
+}
 
-} catch (Throwable e) {
-throw new CatalogException("Failed to open file catalog store 
directory", e);
+if (!fs.getFileStatus(catalogStorePath).isDir()) {
+throw new CatalogException(
+String.format(
+"Failed to open catalog store. The given 
catalog store path %s is not a directory.",
+catalogStorePath));
+}
+} catch (CatalogException e) {
+throw e;
+} catch (Exception e) {
+throw new CatalogException(
+String.format(
+"Failed to open file catalog store directory %s.", 
catalogStorePath),
+e);
 }
+super.open();

Review Comment:
   Generally, `super.open()` is called first. Why is it called last here?
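
To make the ordering question concrete, here is a small sketch under the assumption that the base class's `open()` does nothing but record an "open" flag; `BaseStore` and both subclasses are hypothetical names, not Flink classes. If validation fails after `super.open()` has already run, the store is left marked as open; calling `super.open()` last avoids that.

```java
class OpenOrderSketch {

    /** Hypothetical stand-in for Flink's CatalogException (unchecked). */
    static class CatalogException extends RuntimeException {
        CatalogException(String message) {
            super(message);
        }
    }

    /** Hypothetical base class, assumed to only record the open state. */
    abstract static class BaseStore {
        private boolean open;

        void open() {
            open = true;
        }

        boolean isOpen() {
            return open;
        }
    }

    /** Calls super.open() first, then validates. */
    static class SuperFirstStore extends BaseStore {
        private final boolean directoryExists;

        SuperFirstStore(boolean directoryExists) {
            this.directoryExists = directoryExists;
        }

        @Override
        void open() {
            super.open();
            if (!directoryExists) {
                throw new CatalogException("Catalog store directory does not exist.");
            }
        }
    }

    /** Validates first and only then calls super.open(). */
    static class SuperLastStore extends BaseStore {
        private final boolean directoryExists;

        SuperLastStore(boolean directoryExists) {
            this.directoryExists = directoryExists;
        }

        @Override
        void open() {
            if (!directoryExists) {
                throw new CatalogException("Catalog store directory does not exist.");
            }
            super.open();
        }
    }

    public static void main(String[] args) {
        BaseStore[] stores = {new SuperFirstStore(false), new SuperLastStore(false)};
        for (BaseStore store : stores) {
            try {
                store.open();
            } catch (CatalogException e) {
                // Expected: the directory is reported as missing in both cases.
            }
            System.out.println(store.getClass().getSimpleName() + " isOpen=" + store.isOpen());
        }
    }
}
```

Whether this matters in practice depends on what the real base class does in `open()` and on whether callers use a store after a failed `open()`.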


