/* * @Description: 数据库操作类 * @Author: yct * @Date: 2020-07-15 14:08:46 * @LastEditTime: 2020-07-24 14:19:31 * @LastEditors: yct */ #include "database_controller.h" Database_controller::Database_controller() { m_database_controller_status = E_UNKNOWN; mp_reconnect_thread = nullptr; m_conn_pool_size = 0; } Database_controller::~Database_controller() { database_controller_uninit(); } Error_manager Database_controller::database_controller_init() { return database_controller_init_from_protobuf(DATABASE_COMMUNICATION_PARAMETER_PATH); } Error_manager Database_controller::database_controller_init_from_protobuf(std::string prototxt_path) { parkspace_proto::Database_communication_configuration_all t_database_config; if(! proto_tool::read_proto_param(prototxt_path,t_database_config) ) { return Error_manager(DB_PROTOBUF_ERROR,MINOR_ERROR, "database_controller_init_from_protobuf(std::string prototxt_path) failed"); } else { Error_manager t_error; t_error=database_controller_init(t_database_config.database_communication_configurations().db_ip(),\ t_database_config.database_communication_configurations().db_port(),\ t_database_config.database_communication_configurations().db_username(),\ t_database_config.database_communication_configurations().db_passwd(),\ t_database_config.database_communication_configurations().db_name(),\ t_database_config.database_communication_configurations().db_conn_pool_size()); if ( t_error != Error_code::SUCCESS ) { return t_error; } } return SUCCESS; } //初始化 Error_manager Database_controller::database_controller_init(std::string ip, int port, std::string username, std::string pass, std::string db_name, int conn_size) { Error_manager t_error; //系统结束标记与连接池大小 // mb_exit = false; m_conn_pool_size = conn_size; m_db_param.connect_string = std::string("tcp://").append(ip).append(":").append(std::to_string(port)); m_db_param.username = username; m_db_param.pass = pass; m_db_param.db_name = db_name; if(sql::mysql::get_driver_instance() == nullptr) { //胡力 直接修改状态 m_database_controller_status = E_FAULT; // mb_initialized = false; return DB_INIT_FAILED; return Error_manager(Error_code::DB_INIT_FAILED, Error_level::MAJOR_ERROR, "数据库初始化失败 Database_controller::database_controller_init error "); } ////创建补充 数据库连接的通路 t_error = fill_up_connect_channel(); //守护线程,检查连接状态并保持连接数量 m_database_reconnect_condition.reset(false,false,false); mp_reconnect_thread = new std::thread(&Database_controller::database_reconnect_thread, this); return t_error; } //反初始化 Error_manager Database_controller::database_controller_uninit() { // mb_exit = true; if(mp_reconnect_thread!=nullptr) { m_database_reconnect_condition.kill_all(); } if(mp_reconnect_thread!=nullptr) { if(mp_reconnect_thread->joinable()) mp_reconnect_thread->join(); delete mp_reconnect_thread; mp_reconnect_thread = nullptr; } m_database_connect_channel_list.clear(); return SUCCESS; } Database_controller::Database_controller_status Database_controller::get_database_controller_status() { return m_database_controller_status; } Error_manager Database_controller::check_status() { if(get_database_controller_status()==E_READY) { return SUCCESS; } else if(get_database_controller_status() == E_UNKNOWN) { return Error_manager(Error_code::DB_INIT_FAILED, Error_level::MINOR_ERROR, "数据库初始化失败 check_status() error "); } else if(get_database_controller_status() == E_DISCONNECT) { return Error_manager(Error_code::DB_CONNECT_FAILED, Error_level::MINOR_ERROR, "数据库连接失败 check_status() error "); } else if(get_database_controller_status() == E_FAULT) { return Error_manager(Error_code::DB_STATUS_ERROR, Error_level::MINOR_ERROR, "数据库状态错误 check_status() error "); } } //数据库连接状态 bool Database_controller::is_connected() { if ( m_database_controller_status == E_READY) { return true; } else { return false; } // // updata_connect_channel(); // return mb_connected; } //****** 增删改查功能 ******* //增 Error_manager Database_controller::sql_insert(std::string sql_str) { #ifdef COUT_SQL_STRING LOG(INFO) << " sql_str = "<< sql_str << " --- " << this; #endif Error_manager t_error; //huli t_error = check_status(); if ( t_error != Error_code::SUCCESS ) { return t_error; } boost::shared_ptr tp_database_connect_channel; t_error = occupancy_connect_channel(tp_database_connect_channel); if ( t_error != Error_code::SUCCESS ) { return t_error; } char buf[1024]; memset(buf, 0, 1024); try { tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name); boost::scoped_ptr stmt(tp_database_connect_channel->mp_connect_channel->createStatement()); stmt->execute(sql_str); giveback_connect_channel(tp_database_connect_channel); return SUCCESS; } catch (sql::SQLException &e) { giveback_connect_channel(tp_database_connect_channel); /* Use what() (derived from std::runtime_error) to fetch the error message */ sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str()); usleep(1000* 3000); return Error_manager(DB_INSERT_FAILED, NEGLIGIBLE_ERROR, buf); } catch (std::runtime_error &e) { giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str()); return Error_manager(DB_INSERT_FAILED, NEGLIGIBLE_ERROR, buf); } // catch (std::exception &e) // { // sprintf(buf, "# ERR: %s\n ERR: Standard exception in %s ",e.what(),__FILE__); // return Error_manager(DB_INSERT_FAILED, NEGLIGIBLE_ERROR, buf); // } return SUCCESS; } //删 Error_manager Database_controller::sql_delete(std::string sql_str) { #ifdef COUT_SQL_STRING LOG(INFO) << " sql_str = "<< sql_str << " --- " << this; #endif // 1.检查状态,一旦无可用连接,则主动创建一个,若依然失败则直接返回错误 Error_manager t_error; //huli t_error = check_status(); if ( t_error != Error_code::SUCCESS ) { return t_error; } boost::shared_ptr tp_database_connect_channel; t_error = occupancy_connect_channel(tp_database_connect_channel); if ( t_error != Error_code::SUCCESS ) { return t_error; } char buf[1024]; memset(buf, 0, 1024); try { tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name); boost::scoped_ptr stmt(tp_database_connect_channel->mp_connect_channel->createStatement()); stmt->execute(sql_str); giveback_connect_channel(tp_database_connect_channel); return SUCCESS; } catch (sql::SQLException &e) { /* Use what() (derived from std::runtime_error) to fetch the error message */ giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str()); return Error_manager(DB_DELETE_FAILED, NEGLIGIBLE_ERROR, buf); } catch (std::runtime_error &e) { giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str()); return Error_manager(DB_DELETE_FAILED, NEGLIGIBLE_ERROR, buf); } return SUCCESS; } //改 Error_manager Database_controller::sql_update(std::string sql_str) { #ifdef COUT_SQL_STRING LOG(INFO) << " sql_str = "<< sql_str << " --- " << this; #endif Error_manager t_error; //huli t_error = check_status(); if ( t_error != Error_code::SUCCESS ) { return t_error; } boost::shared_ptr tp_database_connect_channel; t_error = occupancy_connect_channel(tp_database_connect_channel); if ( t_error != Error_code::SUCCESS ) { return t_error; } char buf[1024]; memset(buf, 0, 1024); try { tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name); boost::scoped_ptr stmt(tp_database_connect_channel->mp_connect_channel->createStatement()); int affected_rows = stmt->executeUpdate(sql_str); giveback_connect_channel(tp_database_connect_channel); if (affected_rows > 0) { return SUCCESS; } else { return Error_manager(Error_code::DB_UPDATE_FAILED, Error_level::MINOR_ERROR, (std::string("数据库修改失败 Database_controller::sql_update error ")+sql_str).c_str()); } } catch (sql::SQLException &e) { /* Use what() (derived from std::runtime_error) to fetch the error message */ giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str()); return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf); } catch (std::runtime_error &e) { giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str()); return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf); } return SUCCESS; } //查 Error_manager Database_controller::sql_query(std::string sql_str, boost::shared_ptr< sql::ResultSet > &query_result) { #ifdef COUT_SQL_STRING LOG(INFO) << " sql_str = "<< sql_str << " --- " << this; #endif Error_manager t_error; //huli t_error = check_status(); if ( t_error != Error_code::SUCCESS ) { return t_error; } boost::shared_ptr tp_database_connect_channel; t_error = occupancy_connect_channel(tp_database_connect_channel); if ( t_error != Error_code::SUCCESS ) { return t_error; } char buf[1024]; memset(buf, 0, 1024); try { tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name); boost::scoped_ptr stmt(tp_database_connect_channel->mp_connect_channel->createStatement()); query_result = boost::shared_ptr< sql::ResultSet >(stmt->executeQuery(sql_str)); giveback_connect_channel(tp_database_connect_channel); // 检查结果 if(query_result==nullptr) { return Error_manager(Error_code::DB_QUERY_FAILED, Error_level::MAJOR_ERROR, (std::string("数据库查询失败 Database_controller::sql_query error ")+sql_str).c_str()); } } catch (sql::SQLException &e) { /* Use what() (derived from std::runtime_error) to fetch the error message */ giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str()); return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf); } catch (std::runtime_error &e) { giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str()); return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf); } return SUCCESS; } Error_manager Database_controller::sql_control(std::string sql_str) { #ifdef COUT_SQL_STRING LOG(INFO) << " sql_str = "<< sql_str << " --- " << this; #endif Error_manager t_error; //huli t_error = check_status(); if ( t_error != Error_code::SUCCESS ) { return t_error; } boost::shared_ptr tp_database_connect_channel; t_error = occupancy_connect_channel(tp_database_connect_channel); if ( t_error != Error_code::SUCCESS ) { return t_error; } char buf[1024]; memset(buf, 0, 1024); try { tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name); boost::scoped_ptr stmt(tp_database_connect_channel->mp_connect_channel->createStatement()); stmt->execute(sql_str); giveback_connect_channel(tp_database_connect_channel); return SUCCESS; } catch (sql::SQLException &e) { giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str()); return Error_manager(DB_CONTROL_FAILED, NEGLIGIBLE_ERROR, buf); } catch (std::runtime_error &e) { giveback_connect_channel(tp_database_connect_channel); sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str()); return Error_manager(DB_CONTROL_FAILED, NEGLIGIBLE_ERROR, buf); } return SUCCESS; } void Database_controller::database_reconnect_thread() { //hili 这个用条件变量去控制 while (m_database_reconnect_condition.is_alive()) { m_database_reconnect_condition.wait_for_ex(std::chrono::seconds(1)); if (m_database_reconnect_condition.is_alive()) { updata_connect_channel(); fill_up_connect_channel(); } } } // 检查连接状态 Error_manager Database_controller::updata_connect_channel() { for (auto iter = m_database_connect_channel_list.begin(); iter != m_database_connect_channel_list.end();) { if ((*iter)->mp_connect_channel == nullptr) { iter = m_database_connect_channel_list.erase(iter); } else if ( (*iter)->m_occupancy_lock.try_lock() ) { if( (*iter)->mp_connect_channel->isValid() ) { (*iter)->m_occupancy_lock.unlock(); ++iter; } else { (*iter)->m_occupancy_lock.unlock(); iter = m_database_connect_channel_list.erase(iter); } } else { ++iter; } } return Error_code::SUCCESS; } ////创建补充 数据库连接的通路 Error_manager Database_controller::fill_up_connect_channel() { if ( m_conn_pool_size <=0 ) { return Error_manager(Error_code::DB_CONNECT_CHANNEL_NUMBER_ERROR, Error_level::MINOR_ERROR, "数据库连接通道数量错误 Database_controller::fill_up_connect_channel() error "); } // 填充连接池 int retry_count = DB_RECONNECTION_NUMBER; //胡力 建议修改如下 while (m_database_connect_channel_list.size() < m_conn_pool_size && retry_count > 0) { try { //胡力 数据库连接的指针 建议改名 tp_sql_connnect boost::shared_ptr tp_connect_channel(sql::mysql::get_driver_instance()->connect(m_db_param.connect_string, m_db_param.username, m_db_param.pass)); if (tp_connect_channel != nullptr && tp_connect_channel->isValid()) { boost::shared_ptr tp_database_connect_channel(new Database_connect_channel); tp_database_connect_channel->mp_connect_channel = tp_connect_channel; // tp_database_connect_channel->m_occupancy_lock.unlock(); m_database_connect_channel_list.push_back(tp_database_connect_channel); } else { retry_count--; } } catch (sql::SQLException &e) { char buf[1024]; memset(buf, 0, 1024); sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str()); retry_count--; } } if ( retry_count <= 0 ) { m_database_controller_status = E_DISCONNECT; //胡力 如果报错 队列中部分成功要回收处理 需要讨论!!!!!!!!!!!!!!!!!! m_database_connect_channel_list.clear(); return Error_manager(Error_code::DB_CONNECT_FAILED, Error_level::MINOR_ERROR, "数据库连接失败 Database_controller::fill_up_connect_channel() error "); } else { m_database_controller_status = E_READY; return Error_code::SUCCESS; } return SUCCESS; } //zhan you jieyong Error_manager Database_controller::occupancy_connect_channel(boost::shared_ptr & p_database_connect_channel) { for (auto iter = m_database_connect_channel_list.begin(); iter != m_database_connect_channel_list.end(); ++iter) { if ( (*iter)->mp_connect_channel != nullptr) { if ( (*iter)->m_occupancy_lock.try_lock() ) { if ( (*iter)->mp_connect_channel->isValid() ) // if ( !(*iter)->mp_connect_channel->isClosed() ) { p_database_connect_channel = (*iter); return Error_code::SUCCESS; } (*iter)->m_occupancy_lock.unlock(); } } // // // // if ( (*iter)->mp_connect_channel != nullptr && (*iter)->mp_connect_channel->isValid() ) // { // if ( (*iter)->m_occupancy_lock.try_lock() ) // { // p_database_connect_channel = (*iter); // return Error_code::SUCCESS; // } // // } } //if not find return Error_manager(Error_code::DB_CONNECT_CHANNEL_NOT_FOUND, Error_level::MINOR_ERROR, "数据库连接通道未找到 Database_controller::occupancy_connect_channel error "); } //guihuan Error_manager Database_controller::giveback_connect_channel(boost::shared_ptr p_database_connect_channel) { if ( p_database_connect_channel == nullptr ) { return Error_manager(Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR, "指针为空 Database_controller::giveback_connect_channel error "); } p_database_connect_channel->m_occupancy_lock.unlock(); return Error_code::SUCCESS; }