#!/usr/bin/env python # -*- coding:utf-8 -*- # author:SunXiuWen # make_time:2019/1/13 import logging import sys from time import sleep sys.path.append("..") import pymysql # 无法加载DButils包 from DBUtils.PooledDB import PooledDB # from dbutils.pooled_db import PooledDB """ @功能:创建数据库连接池 """ class DB(object): """docstring for DbConnection""" __pool = None def __init__(self, db_ip, db_port, db_name, db_user, db_password): self.db_ip = db_ip self.db_port = db_port self.db_name = db_name self.db_user = db_user self.db_password = db_password self.pool = self.__get_conn_pool() # 获取连接 def __get_conn_pool(self): if self.__pool is None: try: self.__pool = PooledDB(creator=pymysql, host=self.db_ip, port=self.db_port, user=self.db_user, passwd=self.db_password, db=self.db_name, use_unicode=True, charset='utf8') except Exception as e: logging.error("%s : %s" % (Exception, e)) return self.__pool # 获取连接 def _get_connection(self): try: conn = self.pool.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn, cursor except Exception as e: logging.error(" db.pool.connection ERROR: %s" % e) return None, None # 关闭连接 def _close_connection(self, conn, cursor): if cursor: cursor.close() if conn: conn.close() # 查询 def query_all_sql(self, sql, param=None): conn, cursor = self._get_connection() try: cursor.execute(sql, param) result = cursor.fetchall() self._close_connection(conn, cursor) except Exception as e: self._close_connection(conn, cursor) logging.error(str(e)) result = None return result def query_one_sql(self, sql, param=None): conn, cursor = self._get_connection() try: cursor.execute(sql, param) result = cursor.fetchone() self._close_connection(conn, cursor) except Exception as e: self._close_connection(conn, cursor) logging.error(str(e)) result = None return result # 执行 def execute_sql(self, sql, param=None): conn, cursor = self._get_connection() try: result = cursor.execute(sql, param) conn.commit() self._close_connection(conn, cursor) except Exception as e: conn.rollback() self._close_connection(conn, cursor) logging.error(str(e)) result = None return result # class MySqLHelper(object): # def __init__(self, db_ip, db_port, db_name, db_user, db_password): # self.db = MyConnectionPool(db_ip, db_port, db_name, db_user, db_password) # 从数据池中获取连接 # # # def __new__(cls, *args, **kwargs): # # if not hasattr(cls, 'inst'): # 单例 # # cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs) # # return cls.inst # # # 封装执行命令 # def execute(self, sql, param=None, autoclose=False): # """ # 【主要判断是否有参数和是否执行完就释放连接】 # :param sql: 字符串类型,sql语句 # :param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数 # :param autoclose: 是否关闭连接 # :return: 返回连接conn和游标cursor # """ # cursor, conn = self.db.getconn() # 从连接池获取连接 # count = 0 # try: # # count : 为改变的数据条数 # if param: # count = cursor.execute(sql, param) # else: # count = cursor.execute(sql) # conn.commit() # if autoclose: # self.close(cursor, conn) # except Exception as e: # print("db error_msg:", e.args) # return cursor, conn, count # # # 释放连接 # def close(self, cursor, conn): # """释放连接归还给连接池""" # cursor.close() # conn.close() # # # 查询所有 # def selectall(self, sql, param=None): # try: # cursor, conn, count = self.execute(sql, param) # res = cursor.fetchall() # return res # except Exception as e: # print("db error_msg:", e.args) # self.close(cursor, conn) # return count # # # 查询单条 # def selectone(self, sql, param=None): # try: # cursor, conn, count = self.execute(sql, param) # res = cursor.fetchone() # self.close(cursor, conn) # return res # except Exception as e: # print("db error_msg:", e.args) # self.close(cursor, conn) # return count # # # 增加 # def insertone(self, sql, param): # try: # cursor, conn, count = self.execute(sql, param) # conn.commit() # self.close(cursor, conn) # return count # except Exception as e: # print("db error_msg:", e.args) # conn.rollback() # self.close(cursor, conn) # return count # # # 删除 # def delete(self, sql, param=None): # try: # cursor, conn, count = self.execute(sql, param) # conn.commit() # self.close(cursor, conn) # return count # except Exception as e: # print("db error_msg:", e.args) # conn.rollback() # self.close(cursor, conn) # return count # # # 修改 # def update(self, sql, param=None): # try: # cursor, conn, count = self.execute(sql, param) # conn.commit() # self.close(cursor, conn) # return count # except Exception as e: # print("db error_msg:", e.args) # conn.rollback() # self.close(cursor, conn) # return count # if __name__ == '__main__': db = DB() # while True: # sql1 = 'select * from space where unit=%s' # args = '11' # ret = db.query_sql(sql=sql1,param=args) # print(ret) # (79, 2, 1, 2.2, None, 2, 1) # 查询单条 sql1 = 'select * from space where unit=%s' args = '11' ret = db.query_one_sql(sql=sql1, param=args) print(ret) # (79, 2, 1, 2.2, None, 2, 1) # 查询所有 sql2 = "select * from space" ret = db.query_all_sql(sql=sql2) print(ret) # 增加 sql3 = 'insert into test VALUES (%s,%s,%s,%s)' ret = db.execute_sql(sql3, ('鄂A6X3B0', 'DASDASDEFDFSDASDADASDAS', 1, 2)) print(ret) # 删除 sql4 = 'delete from test WHERE car=%s' args = '鄂A6X3B0' ret = db.execute_sql(sql4, args) print(ret) sleep(0.1) # 修改 sql5 = 'update test set ki=%s WHERE car=%s' args = ('100', 'wkk') ret = db.update_sql(sql5, args) print(ret)