Source code for sqlalchemy_tree.options

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
    sqlalchemy_tree.options
    ~~~~~~~~~~~~~~~~~~~~~~~

    :copyright: (C) 2012-2014 the SQLAlchemy-ORM-Tree authors and contributors
                <see AUTHORS file>.
    :license: BSD, see LICENSE for more details.
"""

from __future__ import absolute_import, division, print_function, \
    with_statement, unicode_literals

import sqlalchemy

from .types import TreeIdType, TreeLeftType, TreeRightType, TreeDepthType
from ._compat import string_types, py2map as map


[docs]class TreeOptions(object): """A container for options for one tree. :parameters: see :class:`TreeManager`. """ def __init__(self, table, instance_manager_attr, parent_id_field=None, tree_id_field=None, left_field=None, right_field=None, depth_field=None, _attach_columns=True): # Record required options for future use: self.table = table self._node_manager_attr = None self.instance_manager_attr = instance_manager_attr self.delayed_op_attr = None # FIXME: Add support for composite primary keys. assert len(table.primary_key.columns) == 1, \ "Composite primary keys not supported" # Record the primary key column for future use: [self.pk_field] = table.primary_key.columns # The parent_id_field is auto-detected by joining the table with itself # and using the right-hand side of the resulting ON clause. This default # behavior may be overidden by specifying the desired JOIN column directly # or by name. if parent_id_field is None: # Auto-detect by building a self-JOIN command: self.parent_id_field = table.join(table).onclause.right elif isinstance(parent_id_field, string_types): # Column specified by name: self.parent_id_field = table.columns[parent_id_field] else: # Column specified directly: assert isinstance(parent_id_field, sqlalchemy.Column) assert parent_id_field.table is table self.parent_id_field = parent_id_field def _check_field(table, field, name, type_): """Check field argument (one of `tree_id_field`, `left_field`, `right_field`, or `depth_field`), convert it from field name to `Column` object (if needed), create the column object (again, if needed) and check the existing `Column` object for sanity.""" columns = [getattr(table.columns, key) for key in table.columns.keys()] # If ‘field’ is not specified, we try to autodetect it from the columns # of the table based on ‘type_’. if field is None: candidates = [c for c in columns if isinstance(c.type, type_)] if len(candidates) == 1: field = candidates[0] else: field = 'tree_' + name # We assume that we'll be passed either a string or a SQLAlchemy Column # object (duck typing is not allowed). If what we're passed is a Column # object, we just need to check that if not isinstance(field, string_types): assert isinstance(field, sqlalchemy.Column) assert field.table is table # Otherwise we're passed a string, and either we find a field with that # name in the existing table Columns (likely but not necessarily if the # developer specified their own field name), or we'll have to create a # new column of the specified name and type, and insert it into the # table's column descriptions. elif field in table.columns: # Column exists: field = table.columns[field] else: # Column not found; create it: field = sqlalchemy.Column(field, type_(), nullable=False) if _attach_columns: table.append_column(field) # And return (since we know the following checks are # redundant): return field # If we found the column or the developer specified it directly, we'll # do a quick sanity check to make sure that the column has the right # type and meta-attributes: assert isinstance(field.type, type_), \ "The type of %s field should be %r" % (name, type_) assert not field.nullable, \ "The %s field should not be nullable" % name # Field passes; return to caller: return field # Create (if necessary) or validate the properties of the core tree # fields: self.tree_id_field = _check_field( table, tree_id_field, 'id', TreeIdType) self.left_field = _check_field( table, left_field, 'left', TreeLeftType) self.right_field = _check_field( table, right_field, 'right', TreeRightType) self.depth_field = _check_field( table, depth_field, 'depth', TreeDepthType) self.required_fields = ( self.tree_id_field, self.left_field, self.right_field, self.depth_field, ) if _attach_columns: self.attach_indices() def attach_indices(self): # To speed up operations, we create an index containing just the core # three fields that we care about for tree operations: self.indices = [ sqlalchemy.Index( '__'.join((self.table.name, self.tree_id_field.name, self.left_field.name, self.right_field.name)), self.tree_id_field, self.left_field, self.right_field, # NOTE: Originally there was a constraint that tree_id, left, and # right be unique, simply as a sanity check. However on some # database backends that don't properly support atomic queries # this is causing an IntegrityError during tree operations. # unique=True ), ] map(self.table.append_constraint, self.indices) def class_mapped(self, manager): "" self.class_manager = manager self.node_class = manager.node_class self.node_manager_attr = self._get_node_manager_attr() self.delayed_op_attr = self._get_delayed_op_attr() self.parent_field_name = self._get_parent_field_name() def _get_node_manager_attr(self): from .manager import TreeManager if self._node_manager_attr is None: self._node_manager_attr = [ x for x in self.node_class.__dict__.items() if isinstance(x[1], TreeManager) ][0][0] return self._node_manager_attr def _get_delayed_op_attr(self): if self.delayed_op_attr is None: self._get_node_manager_attr() self.delayed_op_attr = '__'.join( [self._node_manager_attr, 'delayed_op']) return self.delayed_op_attr def _get_parent_field_name(self): for prop in self.node_class._sa_class_manager.mapper.iterate_properties: if (len(getattr(prop, 'local_side', [])) == 1 and prop.local_side[0].name == self.parent_id_field.name): return prop.key for prop in self.node_class._sa_class_manager.mapper.iterate_properties: if (getattr(prop, 'remote_side', None) is not None and getattr(prop.remote_side, 'name', None) == self.pk_field.name): return prop.key # Above test works for SQLAlchemy 0.7, the one below for 0.8 if (len(getattr(prop, 'remote_side', [])) == 1 and self.pk_field in prop.remote_side): return prop.key raise ValueError( u"could not auto-detect parent field name; tree extension will not " u"work property without a parent relationship defined")
[docs] def order_by_clause(self): """Get an object applicable for usage as an argument for `Query.order_by()`. Used to sort subtree query by `tree_id` then `left`.""" return sqlalchemy.sql.expression.asc(self.left_field)
# FIXME: We should be sorting based on ``tree_id`` first, then ``left`` # (see disabled code below), however this was generating SQL not # accepted by SQLite. Since most sorted queries are on just one # tree in practice, ordering by just ``left`` will do for now. But # when we have time we should find a cross-database method for # ordering by multiple columns. # # return sqlalchemy.sql.expression.ClauseList( # sqlalchemy.sql.expression.asc(self.tree_id_field), # sqlalchemy.sql.expression.asc(self.left_field), #)