Copilot commented on code in PR #2124:
URL: https://github.com/apache/libcloud/pull/2124#discussion_r2816395542
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
+ ex_keyname = key["keyName"]
if ex_keyname:
key_id = self.get_key_pair(ex_keyname, location).extra["id"]
data["sshKeyId"] = key_id
response = self.connection.request(action, data=data, method="POST")
return self._to_node(response.object)
-
+
+ def ex_find_or_import_keypair_by_key_material(self, location, pubkey):
Review Comment:
Missing documentation for the new public method
`ex_find_or_import_keypair_by_key_material`. Public methods should have
docstrings that describe their purpose, parameters, return values, and any
exceptions they may raise. Consider following the pattern from the EC2 driver's
implementation for consistency.
```suggestion
def ex_find_or_import_keypair_by_key_material(self, location, pubkey):
"""
Find an existing key pair matching the given public key material or
import it as a new key pair in the specified location.
:param location: Location (region) where the key pair should be
looked up or imported.
:type location: :class:`NodeLocation`
:param pubkey: Public key material (for example, in OpenSSH format).
:type pubkey: ``str``
:return: Dictionary containing key pair information. For an existing
key pair this includes ``keyName`` and ``keyId``; for a
newly imported key pair this includes ``keyName`` and
``keyFingerprint``.
:rtype: ``dict``
:raises Exception: If listing existing key pairs or importing a new
key pair fails in the underlying API.
"""
```
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
+ ex_keyname = key["keyName"]
Review Comment:
Missing test coverage for the new `auth` parameter functionality. The
existing `test_create_node` test does not cover the new authentication
parameter flow. Consider adding tests for: 1) creating a node with the `auth`
parameter and a new keypair, 2) creating a node with the `auth` parameter when
the keypair already exists, and 3) the mutual exclusivity validation that
raises AttributeError when both `auth` and `ex_keyname` are provided.
```suggestion
key = self.ex_find_or_import_keypair_by_key_material(location,
auth.pubkey)
ex_keyname = key.get("keyName") or key.get("name")
if not ex_keyname:
raise AttributeError("Unable to determine key name from auth-provided key material")
```
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
+ ex_keyname = key["keyName"]
if ex_keyname:
key_id = self.get_key_pair(ex_keyname, location).extra["id"]
data["sshKeyId"] = key_id
response = self.connection.request(action, data=data, method="POST")
return self._to_node(response.object)
-
+
Review Comment:
Trailing whitespace detected. Remove the trailing whitespace at the end of
this line to maintain consistency with the rest of the codebase and adhere to
Python style guidelines (PEP 8).
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
+ ex_keyname = key["keyName"]
if ex_keyname:
key_id = self.get_key_pair(ex_keyname, location).extra["id"]
data["sshKeyId"] = key_id
response = self.connection.request(action, data=data, method="POST")
return self._to_node(response.object)
-
+
+ def ex_find_or_import_keypair_by_key_material(self, location, pubkey):
+ key_pairs = self.list_key_pairs(ex_location=location)
+ for key_pair in key_pairs:
+ if key_pair.public_key == pubkey:
+ return {"keyName": key_pair.name, "keyId":
key_pair.extra["id"]}
+ key_fingerprint = get_pubkey_ssh2_fingerprint(pubkey)
+ key_comment = get_pubkey_comment(pubkey, default="unnamed")
+ key_name = "{}-{}".format(key_comment, key_fingerprint)
+ key_pair=self.import_key_pair_from_string(key_name, pubkey,
ex_location=location)
Review Comment:
Missing space around assignment operator. Should be `key_pair =
self.import_key_pair_from_string(...)` for consistency with Python style
guidelines (PEP 8).
```suggestion
key_pair = self.import_key_pair_from_string(key_name, pubkey,
ex_location=location)
```
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
+ ex_keyname = key["keyName"]
if ex_keyname:
key_id = self.get_key_pair(ex_keyname, location).extra["id"]
data["sshKeyId"] = key_id
response = self.connection.request(action, data=data, method="POST")
return self._to_node(response.object)
-
+
+ def ex_find_or_import_keypair_by_key_material(self, location, pubkey):
+ key_pairs = self.list_key_pairs(ex_location=location)
+ for key_pair in key_pairs:
+ if key_pair.public_key == pubkey:
+ return {"keyName": key_pair.name, "keyId":
key_pair.extra["id"]}
+ key_fingerprint = get_pubkey_ssh2_fingerprint(pubkey)
+ key_comment = get_pubkey_comment(pubkey, default="unnamed")
+ key_name = "{}-{}".format(key_comment, key_fingerprint)
+ key_pair=self.import_key_pair_from_string(key_name, pubkey,
ex_location=location)
+ return {"keyName": key_pair.name, "keyFingerprint":
key_pair.fingerprint}
Review Comment:
Inconsistent return value structure and potential issue with None value.
Line 162 returns `keyId` (from `key_pair.extra["id"]`) but line 167 returns
`keyFingerprint` (from `key_pair.fingerprint`). However, the `_to_key_pair`
method (line 612) sets `fingerprint=None` for OVH keypairs, so this will return
None instead of a valid fingerprint value. Both code paths should return the
same dictionary structure with the same keys. Since the calling code on line
149 only uses `key["keyName"]`, you should either: 1) return `keyId` in both
cases (preferred, for consistency), or 2) ensure that only `keyName` is used
and remove the extra keys from the return values.
```suggestion
key_pair = self.import_key_pair_from_string(
key_name, pubkey, ex_location=location
)
return {"keyName": key_pair.name, "keyId": key_pair.extra["id"]}
```
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
+ ex_keyname = key["keyName"]
if ex_keyname:
key_id = self.get_key_pair(ex_keyname, location).extra["id"]
data["sshKeyId"] = key_id
response = self.connection.request(action, data=data, method="POST")
return self._to_node(response.object)
-
+
+ def ex_find_or_import_keypair_by_key_material(self, location, pubkey):
+ key_pairs = self.list_key_pairs(ex_location=location)
+ for key_pair in key_pairs:
+ if key_pair.public_key == pubkey:
+ return {"keyName": key_pair.name, "keyId":
key_pair.extra["id"]}
+ key_fingerprint = get_pubkey_ssh2_fingerprint(pubkey)
+ key_comment = get_pubkey_comment(pubkey, default="unnamed")
+ key_name = "{}-{}".format(key_comment, key_fingerprint)
+ key_pair=self.import_key_pair_from_string(key_name, pubkey,
ex_location=location)
Review Comment:
Potential issue with duplicate key material. The lookup above only matches
when `key_pair.public_key == pubkey` is an exact string match, so the same key
may already exist in OVH under a different name than the generated one
(computed from fingerprint and comment) without being found, for example if the
stored key text differs only in its trailing comment or whitespace. In that
case, attempting to import it on line 166 may fail if OVH enforces uniqueness
on public key material. Consider handling potential exceptions from
`import_key_pair_from_string` and checking if the failure is due to a
duplicate key, in which case you should search for the existing key by its
fingerprint or public key material and return that instead.
```suggestion
try:
key_pair = self.import_key_pair_from_string(
key_name, pubkey, ex_location=location
)
except Exception as e:
# If the provider enforces uniqueness on public key material, trying to
# import an already-existing key (possibly under a different name) may
# raise an error. In that case, attempt to find and return the existing
# key instead of failing.
message = str(e).lower()
duplicate_indicators = ("already exists", "duplicate",
"conflict")
if not any(indicator in message for indicator in
duplicate_indicators):
# Not clearly a duplicate-key situation; propagate the error.
raise
# Best-effort lookup of an existing key that matches this material.
key_pairs = self.list_key_pairs(ex_location=location)
for key_pair in key_pairs:
if getattr(key_pair, "public_key", None) == pubkey:
return {
"keyName": key_pair.name,
"keyFingerprint": getattr(key_pair, "fingerprint",
None),
}
if getattr(key_pair, "fingerprint", None) == key_fingerprint:
return {
"keyName": key_pair.name,
"keyFingerprint": key_pair.fingerprint,
}
# No matching key could be found even though the provider reported a
# duplicate; re-raise to avoid hiding a potential inconsistency.
raise
```
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
+ ex_keyname = key["keyName"]
if ex_keyname:
key_id = self.get_key_pair(ex_keyname, location).extra["id"]
data["sshKeyId"] = key_id
response = self.connection.request(action, data=data, method="POST")
return self._to_node(response.object)
-
+
+ def ex_find_or_import_keypair_by_key_material(self, location, pubkey):
+ key_pairs = self.list_key_pairs(ex_location=location)
+ for key_pair in key_pairs:
+ if key_pair.public_key == pubkey:
+ return {"keyName": key_pair.name, "keyId":
key_pair.extra["id"]}
+ key_fingerprint = get_pubkey_ssh2_fingerprint(pubkey)
+ key_comment = get_pubkey_comment(pubkey, default="unnamed")
+ key_name = "{}-{}".format(key_comment, key_fingerprint)
+ key_pair=self.import_key_pair_from_string(key_name, pubkey,
ex_location=location)
+ return {"keyName": key_pair.name, "keyFingerprint":
key_pair.fingerprint}
+
Review Comment:
Trailing whitespace detected. Remove the trailing whitespace at the end of
this line to maintain consistency with the rest of the codebase and adhere to
Python style guidelines (PEP 8).
##########
libcloud/compute/drivers/ovh.py:
##########
@@ -148,14 +139,33 @@ def create_node(self, name, image, size, location,
ex_keyname=None):
"flavorId": size.id,
"region": location.id,
}
+ if auth and ex_keyname:
+ raise AttributeError("Cannot specify auth and ex_keyname together")
+
+ if auth:
+ auth = self._get_and_check_auth(auth)
+ # pylint: disable=no-member
+ key =
self.ex_find_or_import_keypair_by_key_material(location,auth.pubkey)
Review Comment:
Missing space after comma. Should be
`self.ex_find_or_import_keypair_by_key_material(location, auth.pubkey)` for
consistency with Python style guidelines (PEP 8).
```suggestion
key = self.ex_find_or_import_keypair_by_key_material(location,
auth.pubkey)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]