On Tuesday, August 22, 2017 at 17:29:41 UTC+3, Mike Bayer wrote:
>
> On Tue, Aug 22, 2017 at 3:43 AM, Антонио Антуан <[email protected]> wrote:
> > Hi guys
> > I tried to implement horizontal sharding in my project. Everything is ok,
> > except bulk inserts.
> > When I run the tests, I get this error:
> >   File "/home/anton/Projects/proj/core/model/messages.py", line 210, in create
> >     Session.bulk_insert_mappings(MessageEntity, to_commit)
> >   File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/scoping.py", line 157, in do
> >     return getattr(self.registry(), name)(*args, **kwargs)
> >   File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line 2345, in bulk_insert_mappings
> >     mapper, mappings, False, False, return_defaults, False)
> >   File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line 2416, in _bulk_save_mappings
> >     transaction.rollback(_capture_exception=True)
> >   File "build/bdist.linux-x86_64/egg/sqlalchemy/util/langhelpers.py", line 60, in __exit__
> >     compat.reraise(exc_type, exc_value, exc_tb)
> >   File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line 2411, in _bulk_save_mappings
> >     mapper, mappings, transaction, isstates, return_defaults)
> >   File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/persistence.py", line 35, in _bulk_insert
> >     "connection_callable / per-instance sharding "
> > NotImplementedError: connection_callable / per-instance sharding not supported in bulk_insert()
> >
> > I do not understand what 'connection_callable' does, but I really need to
> > implement it. Is there any way to do it?
>
> The idea of bulk_insert() is strictly one of performance with some
> degree of convenience; you don't need it to actually accomplish
> anything as there are many other ways to achieve what it does. The
> "connection_callable" thing is specifically so that an ORM persistence
> operation can get at the appropriate database connection for a
> particular INSERT/UPDATE/DELETE, however this adds complexity so is
> not part of bulk_insert().
>
> In the case of application side horizontal sharding, you'd want to
> take the data you're passing to it and pre-sort it into individual
> per-shard sets, then use a Session per shard to run the inserts.
>
> Also, application side horizontal sharding is not a great way to do
> sharding in any case. If you're on PostgreSQL I'd strongly recommend
> using table inheritance instead.
>
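A rough sketch of the pre-sorting suggested above, assuming one plain (non-sharded) Session per shard. The names `group_by_shard`, `bulk_insert_per_shard`, `sessions` and `shard_for` are hypothetical; the only SQLAlchemy call used is `Session.bulk_insert_mappings()`, and committing is left to the caller:

```python
from collections import defaultdict


def group_by_shard(mappings, shard_for):
    """Split row dicts into {shard_id: [rows]}.

    shard_for is a hypothetical callable that returns the shard id for a row.
    """
    grouped = defaultdict(list)
    for row in mappings:
        grouped[shard_for(row)].append(row)
    return dict(grouped)


def bulk_insert_per_shard(sessions, mapper, mappings, shard_for):
    """Run one bulk_insert_mappings() call per shard.

    sessions is assumed to map shard_id -> a Session bound to that shard's
    engine. Returns the number of rows handed to each shard.
    """
    counts = {}
    for shard_id, rows in group_by_shard(mappings, shard_for).items():
        sessions[shard_id].bulk_insert_mappings(mapper, rows)
        counts[shard_id] = len(rows)
    return counts
```

If every write goes to a single database, `shard_for` can simply return that one shard id for every row, and the loop collapses to one bulk insert.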
Yes, PostgreSQL is used in the project. But I can't use inheritance, because I
have several servers, each located in a separate country.
Yes, I could use inheritance together with FDW, but in that case the system
becomes more complicated.
When an application instance runs on the master node, it can read from and
write to the master database and can only read from the geo-slaves.
When an application instance runs on a geo-slave, it can read from the master,
read from the replicated tables on the current node, and write data (some
statistics) to a table on the current node.
Periodically the master reads the data tables (statistics) from each geo-slave
and writes the new data to the master database.
Previously my project used something like this:
from collections import defaultdict

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# The code below uses the name ScopedSession for scoped_session
from sqlalchemy.orm import scoped_session as ScopedSession, sessionmaker

# `config` is the application's own settings object (not shown here)


class ClusterMeta(object):
    _pg_config = {}
    _pg_sessions = defaultdict(list)
    is_slave = config.get('default', 'is_slave')
    prefix = config.get('default', 'prefix')

    def _get_pg_conf(self):
        # Collect connection strings from the config, keyed by node prefix
        for name, conn_string in config['sqlalchemy'].items():
            if name.startswith('conn_string_slave_'):
                sp = name.replace('conn_string_slave_', '').lower()
                self._pg_config[sp] = conn_string
            elif name in ['conn_string_master', 'conn_string']:
                sp = 'default' if name == 'conn_string' else 'master'
                if sp in self._pg_config:
                    raise ValueError(
                        '{} conn_string already configured!'.format(sp.upper()))
                else:
                    self._pg_config[sp] = conn_string

    def _create_pg_session(self, conn_string):
        """
        :rtype: tuple(base, session)
        """
        engine = create_engine(conn_string, convert_unicode=True)
        session = ScopedSession(sessionmaker(bind=engine))
        base = declarative_base()
        base.query = session.query_property()
        return base, session

    def get_session(self, prefix, with_base=False):
        # Create the (base, session) pair for a prefix on first use, then reuse it
        if prefix not in self._pg_sessions:
            conn_string = self._pg_config.get(prefix)
            if not conn_string:
                raise ValueError('No conn_string for prefix "%s"' % prefix)
            s = self._create_pg_session(conn_string)
            self._pg_sessions[prefix].insert(0, s)
        else:
            s = self._pg_sessions[prefix][0]
        return s if with_base else s[1]


cluster_meta = ClusterMeta()

# Master
Base, Session = cluster_meta.get_session('default', with_base=True)

# Slave
if cluster_meta.is_slave:
    BaseSlave, SessionSlave = cluster_meta.get_session(
        cluster_meta.slave_prefix, with_base=True)
else:
    # Two names on the left, so exactly two values on the right
    BaseSlave, SessionSlave = None, None

'default' is the replica on a geo-node, or the master DB on the master node.
SessionSlave is a session connected to the database that holds the statistics
data on a particular node. As you can see, we have several Base classes, so we
need to create separate models for each of them.
Now we have merged the two databases on each geo-node (replica and data) into
one database; we never write to the replica tables and still write to the
data tables.
"_create_pg_session" now looks like this:
    def _create_pg_session(self):
        """
        :rtype: tuple(base, session)
        """
        engines = {}
        for prefix, conn_string in self._pg_config.items():
            engines[prefix] = create_engine(conn_string, convert_unicode=True)
        create_session = sessionmaker(bind=engines['default'],
                                      class_=ShardedSessionWithDefaultBind)
        chooser_list = lambda *a, **kw: ['default']
        chooser_one = lambda *a, **kw: 'default'
        create_session.configure(shards=engines,
                                 shard_chooser=chooser_one,
                                 id_chooser=chooser_list,
                                 query_chooser=chooser_list)
        session = ScopedSession(create_session)
        base = declarative_base()
        base.query = session.query_property()
        return base, session
...
class ShardedSessionWithDefaultBind(ShardedSession):
    def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw):
        if shard_id is None:
            shard_id = 'default'
        return super(ShardedSessionWithDefaultBind, self).get_bind(
            mapper, shard_id, instance, clause, **kw)


Base, Session = cluster_meta.get_session('default', with_base=True)

So now we always run queries without choosing a shard_id: they always go to
the 'default' database. The one exception: when the app runs on the master and
reads statistics from the geo-slaves, we specify the shard explicitly:
"Session.query(...).set_shard_id(NODE_PREFIX)". So yes, we use the sharding
mechanism only for that.
My question is still the same: can I override the "connection_callable" member
with None in my "ShardedSessionWithDefaultBind" class?
Also, if you can suggest any other mechanism that keeps things as simple as
they are now (with the horizontal_sharding mechanism), that would be great;
our team would appreciate it :)
> >
> > If it can help: I ALWAYS perform UPDATE/INSERT/DELETE and most SELECT
> > queries in only one database. So, I wrote this subclass:
> >
> >
> > class ShardedSessionWithDefaultBind(ShardedSession):
> >     def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw):
> >         if shard_id is None:
> >             shard_id = default_shard_id
> >         return super(ShardedSessionWithDefaultBind, self).get_bind(
> >             mapper, shard_id, instance, clause, **kw)
> >
> >
> > Maybe I can override "__init__" in my class and write
> > "self.connection_callable = None"?
> > My reading of the sqlalchemy code didn't convince me that it is safe enough.
> > But I see that "connection_callable" is used only for a check in the
> > "_bulk_insert" and "_bulk_update" functions in the
> > sqlalchemy.orm.persistence module.
> > So, if I am 100% sure that I ALWAYS perform INSERT/UPDATE/DELETE queries in
> > only one database, can I do that?
> >
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable
Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.