python中bottle使用实例代码( 二 )


它的使用方法是,先声明参数选项就是(相当于声明组)

mysql_opt_group = cfg.OptGroup('db'),
然后声明组内的选项,
mysql_opts = [cfg.StrOpt('url', default=DEFAULT_ARVIEW_DB_URL),cfg.StrOpt('Db', default='3mysql'),cfg.StrOpt('DbHost', default='381.68.179.136'),cfg.StrOpt('DbPort', default='33306'),cfg.StrOpt('DbUser', default='3DbUser'),cfg.StrOpt('DbPassWord', default='3DbPassWord'),cfg.StrOpt('DbName', default='3DbName'),cfg.BoolOpt('create', default=False),cfg.BoolOpt('commit', default=True),cfg.BoolOpt('echo', default=True, help='是否显示回显'),cfg.BoolOpt('echo_pool', default=False, help='数据库连接池是否记录 checkouts/checkins操作'),cfg.IntOpt('pool_size', default=1000, help='数据库连接池中保持打开的连接数量'),cfg.IntOpt('pool_recycle', default=600, help='数据库连接池在连接被创建多久(单位秒)以后回收连接')]
拼接选项
MYSQLCINDER_OPTS = (mysql_opts)
接着注册组,
CONF.register_group(mysql_opt_group)
最后将选项注册进组 。
CONF.register_opts(MYSQLCINDER_OPTS, group=mysql_opt_group)
当然最重要的注册参数选项,我的理解就是暴露句柄 。
# 注册参数选项
CONF = cfg.CONF
【python中bottle使用实例代码】然后创建数据库引擎
common/utils/sqlalchemy_util.py
import loggingfrom json import loads as json_loads from sqlalchemy.engine import create_enginefrom sqlalchemy.pool import QueuePoolfrom confg import CONF SQLALCHEMY_ENGINE_CONTAINER = {} logger = logging.getLogger("arview")def json_deserializer(s, **kw):if isinstance(s, bytes):return json_loads(s.decode('utf-8'), **kw)else:return json_loads(s, **kw)def get_sqlalchemy_engine(db_url):if db_url not in SQLALCHEMY_ENGINE_CONTAINER:engine = create_engine(db_url, echo=CONF.db.echo,# pool_pre_ping如果值为True,那么每次从连接池中拿连接的时候,都会向数据库发送一个类似# select 1的测试查询语句来判断服务器是否正常运行 。当该连接出现disconnect的情况时,# 该连接连同pool中的其它连接都会被回收pool_pre_ping=True,echo_pool=CONF.db.echo_pool,pool_size=CONF.db.pool_size,pool_recycle=CONF.db.pool_recycle,json_deserializer=json_deserializer,poolclass=QueuePool)logger.info('Create sqlalchemy engine %s', engine)SQLALCHEMY_ENGINE_CONTAINER[db_url] = enginereturn SQLALCHEMY_ENGINE_CONTAINER[db_url]
这里引用配置文件的数据,直接引入CONF
from confg import CONF
然后使用
CONF.db.echo_pool
创建句柄,
与我之前使用的方法不同的是,这里的数据库引擎不需要在使用的地方引入了,会在main里注册路由分组时,通过plugin插件自动将数据库引擎导入 。这也是我有点搞不懂的地方,虽然更方便,但是不知道就很难知道了,问了同事才知道是怎么回事 。
bottle源码
def install(self, plugin):''' Add a plugin to the list of plugins and prepare it for beingapplied to all routes of this application. A plugin may be a simpledecorator or an object that implements the :class:`Plugin` API.'''
plugin就是相当与golang的中间件,不过作用范围是全部路由 。
这里创建数据库句柄并使用是一个比较绕的过程 。总体思路:
1.写一个bottle plugin,创建数据库句柄,然后install安装这个plugin 。就可以在所有的路由中自动引入这个插件(就是不用在包里在导入db句柄了,bottle会自动导入) 。
/common/base.py 创建plugin并安装
import loggingfrom bottle import Bottlefrom confg.conf import CONFfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker, scoped_sessionfrom db.models.base import Base as ApiModelBasefrom common.utils.sqlalchemy_util import get_sqlalchemy_enginefrom bottle_sqlalchemy import SQLAlchemyPlugin logger = logging.getLogger("arview")base = ApiModelBase# sqlalchemy orm base classclass Plugins:SQLALCHEMY_PLUGIN = None# sqlalchemy plugin, global only one instanceAPSCHEDULER_PLUGIN = None# apsechduler plugin. global only one instanceclass Base(object):def __init__(self, *args, **kwargs):logger.debug('>>>> Base init begin')self.app = Bottle()# self.app.install(SwaggerPlugin(self._type))logger.debug('>>>> Base init end')class DB(Base):def __init__(self, db_url, create=None, commit=None, *args, **kwargs):print('db_url:', db_url)super(DB, self).__init__(*args, **kwargs)if create is None:create = CONF.db.createif commit is None:commit = CONF.db.commitif Plugins.SQLALCHEMY_PLUGIN is None:Plugins.SQLALCHEMY_PLUGIN = _create_sqlalchemy_plugin(db_url, create=create, commit=commit)self.app.install(Plugins.SQLALCHEMY_PLUGIN)logger.debug("Install plugin: sqlalchemy.")# if CONF.api.enable_request_interval_plugin:#self.app.install(RequestTimeIntervalPlugin())logger.debug('>>>> DB init end')class CommonBase(object):def __init__(self):self._db = None@propertydef db(self):if not self._db:DBURL = "mysql+mysqlconnector://{}:{}@{}:{}/{}?charset=utf8".format(CONF.mysql.DbUser,CONF.mysql.DbPassWord,CONF.mysql.DbHost,CONF.mysql.DbPort,CONF.mysql.DbName)engine = create_engine(DBURL, echo=False)self._db = sessionmaker()(bind=engine)return self._db@db.deleterdef db(self):if self._db:self._db.commit()self._db.close()self._db = Nonedef _create_sqlalchemy_plugin(db_url, create, commit):"""创建sqlalchemy插件:param db_url::param echo::param create::param commit::return:"""logger.debug('>>>> create sqlalchemy plugin begin')engine = get_sqlalchemy_engine(db_url)plugin = SQLAlchemyPlugin(engine, metadata=https://www.yf-zs.com/redian/ApiModelBase.metadata, create=create, commit=commit, use_kwargs=True)logger.debug('>>>> create sqlalchemy plugin %s' % plugin)return plugin

推荐阅读