ivandasch commented on a change in pull request #2:
URL:
https://github.com/apache/ignite-python-thin-client/pull/2#discussion_r560816818
##########
File path: pyignite/cache.py
##########
@@ -185,10 +214,128 @@ def destroy(self):
"""
Destroys cache with a given name.
"""
- return cache_destroy(self._client, self._cache_id)
+ return cache_destroy(self.get_best_node(), self._cache_id)
@status_to_exception(CacheError)
- def get(self, key, key_hint: object=None) -> Any:
+ def _get_affinity(self, conn: 'Connection') -> Dict:
+ """
+ Queries server for affinity mappings. Retries in case
+ of an intermittent error (most probably “Getting affinity for topology
+ version earlier than affinity is calculated”).
+
+ :param conn: connection to Ignite server,
+ :return: OP_CACHE_PARTITIONS operation result value.
+ """
+ for _ in range(AFFINITY_RETRIES or 1):
+ result = cache_get_node_partitions(conn, self._cache_id)
+ if result.status == 0 and result.value['partition_mapping']:
+ break
+ time.sleep(AFFINITY_DELAY)
+
+ return result
+
+ @select_version
+ def get_best_node(
+ self, key: Any = None, key_hint: 'IgniteDataType' = None,
+ ) -> 'Connection':
+ """
+ Returns the node from the list of the nodes, opened by client, that
+ most probably contains the needed key-value pair. See IEP-23.
+
+ This method is not a part of the public API. Unless you wish to
+ extend the `pyignite` capabilities (with additional testing, logging,
+ examining connections, etc.) you probably should not use it.
+
+ :param key: (optional) pythonic key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :return: Ignite connection object.
+ """
+ conn = self._client.random_node
+
+ if self.client.partition_aware and key is not None:
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
+ parts = -1
+
+ if self.affinity['version'] < self._client.affinity_version:
+ # update partition mapping
+ while True:
+ try:
+ self.affinity = self._get_affinity(conn)
+ break
+ except connection_errors:
+ # retry if connection failed
+ pass
+ except CacheError:
+ # server did not create mapping in time
+ return conn
+
+ # flatten it a bit
+ try:
+ self.affinity.update(self.affinity['partition_mapping'][0])
+ except IndexError:
+ return conn
+ del self.affinity['partition_mapping']
+
+ # calculate the number of partitions
+ parts = sum(
+ [len(p) for _, p in self.affinity['node_mapping'].items()]
+ ) if 'node_mapping' in self.affinity else 0
+
+ self.affinity['number_of_partitions'] = parts
+ else:
+ # get number of partitions
+ parts = self.affinity.get('number_of_partitions')
+
+ if not parts:
+ return conn
+
+ if self.affinity['is_applicable']:
+ affinity_key_id = self.affinity['cache_config'].get(
+ key_hint.type_id,
+ None
+ )
+ if affinity_key_id and isinstance(key, GenericObjectMeta):
+ key, key_hint = get_field_by_id(key, affinity_key_id)
+
+ # calculate partition for key or affinity key
+ # (algorithm is taken from `RendezvousAffinityFunction.java`)
+ base_value = key_hint.hashcode(key, self._client)
+ mask = parts - 1
+
+ if parts & mask == 0:
+ part = (base_value ^ (unsigned(base_value) >> 16)) & mask
+ else:
+ part = abs(base_value // parts)
+
+ assert 0 <= part < parts, 'Partition calculation has failed'
+
+ # search for connection
+ try:
+ node_uuid = next(
+ u for u, p
+ in self.affinity['node_mapping'].items()
+ if part in p
+ )
+ best_conn = next(
+ n for n in conn.client._nodes if n.uuid == node_uuid
+ )
+ if best_conn.alive:
Review comment:
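What if no matching node is found, i.e. `best_conn` ends up as None? Relying on catching `StopIteration` from `next()` for that case? Yuk... I would suggest explicit loops instead: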
```
node_uuid, best_conn = None, None
for u, p in self.affinity['node_mapping'].items():
    if part in p:
        node_uuid = u
        break
if node_uuid:
    for n in conn.client._nodes:
        if n.uuid == node_uuid:
            best_conn = n
if best_conn and best_conn.alive:
    conn = best_conn
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]