SQLAlchemy 的连接创建是 Lazy 的方式, 即在需要使用时才会去真正创建. 之前做的工作, 全是"定义".
连接的定义是在 engine 中做的.
engine 的定义包含了三部分的内容, 一是具体数据库类型的实现, 二是连接池, 三是策略(即engine 自己的实现).
所谓的数据库类型即是 MYSQL , Postgresql , SQLite 这些不同的数据库.
一般创建 engine 是使用 create_engine 方法:
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')参数字符串的各部分的意义:
dialect+driver://username:password@host:port/database对于这个字符串, SQLAlchemy 提供了工具可用于处理它:
# -*- coding: utf-8 -*- from sqlalchemy import create_engine from sqlalchemy.engine.url import make_url from sqlalchemy.engine.url import URL s = 'postgresql://test@localhost:5432/bbcustom' url = make_url(s) s = URL(drivername='postgresql', username='test', password="", host="localhost", port=5432, database="bbcustom") engine = create_engine(url) engine = create_engine(s) print engine.execute('select id from "user"').fetchall()create_engine 函数有很多的控制参数, 这个后面再详细说.
create_engine 的调用, 实际上会变成 strategy.create 的调用. 而 strategy 就是 engine 的实现细节. strategy 可以在 create_engine 调用时通过 strategy 参数指定, 目前官方的支持有三种:
plain, 默认的threadlocal, 连接是线程局部的mock, 所有的 SQL 语句的执行会使用指定的函数mock 这个实现, 会把所有的 SQL 语句的执行交给指定的函数来做, 这个函数是由 create_engine的 executor 参数指定:
def f(sql, *args, **kargs): print sql, args, kargs s = 'postgresql://test@localhost:5432/bbcustom' engine = create_engine(s, strategy='mock', executor=f) print engine.execute('select id from "user"')各数据库的实现在 SQLAlchemy 中分成了两个部分, 一是数据库的类型, 二是具体数据库中适配的客户端实现. 比如对于 Postgresql 的访问, 可以使用 psycopg2 , 也可以使用 pg8000 :
s = 'postgresql+psycopg2://test@localhost:5432/bbcustom' s = 'postgresql+pg8000://test@localhost:5432/bbcustom' engine = create_engine(s)具体的适配工作, 是需要在代码中实现一个 Dialect 类来完成的. 官方的实现在 dialects 目录下.
获取具体的 Dialect 的行为, 则是前面提到的 URL 对象的 get_dialect 方法. create_engine 时你单传一个字符串, SQLAlchemy 自己也会使用 make_url 得到一个 URL 的实例).
SQLAlchemy 支持连接池, 在 create_engine 时添加相关参数即可使用.
pool_size 连接数max_overflow 最多多几个连接pool_recycle 连接重置周期pool_timeout 连接超时时间连接池效果:
# -*- coding: utf-8 -*- from sqlalchemy import create_engine from sqlalchemy.engine.url import make_url from sqlalchemy.engine.url import URL s = 'postgresql://test@localhost:5432/bbcustom' engine = create_engine(s, pool_size=2, max_overflow=0) from threading import Thread def f(): print engine.execute('select pg_sleep(5)').fetchall() p = [] for i in range(3): p.append(Thread(target=f)) for t in p: t.start()连接池的实现, 在 create_engine 调用时也可以指定:
from sqlalchemy.pool import QueuePool engine = create_engine('sqlite:///file.db', poolclass=QueuePool)还有:
from sqlalchemy.pool import NullPool engine = create_engine( 'postgresql+psycopg2://scott:tiger@localhost/test', poolclass=NullPool)或者仅仅是获取连接的方法:
import sqlalchemy.pool as pool import psycopg2 def getconn(): c = psycopg2.connect(username='ed', host='127.0.0.1', dbname='test') # do things with 'c' to set up return c engine = create_engine('postgresql+psycopg2://', creator=getconn)连接池可以被单独使用:
import sqlalchemy.pool as pool import psycopg2 def getconn(): c = psycopg2.connect(username='ed', host='127.0.0.1', dbname='test') return c mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5) conn = mypool.connect() cursor = conn.cursor() cursor.execute("select foo")连接池可以被多个 engine 共享使用:
e = create_engine('postgresql://', pool=mypool)对于 Table 的定义, 本来是直接的实例化调用, 通过 declarative 的包装, 可以像"定义类"这样的更直观的方式来完成.
user = Table('user', metadata, Column('user_id', Integer, primary_key = True), Column('user_name', String(16), nullable = False), Column('email_address', String(60)), Column('password', String(20), nullable = False) ) # -*- coding: utf-8 -*- from sqlalchemy import create_engine from sqlalchemy import Column from sqlalchemy.types import String, Integer, CHAR, BIGINT from sqlalchemy.ext.declarative import declarative_base BaseModel = declarative_base() Engine = create_engine('postgresql://test@localhost:5432/test', echo=True) class Blog(BaseModel): __tablename__ = 'blog' id = Column(CHAR(32), primary_key=True) title = Column(String(64), server_default='', nullable=False) text = Column(String, server_default='', nullable=False) user = Column(CHAR(32), index=True, server_default='', nullable=False) create = Column(BIGINT, index=True, server_default='0', nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(CHAR(32), primary_key=True) name = Column(String(32), server_default='', nullable=False) username = Column(String(32), index=True, server_default='', nullable=False) password = Column(String(64), server_default='', nullable=False) def init_db(): BaseModel.metadata.create_all(Engine) def drop_db(): BaseModel.metadata.drop_all(Engine) if __name__ == '__main__': #init_db() drop_db() #BaseModel.metadata.tables['user'].create(Engine, checkfirst=True) #BaseModel.metadata.tables['user'].drop(Engine, checkfirst=False) pass执行的顺序并不一定会和代码顺序一致, SQLAlchemy 自己会整合逻辑再执行.
SQLAlchemy 实现的查询非常强大, 写起来有一种随心所欲的感觉.
查询的结果, 有几种不同的类型, 这个需要注意, 像是:
instanceinstance of listkeyed tuple of listvalue of list 基本查询 session.query(User).filter_by(username='abc').all() session.query(User).filter(User.username=='abc').all() session.query(Blog).filter(Blog.create >= 0).all() session.query(Blog).filter(Blog.create >= 0).first() session.query(Blog).filter(Blog.create >= 0 | Blog.title == 'A').first() session.query(Blog).filter(Blog.create >= 0 & Blog.title == 'A').first() session.query(Blog).filter(Blog.create >= 0).offset(1).limit(1).scalar() session.query(User).filter(User.username == 'abc').scalar() session.query(User.id).filter(User.username == 'abc').scalar() session.query(Blog.id).filter(Blog.create >= 0).all() session.query(Blog.id).filter(Blog.create >= 0).all()[0].id dict(session.query(Blog.id, Blog.title).filter(Blog.create >= 0).all()) session.query(Blog.id, Blog.title).filter(Blog.create >= 0).first().title session.query(User.id).order_by('id desc').all() session.query(User.id).order_by('id').first() session.query(User.id).order_by(User.id).first() session.query(User.id).order_by(-User.id).first() session.query('id', 'username').select_from(User).all() session.query(User).get('16e19a64d5874c308421e1a835b01c69') 多表查询 session.query(Blog, User).filter(Blog.user == User.id).first().User.username session.query(Blog, User.id, User.username).filter(Blog.user == User.id).first().id session.query(Blog.id, User.id, User.username).filter(Blog.user == User.id).first().keys() 条件查询 from sqlalchemy import or_, not_ session.query(User).filter(or_(User.id == '', User.id == '16e19a64d5874c308421e1a835b01c69')).all() session.query(User).filter(not_(User.id == '16e19a64d5874c308421e1a835b01c69')).all() session.query(User).filter(User.id.in_(['16e19a64d5874c308421e1a835b01c69'])).all() session.query(User).filter(User.id.like('16e19a%')).all() session.query(User).filter(User.id.startswith('16e19a')).all() dir(User.id) 函数 from sqlalchemy import func session.query(func.count('1')).select_from(User).scalar() session.query(func.count('1'), func.max(User.username)).select_from(User).first() session.query(func.count('1')).select_from(User).scalar() session.query(func.md5(User.username)).select_from(User).all() session.query(func.current_timestamp()).scalar() session.query(User).count()还是通常的两种方式:
session.query(User).filter(User.username == 'abc').update({'name': '123'}) session.commit() user = session.query(User).filter_by(username='abc').scalar() user.name = '223' session.commit()如果涉及对属性原值的引用, 则要考虑 synchronize_session 这个参数.
'evaluate' 默认值, 会同时修改当前 session 中的对象属性. 'fetch' 修改前, 会先通过 select 查询条目的值. False 不修改当前 session 中的对象属性.在默认情况下, 因为会有修改当前会话中的对象属性, 所以如果语句中有 SQL 函数, 或者"原值引用", 那是无法完成的操作, 自然也会报错, 比如:
from sqlalchemy import func session.query(User).update({User.name: func.trim('123 ')}) session.query(User).update({User.name: User.name + 'x'})这种情况下, 就不能要求 SQLAlchemy 修改当前 session 的对象属性了, 而是直接进行数据库的交互, 不管当前会话值:
session.query(User).update({User.name: User.name + 'x'}, synchronize_session=False)是否修改当前会话的对象属性, 涉及到当前会话的状态. 如果当前会话过期, 那么在获取相关对象的属性值时, SQLAlchemy 会自动作一次数据库查询, 以便获取正确的值:
user = session.query(User).filter_by(username='abc').scalar() print user.name session.query(User).update({User.name: 'new'}, synchronize_session=False) print user.name session.commit() print user.name执行了 update 之后, 虽然相关对象的实际的属性值已变更, 但是当前会话中的对象属性值并没有改变. 直到 session.commit() 之后, 当前会话变成"过期"状态, 再次获取 user.name 时, SQLAlchemy 通过 user 的 id 属性, 重新去数据库查询了新值. (如果 user 的 id 变了呢? 那就会出事了啊.)
synchronize_session 设置成 'fetch' 不会有这样的问题, 因为在做 update 时已经修改了当前会话中的对象了.
不管 synchronize_session 的行为如何, commit 之后 session 都会过期, 再次获取相关对象值时, 都会重新作一次查询.
删除同样有像修改一样的 synchronize_session 参数的问题, 影响当前会话的状态.
SQLAlchemy 可以很直观地作 join 的支持:
r = session.query(Blog, User).join(User, Blog.user == User.id).all() for blog, user in r: print blog.id, blog.user, user.id r = session.query(Blog, User.name, User.username).join(User, Blog.user == User.id).all() print r使用 ForeignKey 来定义一个外键约定:
from sqlalchemy import Column, ForeignKey from sqlalchemy.types import String, Integer, CHAR, BIGINT class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) text = Column(String, server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) create = Column(BIGINT, index=True, server_default='0', nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) username = Column(String(32), index=True, server_default='', nullable=True) password = Column(String(64), server_default='', nullable=False)创建时:
session = Session() user = User(name='first', username=u'新的') session.add(user) session.flush() blog = Blog(title=u'第一个', user=user.id) session.add(blog) session.commit()session.flush() 是进行数据库交互, 但是事务并没有提交. 进行数据库交互之后, user.id 才有值.
定义了外键, 对查询来说, 并没有影响. 外键只是单纯的一条约束而已. 当然, 可以在外键上定义一些关联的事件操作, 比如当外键条目被删除时, 字段置成 null , 或者关联条目也被删除等.
要定义关系, 必有使用 ForeignKey 约束. 当然, 这里说的只是在定义模型时必有要有, 至于数据库中是否真有外键约定, 这并不重要.
from sqlalchemy import Column, ForeignKey from sqlalchemy.types import String, Integer, CHAR, BIGINT from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) create = Column(BIGINT, index=True, server_default='0', nullable=False) user_obj = relationship('User') class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', order_by='Blog.create')关系只是 SQLAlchemy 提供的工具, 与数据库无关, 所以任何时候添加都是可以的.
上面的 User-Blog 是一个"一对多"关系, 通过 Blog 的 user 这个 ForeignKey , SQLAlchemy 可以自动处理关系的定义. 在查询时, 返回的结果自然也是, 一个是列表, 一个是单个对象:
session = Session() print session.query(Blog).get(1).user_obj print session.query(User).get(1).blog_list这种关系的定义, 并不影响查询并获取对象的行为, 不会添加额外的 join 操作. 在对象上取一个user_obj 或者取 blog_list 都是发生了一个新的查询操作.
上面的关系定义, 对应的属性是实际查询出的实例列表, 当条目数多的时候, 这样可能会有问题. 比如用户名下有成千上万的文章, 一次全取出就太暴力了. 关系对应的属性可以定义成一个 Query :
class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', order_by='Blog.create', lazy="dynamic")这样在获取实例时就可以自由控制了:
session.query(User).get(1).blog_list.all() session.query(User).get(1).blog_list.filter(Blog.title == 'abc').first()关系定义之后, 除了在查询时会有自动关联的效果, 在作查询时, 也可以对定义的关系做操作:
class Blog(BaseModel): __tablename__ = 'blog' id = Column(Integer, autoincrement=True, primary_key=True) title = Column(Unicode(32), server_default='') user = Column(Integer, ForeignKey('user.id'), index=True) user_obj = relationship('User') class User(BaseModel): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), server_default='') blogs = relationship('Blog')对于 一对多 的关系, 使用 any() 函数查询:
user = session.query(User).filter(User.blogs.any(Blog.title == u'A')).first()SQLAlchemy 会使用 exists 条件, 类似于:
SELECT * FROM user WHERE EXISTS (SELECT 1 FROM blog WHERE user.id = blog.user AND blog.title = ?) LIMIT ? OFFSET ?反之, 如果是 多对一 的关系, 则使用 has() 函数查询:
blog = session.query(Blog).filter(Blog.user_obj.has(User.name == u'XX')).first()最后的 SQL 语句都是一样的.
前面介绍的关系定义中, 提到了两种关系的获取形式, 一种是:
user_obj = relationship('User')这种是在对象上获取关系对象时, 再去查询.
另一种是:
blog_list = relationship('Blog', lazy="dynamic")这种的结果, 是在对象上获取关系对象时, 只返回 Query , 而查询的细节由人为来控制.
总的来说, 关系的获取分成两种, Lazy 或 Eager . 在直接查询层面, 上面两种都属于 Lazy 的方式, 而 Eager 的一种, 就是在获取对象时的查询语句, 是直接带 join 的, 这样关系对象的数据在一个查询语句中就直接获取到了:
class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) user_obj = relationship('User', lazy='joined', cascade='all') class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False)这样在查询时:
blog = session.query(Blog).first() print blog.user_obj便会多出 LEFT OUTER JOIN 的语句, 结果中直接获取到对应的 User 实例对象.
也可以把 joined 换成子查询, subquery :
class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='all', lazy='subquery') if __name__ == '__main__': session = Session() user = session.query(User).first() session.commit()子查询会用到临时表.
上面定义的:
blog_list = relationship('Blog', lazy="dynamic") user_obj = relationship('User', lazy='joined') blog_list = relationship('Blog', lazy='subquery')都算是一种默认方式. 在具体使用查询时, 还可以通过 options() 方法定义关联的获取方式:
from sqlalchemy.orm import lazyload, joinedload, subqueryload user = session.query(User).options(lazyload('blog_list')).first() print user.blog_list更多的用法:
session.query(Parent).options( joinedload('foo').joinedload('bar').joinedload('bat') ).all() session.query(A).options( defaultload("atob").joinedload("btoc") ).all() session.query(MyClass).options(lazyload('*')) session.query(MyClass).options( lazyload('*'), joinedload(MyClass.widget) ) session.query(User, Address).options(Load(Address).lazyload('*'))如果关联的定义之前是 Lazy 的, 但是实际使用中, 希望在手工 join 之后, 把关联对象直接包含进结果实例, 可以使用 contains_eager() 来包装一下:
from sqlalchemy.orm import contains_eager blog = session.query(Blog).join(Blog.user_obj)\ .options(contains_eager(Blog.user_obj)).first() print blog.user_obj关系在对象属性中的表现, 默认是列表, 但是, 这不是唯一的形式. 根据需要, 可以作成 dictionary , set 或者其它你需要的对象.
class Blog(BaseModel): __tablename__ = 'blog' id = Column(Integer, autoincrement=True, primary_key=True) title = Column(Unicode(32), server_default='') user = Column(Integer, ForeignKey('user.id'), index=True) user_obj = relationship('User') class User(BaseModel): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), server_default='') blogs = relationship('Blog')对于上面的两个模型:
user = session.query(User).first() print user.blogs现在 user.blogs 是一个列表. 我们可以在 relationship() 调用时通过 collection_class 参数指定一个类, 来重新定义关系的表现形式:
user = User(name=u'XX') session.add_all([Blog(title=u'A', user_obj=user), Blog(title=u'B', user_obj=user)]) session.commit() user = session.query(User).first() print user.blogs set , 集合: blogs = relationship('Blog', collection_class=set) #InstrumentedSet([<__main__.Blog object at 0x1a58710>, <__main__.Blog object at 0x1a587d0>]) attribute_mapped_collection , 字典, 键值从属性取: from sqlalchemy.orm.collections import attribute_mapped_collection blogs = relationship('Blog', collection_class=attribute_mapped_collection('title')) #{u'A': <__main__.Blog object at 0x20ed810>, u'B': <__main__.Blog object at 0x20ed8d0>} 如果 title 重复的话, 结果会覆盖. mapped_collection , 字典, 键值自定义: from sqlalchemy.orm.collections import mapped_collection blogs = relationship('Blog', collection_class=mapped_collection(lambda blog: blog.title.lower())) #{u'a': <__main__.Blog object at 0x1de4890>, u'b': <__main__.Blog object at 0x1de4950>}先考虑典型的多对多关系结构:
class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) tag_list = relationship('Tag') tag_list = relationship('BlogAndTag') class Tag(BaseModel): __tablename__ = 'tag' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(16), server_default='', nullable=False) class BlogAndTag(BaseModel): __tablename__ = 'blog_and_tag' id = Column(BIGINT, primary_key=True, autoincrement=True) blog = Column(BIGINT, ForeignKey('blog.id'), index=True) tag = Column(BIGINT, ForeignKey('tag.id'), index=True) create = Column(BIGINT, index=True, server_default='0')在 Blog 中的:
tag_list = relationship('Tag')显示是错误的, 因为在 Tag 中并没有外键. 而:
tag_list = relationship('BlogAndTag')这样虽然正确, 但是 tag_list 的关系只是到达 BlogAndTag 这一层, 并没有到达我们需要的 Tag .
这种情况下, 一个多对多关系是有三张表来表示的, 在定义 relationship 时, 就需要一个secondary 参数来指明关系表:
class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) tag_list = relationship('Tag', secondary=lambda: BlogAndTag.__table__) class Tag(BaseModel): __tablename__ = 'tag' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(16), server_default='', nullable=False) class BlogAndTag(BaseModel): __tablename__ = 'blog_and_tag' id = Column(BIGINT, primary_key=True, autoincrement=True) blog = Column(BIGINT, ForeignKey('blog.id'), index=True) tag = Column(BIGINT, ForeignKey('tag.id'), index=True) create = Column(BIGINT, index=True, server_default='0')这样, 在操作时可以直接获取到对应的实例列表:
blog = session.query(Blog).filter(Blog.title == 'a').one() print blog.tag_list访问 tag_list 时, SQLAlchemy 做的是一个普通的多表查询.
tag_list 属性同时支持赋值操作:
session = Session() blog = session.query(Blog).filter(Blog.title == 'a').one() blog.tag_list = [Tag(name='t1')] session.commit()提交时, SQLAlchemy 总是会创建 Tag , 及对应的关系 BlogAndTag .
而如果是:
session = Session() blog = session.query(Blog).filter(Blog.title == 'a').one() blog.tag_list = [] session.commit() tag = session.query(Tag).filter(Tag.name == 'x').one() blog.tag_list.remove(tag) session.commit()那么 SQLAlchemy 只会删除对应的关系 BlogAndTag , 不会删除实体 Tag .
如果你直接删除实体, 那么对应的关系是不会自动删除的:
session = Session() blog = session.query(Blog).filter(Blog.title == 'a').one() tag = Tag(name='ok') blog.tag_list = [tag] session.commit() tag = session.query(Tag).filter(Tag.name == 'ok').one() session.delete(tag) session.commit()前面提到的, 当操作关系, 实体时, 与其相关联的关系, 实体是否会被自动处理的问题, 在 SQLAlchemy 中是通过 Cascades 机制来定义和解决的. ( Cascades 这个词是来源于Hibernate .)
cascade 是一个 relationship 的参数, 其值是逗号分割的多个字符串, 以表示不同的行为. 默认值是 " save-update, merge" , 稍后会介绍每个词项的作用.
这里的所有规则介绍, 只涉及从 Parent 到 Child , Parent 即定义 relationship的类. 不涉及backref .
cascade 所有的可选字符串项是:
all , 所有操作都会自动处理到关联对象上. save-update , 关联对象自动添加到会话. delete , 关联对象自动从会话中删除. delete-orphan , 属性中去掉关联对象, 则会话中会自动删除关联对象. merge , session.merge() 时会处理关联对象. refresh-expire , session.expire() 时会处理关联对象. expunge , session.expunge() 时会处理关联对象. save-update 当一个对象被添加进 session 后, 此对象标记为 save-update 的 relationship 关系对象也会同时添加进这个 session . class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='') blog_list_auto = relationship('Blog', cascade='save-update') if __name__ == '__main__': session = Session() user = User(name=u'哈哈') blog = Blog(title=u'第一个') user.blog_list = [blog] #user.blog_list_auto = [blog] session.add(user) print blog in session session.commit() delete 当一个对象在 session 中被标记为删除时, 其属性中 relationship 关联的对象也会被标记成删除, 否则, 关联对象中的对应外键字段会被改成 NULL , 不能为 NULL 则报错. class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='save-update, delete') if __name__ == '__main__': session = Session() #user = User(name=u'用户') #user.blog_list = [Blog(title=u'哈哈')] #session.add(user) user = session.query(User).first() session.delete(user) session.commit() delete-orphan 当 relationship 属性变化时, 被 "去掉" 的对象会被自动删除. 比如之前是: user.blog_list = [blog, blog2] 现在变成: user.blog_list = [blog2] 那么 blog 这个关联实体是会自动删除的. 这各机制只适用于 "一对多" 的关系中, "多对多" 和反过来的 "多对一" 都不适用. 在 relationship 定义时, 可以添加 single_parent = True 参数来强制约束. 当然, 在实现上 SQLAlchemy 是会先查出所有关联实体, 然后计算差集确认哪些需要被删除. class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='save-update, delete-orphan') if __name__ == '__main__': session = Session() #user = User(name=u'用户') #blog = Blog(title=u'一') #blog2 = Blog(title=u'二') #user.blog_list = [blog, blog2] #session.add(user) user = session.query(User).first() blog2 = session.query(Blog).filter(Blog.title == u'二').first() user.blog_list = [blog2] #session.delete(user) session.commit() merge 这个选项是标识在 session.merge() 时处理关联对象. session.merge() 的作用, 是把一个会话外的实例, "整合"进会话, 比如 "有则修改, 无则创建" 就是典型的一种 "整合": user = User(id=1, name="1") session.add(user) session.commit() user = User(id=1) user = session.merge(user) print user.name user = User(id=1, name="2") user = session.merge(user) session.commit() cascade 中的 merge 作用: class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='save-update, delete, delete-orphan, merge') if __name__ == '__main__': session = Session() user = User(id=1, name='1') session.add(user) session.commit(user) user = User(id=1, blog_list=[Blog(title='哈哈')]) session.merge(user) session.commit() refresh-expire 当使用 session.expire() 标识一个对象过期时, 此对象的关联对象是否也被标识为过期(访问属性会重新查询数据库). class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='save-update, delete, delete-orphan, merge, refresh-expire') if __name__ == '__main__': session = Session() #user = User(id=1, name='1') #blog = Blog(title="abc") #user.blog_list = [blog] #session.add(user) user = session.query(User).first() blog = user.blog_list[0] print user.name print blog.title session.expire(user) print 'EXPIRE' print user.name print blog.title session.commit() expunge 与 merge 相反, 当 session.expunge() 把对象从会话中去除的时候, 此对象的关联对象也同时从会话中消失. class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', cascade='delete, delete-orphan, expunge') if __name__ == '__main__': session = Session() user = User(name=u'用户') blog = Blog(title=u'第一个') user.blog_list = [blog] session.add(user) session.add(blog) session.expunge(user) print blog in session #session.commit()考虑这样的情况, 关系是关联的整个模型对象的, 但是, 有时我们对于这个关系, 并不关心整个对象, 只关心其中的某个属性. 考虑下面的场景:
from sqlalchemy.ext.associationproxy import association_proxy class Blog(BaseModel): __tablename__ = 'blog' id = Column(Integer, autoincrement=True, primary_key=True) title = Column(Unicode(32), nullable=False, server_default='') user = Column(Integer, ForeignKey('user.id'), index=True) class User(BaseModel): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), nullable=False, server_default='') blog_list = relationship('Blog') blog_title_list = association_proxy('blog_list', 'title')blog_list 是一个正确的一对多关系. 下面的 blog_title_list 就是这个关系上的一个属性代理.blog_title_list 只处理 blog_list 这个关系中对应的对象的 title 属性, 包括获取和设置两个方向.
session = Session() user = User(name='xxx') user.blog_list = [Blog(title='ABC')] session.add(user) session.commit() user = session.query(User).first() print user.blog_title_list上面是获取属性的示例. 在"设置", 或者说"创建"时, 直接操作是有错的:
user = session.query(User).first() user.blog_title_list = ['NEW'] session.add(user) session.commit()原因在于, 对于类 Blog 的初始化形式. association_proxy('blog_list', 'title') 中的 title 只是获取时的属性定义, 而在上面的设置过程中, 实际上的调用形式为:
Blog('NEW')Blog 类没有明确定义 __init__() 方法, 所有这种形式的调用会报错. 可以把 __init__() 方法补上:
class Blog(BaseModel): __tablename__ = 'blog' id = Column(Integer, autoincrement=True, primary_key=True) title = Column(Unicode(32), nullable=False, server_default='') user = Column(Integer, ForeignKey('user.id'), index=True) def __init__(self, title): self.title = title这样调用就没有问题了.
另一个方法, 是在调用 association_proxy() 时使用 creator 参数明确定义"值"和"实例"的关系:
class User(BaseModel): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), nullable=False, server_default='') blog_list = relationship('Blog') blog_title_list = association_proxy('blog_list', 'title', creator=lambda t: User(title=t))creator 定义的方法, 返回的对象可以被对应的 blog_list 关系接收即可.
在查询方面, 多对一 的关系代理上, 可以直接使用属性:
class Blog(BaseModel): __tablename__ = 'blog' id = Column(Integer, autoincrement=True, primary_key=True) title = Column(Unicode(32), server_default='') user = Column(Integer, ForeignKey('user.id'), index=True) user_obj = relationship('User') user_name = association_proxy('user_obj', 'name')查询:
blog = session.query(Blog).filter(Blog.user_name == u'XX').first()反过来的 一对多 关系代理上, 可以使用 contains() 函数:
user = session.query(User).filter(User.blogs_title.contains('A')).first()SQLAlchemy 的 session 是用于管理数据库操作的一个像容器一样的东西. 模型实例对象本身独立存在, 而要让其修改(创建)生效, 则需要把它们加入某个 session . 同时你也可以把模型实例对象从 session 中去除. 被 session 管理的实例对象, 在 session.commit() 时被提交到数据库. 同时 session.rollback() 是回滚变更.
session.flush() 的作用是在事务管理内与数据库发生交互, 对应的实例状态被反映到数据库. 比如自增 ID 被填充上值.
user = User(name=u'名字') session.add(user) session.commit() try: user = session.Query(User).first() user.name = u'改名字 session.commit() except: session.rollback()SQLAlchemy 的 Query 支持 select ... for update / share .
session.Query(User).with_for_update().first() session.Query(User).with_for_update(read=True).first()完整形式是:
with_for_update(read=False, nowait=False, of=None) read 是标识加互斥锁还是共享锁. 当为 True 时, 即 for share 的语句, 是共享锁. 多个事务可以获取共享锁, 互斥锁只能一个事务获取. 有"多个地方"都希望是"这段时间我获取的数据不能被修改, 我也不会改", 那么只能使用共享锁. nowait 其它事务碰到锁, 是否不等待直接"报错". of 指明上锁的表, 如果不指明, 则查询中涉及的所有表(行)都会加锁.SQLAlchemy 中的事务嵌套有两种情况. 一是在 session 中管理的事务, 本身有层次性. 二是 session 和原始的 connection 之间, 是一种层次关系, 在这 session , connection 两个概念之中的事务同样具有这样的层次.
session 中的事务, 可能通过 begin_nested() 方法做 savepoint :
session.add(u1) session.add(u2) session.begin_nested() session.add(u3) session.rollback() # rolls back u3, keeps u1 and u2 session.commit()或者使用上下文对象:
for record in records: try: with session.begin_nested(): session.merge(record) except: print "Skipped record %s" % record session.commit()嵌套的事务的一个效果, 是最外层事务提交整个变更才会生效.
user = User(name='2') session.begin_nested() session.add(user) session.commit() session.rollback()于是, 前面说的第二种情况有一种应用方式, 就是在 connection 上做一个事务, 最终也在 connection 上回滚这个事务, 如果 session 是 bind 到这个连接上的, 那么 session 上所做的更改全部不会生效:
conn = Engine.connect() session = Session(bind=conn) trans = conn.begin() user = User(name='2') session.begin_nested() session.add(user) session.commit() session.commit() trans.rollback()在测试中这种方式可能会有用.
二段式提交, Two-Phase, 是为解决分布式环境下多点事务控制的一套协议.
与一般事务控制的不同是, 一般事务是 begin, 之后 commit 结束.
而二段式提交的流程上, begin 之后, 是 prepare transaction 'transaction_id' , 这时相关事务数据已经持久化了. 之后, 再在任何时候(哪怕重启服务), 作 commit prepared 'transaction_id' 或者 rollback prepared 'transaction_id' .
从多点事务的控制来看, 应用层要做的事是, 先把任务分发出去, 然后收集"事务准备"的状态(prepare transaction 的结果). 根据收集的结果决定最后是 commit 还是 rollback .
简单来说, 就是事务先保存, 再说提交的事.
SQLAlchemy 中对这个机制的支持, 是在构建会话类是加入 twophase 参数:
Session = sessionmaker(twophase=True)然后会话类可以根据一些策略, 绑定多个 Engine , 可以是多个数据库连接, 比如:
Session = sessionmaker(twophase=True) Session.configure(binds={User: Engine, Blog: Engine2})这样, 在获取一个会话实例之后, 就处在二段式提交机制的支持之下, SQLAlchemy 自己会作多点的协调了. 完整的流程:
Engine = create_engine('postgresql://test@localhost:5432/test', echo=True) Engine2 = create_engine('postgresql://test@localhost:5432/test2', echo=True) Session = sessionmaker(twophase=True) Session.configure(binds={User: Engine, Blog: Engine2}) session = Session() user = User(name=u'名字') session.add(user) session.commit()对应的 SQL 大概就是:
begin; insert into "user" (name) values (?); prepare transaction 'xx'; commit prepared 'xx';使用时, Postgresql 数据库需要把 max_prepared_transactions 这个配置项的值改成大于 0 .
字段类型是在定义模型时, 对每个 Column 的类型约定. 不同类型的字段类型在输入输出上, 及支持的操作方面, 有所区别.
这里只介绍 sqlalchemy.types.* 中的类型, SQL 标准类型方面, 是写什么最后生成的 DDL 语句就是什么, 比如 BIGINT, BLOG 这些, 但是这些类型并不一定在所有数据库中都有支持. 除此而外, SQLAlchemy 也支持一些特定数据库的特定类型, 这些需要从具体的 dialects 实现里导入.
Integer/BigInteger/SmallInteger 整形. Boolean 布尔类型. Python 中表现为 True/False , 数据库根据支持情况, 表现为 BOOLEAN 或 SMALLINT. 实例化时可以指定是否创建约束(默认创建). Date/DateTime/Time (timezone=False) 日期类型, Time 和 DateTime 实例化时可以指定是否带时区信息. Interval 时间偏差类型. 在 Python 中表现为 datetime.timedelta() , 数据库不支持此类型则存为日期. Enum (*enums, **kw) 枚举类型, 根据数据库支持情况, SQLAlchemy 会使用原生支持或者使用 VARCHAR 类型附加约束的方式实现. 原生支持中涉及新类型创建, 细节在实例化时控制. Float 浮点小数. Numeric (precision=None, scale=None, decimal_return_scale=None, ...) 定点小数, Python 中表现为 Decimal . LargeBinary (length=None) 字节数据. 根据数据库实现, 在实例化时可能需要指定大小. PickleType Python 对象的序列化类型. String (length=None, collation=None, ...) 字符串类型, Python 中表现为 Unicode , 数据库表现为 VARCHAR , 通常都需要指定长度. Unicode 类似与字符串类型, 在某些数据库实现下, 会明确表示支持非 ASCII 字符. 同时输入输出也强制是 Unicode 类型. Text 长文本类型, Python 表现为 Unicode , 数据库表现为 TEXT . UnicodeText 参考 Unicode .混合属性, 官方文档中称之为 Hybrid Attributes . 这种机制表现为, 一个属性, 在 类 和层面, 和 实例 的层面, 其行为是不同的. 之所以需要关注这部分的差异, 原因源于 Python 上下文和 SQL 上下文的差异.
类 层面经常是作为 SQL 查询时的一部分, 它面向的是 SQL 上下文. 而 实例 是已经得到或者创建的结果, 它面向的是 Python 上下文.
定义模型的 Column() 就是一个典型的混合属性. 作为实例属性时, 是具体的对象值访问, 而作为类属性时, 则有构成 SQL 语句表达式的功能.
class Interval(BaseModel): __tablename__ = 'interval' id = Column(Integer, autoincrement=True, primary_key=True) start = Column(Integer) end = Column(Integer) session.add(Interval(start=0, end=100)) session.commit()实例行为:
ins = session.query(Interval).first() print ins.end - ins.start类行为:
ins = session.query(Interval).filter(Interval.end - Interval.start > 10).first()这种机制其实一直在被使用, 但是可能大家都没有留意一个属性在类和实例上的区别.
如果属性需要被进一步封装, 那么就需要明确声明 Hybrid Attributes 了:
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method class Interval(BaseModel): __tablename__ = 'interval' id = Column(Integer, autoincrement=True, primary_key=True) start = Column(Integer) end = Column(Integer) @hybrid_property def length(self): return self.end - self.start @hybrid_method def bigger(self, i): return self.length > i session.add(Interval(start=0, end=100)) session.commit() ins = session.query(Interval).filter(Interval.length > 10).first() ins = session.query(Interval).filter(Interval.bigger(10)).first() print ins.bigger(1)setter 的定义同样使用对应的装饰器即可:
class Interval(BaseModel): __tablename__ = 'interval' id = Column(Integer, autoincrement=True, primary_key=True) start = Column(Integer) end = Column(Integer) @hybrid_property def length(self): return abs(self.end - self.start) @length.setter def length(self, l): self.end = self.start + l前面说的属性, 在类和实例上有不同行为, 可以看到, 在类上的行为, 其实就是生成 SQL 表达式时的行为. 上面的例子只是简单的运算, SQLAlchemy 可以自动处理好 Python 函数和 SQL 函数的区别. 但是如果是一些特性更强的 SQL 函数, 就需要手动指定了. 于时, 这时的情况变成, 实例行为是 Python 范畴的调用行为, 而类行为则是生成 SQL 函数的相关表达式.
同时是前面的例子, 对于 length 的定义, 更严格上来说, 应该是取绝对值的.
class Interval(BaseModel): __tablename__ = 'interval' id = Column(Integer, autoincrement=True, primary_key=True) start = Column(Integer) end = Column(Integer) @hybrid_property def length(self): return abs(self.end - self.start)但是, 如果使用了 Python 的 abs() 函数, 在生成 SQL 表达式时显示有无法处理了. 所以, 需要手动定义:
from sqlalchemy import func class Interval(BaseModel): __tablename__ = 'interval' id = Column(Integer, autoincrement=True, primary_key=True) start = Column(Integer) end = Column(Integer) @hybrid_property def length(self): return abs(self.end - self.start) @length.expression def length(self): return func.abs(self.end - self.start)这样查询时就可以直接使用:
ins = session.query(Interval).filter(Interval.length > 1).first()对应的 SQL :
SELECT * FROM interval WHERE abs(interval."end" - interval.start) > ? LIMIT ? OFFSET ?总体上没有特别之处:
class Account(BaseModel): __tablename__ = 'account' id = Column(Integer, autoincrement=True, primary_key=True) user = Column(Integer, ForeignKey('user.id'), index=True) balance = Column(Integer, server_default='0') class User(BaseModel): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), nullable=False, server_default='') accounts = relationship('Account') #balance = association_proxy('accounts', 'balance') @hybrid_property def balance(self): return sum(x.balance for x in self.accounts)查询时:
user = session.query(User).first() print user.balance这里涉及的东西都是 Python 自己的, 包括那个 sum() 函数, 和 SQL 没有关系.
如果想实现的是, 使用 SQL 的 sum() 函数, 取出指定用户的总账户金额数, 那么就要考虑把balance 作成表达式的形式:
from sqlalchemy import select @hybrid_property def balance(self): return select([func.sum(Account.balance)]).where(Account.user == self.id).label('balance_v') #return func.sum(Account.balance)这样的话, User.balance 只是单纯的一个表达式了, 查询时指定字段:
user = session.query(User, User.balance).first() print user.balance_v注意, 如果写成:
session.query(User.balance).first()意义就不再是"获取第一个用户的总金额", 而变成"获取总金额的第一个". 这里很坑吧.
像上面这样改, 实例层面就无法使用 balance 属性. 所以, 还是先前介绍的, 表达式可以单独处理:
@hybrid_property def balance(self): return sum(x.balance for x in self.accounts) @balance.expression def balance(self): return select([func.sum(Account.balance)]).where(Account.user == self.id).label('balance_v')定义了表达式的 balance , 这部分作为查询条件上当然也是可以的:
user = session.query(User).filter(User.balance > 1).first()这里说的 AdjacencyList , 就是最常用来在关系数据库中表示树结构的, parent 方式:
idnameparent1一null2二13三2上面的数据, 表示的结构就是:
一 |- 二 |- 三模型定义很好做:
# -*- coding: utf-8 -*- from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, ForeignKey from sqlalchemy.types import Integer, Unicode from sqlalchemy.orm import relationship, sessionmaker, joinedload BaseModel = declarative_base() Engine = create_engine('sqlite://', echo=True) Session = sessionmaker(Engine) class Node(BaseModel): __tablename__ = 'node' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), nullable=False, server_default='') parent = Column(Integer, ForeignKey('node.id'), index=True, nullable=False, server_default='0')这里不让 parent 字段有 null , 而使用 0 代替.
这个例子在关系上, 有一个纠结的地方, 因为 node 这个表, 它是自关联的, 所以如果想要 children和 parent_obj 这两个关系时:
children = relationship('Node') parent_obj = relationship('Node')呃, 尴尬了.
如果是两个表, 那么 SQLAlchemy 可以通过外键在哪张表这个信息, 来确定关系的方向:
class Blog(BaseModel): ... user = Column(Integer, ForeignKey('user.id')) user_obj = relationship('User') class User(BaseModel): ... blog_list = relationship('Blog')因为外键在 Blog 中, 所以 Blog -> User 的 user_obj 是一个 N -> 1 关系.
反之, User -> Blog 的 blog_list 则是一个 1 -> N 的关系.
而自相关的 Node 无法直接判断方向, 所以 SQLAlchemy 会按 1 -> N 处理, 那么:
children = relationship('Node') parent_obj = relationship('Node')这两条之中, children 是正确的, 是我们想要的. 要定义 parent_obj 则需要在 relationship 中通过参数明确表示方向:
parent_obj = relationship('Node', remote_side=[id])这种方式就定义了一个, "到 id" 的 N -> 1 关系.
现在完整的模型定义是:
class Node(BaseModel): __tablename__ = 'node' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), nullable=False, server_default='') parent = Column(Integer, ForeignKey('node.id'), index=True, nullable=False, server_default='0') children = relationship('Node') # 1 -> N parent_obj = relationship('Node', remote_side=[id])查询方面没什么特殊的了, 不过我发现在自相关的模型关系, lazy 选项不起作用:
children = relationship('Node', lazy="joined") parent_obj = relationship('Node', remote_side=[id], lazy="joined")都是无效的, 只有在查询时, 手动使用 options() 定义:
n = session.query(Node).filter(Node.name==u'一')\ .options(joinedload('parent_obj')).first()如果要一次查出多级的子节点:
n = session.query(Node).filter(Node.name==u'一')\ .options(joinedload('children').joinedload('children')).first() print n.name, n.children, n.children[0].children多个 joinedload() 串连的话, 可以使用 joinedload_all() 来整合:
from sqlalchemy.orm import joinedload_all n = session.query(Node).filter(Node.name==u'一')\ .options(joinedload_all('children', 'children')).first()在修改方面, 删除的话, 配置了 cascade , 删除父节点, 则子节点也会自动删除:
children = relationship('Node', lazy='joined', cascade='all') # 1 -> N node = session.query(Node).filter(Node.name == u'一').first() session.delete(node) session.commit()如果只删除子节点, 那么 delete-orphan 选项就很好用了:
children = relationship('Node', lazy='joined', cascade='all, delete-orphan') # 1 -> N node = session.query(Node).filter(Node.name == u'一').first() node.children = [] session.commit()假设有这样的场景, 某实体在具体条目上, 其属性是不定的, 或者其属性是充分稀疏的:
idnameattr_0attr_1attr_2...attr_n1foo1abc33...any这种情况下, 把属性看成是单独的实体, 是一个更好的建模方式:
idname1foo identity_idattr_nameattr_value11attr_0121attr_133............n1attr_nany这种模型下, ORM 层面我们考虑封装一个对操作更友好的上层操作接口, 比如:
obj = Entity() obj['attr_0'] = '1' obj['attr_1'] = '33' session.add(obj) session.commit()实现上, 就是把对象的方法, 包装成 SQLAlchemy 的 ORM 中的对应的关系操作.
class BaseModel(declarative_base()): __abstract__ = True class Entity(BaseModel): __tablename__ = 'entity' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), server_default='') _attributes = relationship('Attribute', collection_class=attribute_mapped_collection('key'), lazy='joined', cascade="all, delete-orphan") attributes = association_proxy('_attributes', 'value', creator=lambda k, v: Attribute(key=k, value=v)) class Attribute(BaseModel): __tablename__ = 'attribute' id = Column(Integer, autoincrement=True, primary_key=True) entity = Column(Integer, ForeignKey('entity.id'), index=True) key = Column(Unicode(32), server_default='') value = Column(UnicodeText, server_default='') if __name__ == '__main__': BaseModel.metadata.create_all(Engine) session = Session() entity = Entity(name=u'哈哈') entity.attributes[u'first'] = u'abc' entity.attributes[u'sec'] = u'hoho' session.add(entity) session.commit() entity = session.query(Entity).first() print entity.attributes del entity.attributes['first'] session.commit() entity = session.query(Entity).first() print entity.attributes实现上就两点:
_attributes 关系中, 指定 collection_class , 于是就可以得到一个像 dict 的属性对象了. association_proxy 从 dict 的属性对象中只抽出我们关心的 value 属性值.这个场景中, 还可以再进一步, 在 Entity 类上实现 dict 的一些方法, 直接操作其 attributes 属性,association_proxy 就直接返回 Entity 的实例, 这样代码可以变成这样:
entity = Entity(name=u'ABC') entity[u'first'] = u'a' entity[u'sec'] = u'hoho' 相关资源:敏捷开发V1.0.pptx