123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572 |
- /*
- * @Description: 数据库操作类
- * @Author: yct
- * @Date: 2020-07-15 14:08:46
- * @LastEditTime: 2020-07-24 14:19:31
- * @LastEditors: yct
- */
- #include "database_controller.h"
- Database_controller::Database_controller()
- {
- m_database_controller_status = E_UNKNOWN;
- mp_reconnect_thread = nullptr;
- m_conn_pool_size = 0;
- }
- Database_controller::~Database_controller()
- {
- database_controller_uninit();
- }
- Error_manager Database_controller::database_controller_init()
- {
- return database_controller_init_from_protobuf(DATABASE_COMMUNICATION_PARAMETER_PATH);
- }
- Error_manager Database_controller::database_controller_init_from_protobuf(std::string prototxt_path)
- {
- parkspace_proto::Database_communication_configuration_all t_database_config;
- if(! proto_tool::read_proto_param(prototxt_path,t_database_config) )
- {
- return Error_manager(DB_PROTOBUF_ERROR,MINOR_ERROR,
- "database_controller_init_from_protobuf(std::string prototxt_path) failed");
- }
- else
- {
- Error_manager t_error;
- t_error=database_controller_init(t_database_config.database_communication_configurations().db_ip(),\
- t_database_config.database_communication_configurations().db_port(),\
- t_database_config.database_communication_configurations().db_username(),\
- t_database_config.database_communication_configurations().db_passwd(),\
- t_database_config.database_communication_configurations().db_name(),\
- t_database_config.database_communication_configurations().db_conn_pool_size());
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- }
- return SUCCESS;
- }
- //初始化
- Error_manager Database_controller::database_controller_init(std::string ip, int port, std::string username, std::string pass, std::string db_name, int conn_size)
- {
- Error_manager t_error;
- //系统结束标记与连接池大小
- // mb_exit = false;
- m_conn_pool_size = conn_size;
- m_db_param.connect_string = std::string("tcp://").append(ip).append(":").append(std::to_string(port));
- m_db_param.username = username;
- m_db_param.pass = pass;
- m_db_param.db_name = db_name;
- if(sql::mysql::get_driver_instance() == nullptr)
- {
- //胡力 直接修改状态
- m_database_controller_status = E_FAULT;
- // mb_initialized = false;
- return DB_INIT_FAILED;
- return Error_manager(Error_code::DB_INIT_FAILED, Error_level::MAJOR_ERROR,
- "数据库初始化失败 Database_controller::database_controller_init error ");
- }
- ////创建补充 数据库连接的通路
- t_error = fill_up_connect_channel();
- //守护线程,检查连接状态并保持连接数量
- m_database_reconnect_condition.reset(false,false,false);
- mp_reconnect_thread = new std::thread(&Database_controller::database_reconnect_thread, this);
- return t_error;
- }
- //反初始化
- Error_manager Database_controller::database_controller_uninit()
- {
- // mb_exit = true;
- if(mp_reconnect_thread!=nullptr)
- {
- m_database_reconnect_condition.kill_all();
- }
- if(mp_reconnect_thread!=nullptr)
- {
- if(mp_reconnect_thread->joinable())
- mp_reconnect_thread->join();
- delete mp_reconnect_thread;
- mp_reconnect_thread = nullptr;
- }
- m_database_connect_channel_list.clear();
- return SUCCESS;
- }
- Database_controller::Database_controller_status Database_controller::get_database_controller_status()
- {
- return m_database_controller_status;
- }
- Error_manager Database_controller::check_status()
- {
- if(get_database_controller_status()==E_READY)
- {
- return SUCCESS;
- }
- else if(get_database_controller_status() == E_UNKNOWN)
- {
- return Error_manager(Error_code::DB_INIT_FAILED, Error_level::MINOR_ERROR,
- "数据库初始化失败 check_status() error ");
- }
- else if(get_database_controller_status() == E_DISCONNECT)
- {
- return Error_manager(Error_code::DB_CONNECT_FAILED, Error_level::MINOR_ERROR,
- "数据库连接失败 check_status() error ");
- }
- else if(get_database_controller_status() == E_FAULT)
- {
- return Error_manager(Error_code::DB_STATUS_ERROR, Error_level::MINOR_ERROR,
- "数据库状态错误 check_status() error ");
- }
- }
- //数据库连接状态
- bool Database_controller::is_connected()
- {
- if ( m_database_controller_status == E_READY)
- {
- return true;
- }
- else
- {
- return false;
- }
- //
- // updata_connect_channel();
- // return mb_connected;
- }
- //****** 增删改查功能 *******
- //增
- Error_manager Database_controller::sql_insert(std::string sql_str)
- {
- #ifdef COUT_SQL_STRING
- LOG(INFO) << " sql_str = "<< sql_str << " --- " << this;
- #endif
- Error_manager t_error;
- //huli
- t_error = check_status();
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- boost::shared_ptr<Database_connect_channel> tp_database_connect_channel;
- t_error = occupancy_connect_channel(tp_database_connect_channel);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- char buf[1024];
- memset(buf, 0, 1024);
- try
- {
- tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name);
- boost::scoped_ptr<sql::Statement> stmt(tp_database_connect_channel->mp_connect_channel->createStatement());
- stmt->execute(sql_str);
- giveback_connect_channel(tp_database_connect_channel);
- return SUCCESS;
- }
- catch (sql::SQLException &e)
- {
- giveback_connect_channel(tp_database_connect_channel);
- /* Use what() (derived from std::runtime_error) to fetch the error message */
- sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str());
- usleep(1000* 3000);
- return Error_manager(DB_INSERT_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- catch (std::runtime_error &e)
- {
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str());
- return Error_manager(DB_INSERT_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- // catch (std::exception &e)
- // {
- // sprintf(buf, "# ERR: %s\n ERR: Standard exception in %s ",e.what(),__FILE__);
- // return Error_manager(DB_INSERT_FAILED, NEGLIGIBLE_ERROR, buf);
- // }
-
- return SUCCESS;
- }
- //删
- Error_manager Database_controller::sql_delete(std::string sql_str)
- {
- #ifdef COUT_SQL_STRING
- LOG(INFO) << " sql_str = "<< sql_str << " --- " << this;
- #endif
- // 1.检查状态,一旦无可用连接,则主动创建一个,若依然失败则直接返回错误
- Error_manager t_error;
- //huli
- t_error = check_status();
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- boost::shared_ptr<Database_connect_channel> tp_database_connect_channel;
- t_error = occupancy_connect_channel(tp_database_connect_channel);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- char buf[1024];
- memset(buf, 0, 1024);
- try
- {
- tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name);
- boost::scoped_ptr<sql::Statement> stmt(tp_database_connect_channel->mp_connect_channel->createStatement());
- stmt->execute(sql_str);
- giveback_connect_channel(tp_database_connect_channel);
- return SUCCESS;
- }
- catch (sql::SQLException &e)
- {
- /* Use what() (derived from std::runtime_error) to fetch the error message */
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str());
- return Error_manager(DB_DELETE_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- catch (std::runtime_error &e)
- {
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str());
- return Error_manager(DB_DELETE_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- return SUCCESS;
- }
- //改
- Error_manager Database_controller::sql_update(std::string sql_str)
- {
- #ifdef COUT_SQL_STRING
- LOG(INFO) << " sql_str = "<< sql_str << " --- " << this;
- #endif
- Error_manager t_error;
- //huli
- t_error = check_status();
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- boost::shared_ptr<Database_connect_channel> tp_database_connect_channel;
- t_error = occupancy_connect_channel(tp_database_connect_channel);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- char buf[1024];
- memset(buf, 0, 1024);
- try
- {
- tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name);
- boost::scoped_ptr<sql::Statement> stmt(tp_database_connect_channel->mp_connect_channel->createStatement());
- int affected_rows = stmt->executeUpdate(sql_str);
- giveback_connect_channel(tp_database_connect_channel);
- if (affected_rows > 0)
- {
- return SUCCESS;
- }
- else
- {
- return Error_manager(Error_code::DB_UPDATE_FAILED, Error_level::MINOR_ERROR,
- (std::string("数据库修改失败 Database_controller::sql_update error ")+sql_str).c_str());
- }
- }
- catch (sql::SQLException &e)
- {
- /* Use what() (derived from std::runtime_error) to fetch the error message */
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str());
- return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- catch (std::runtime_error &e)
- {
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str());
- return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- return SUCCESS;
- }
- //查
- Error_manager Database_controller::sql_query(std::string sql_str, boost::shared_ptr< sql::ResultSet > &query_result)
- {
- #ifdef COUT_SQL_STRING
- LOG(INFO) << " sql_str = "<< sql_str << " --- " << this;
- #endif
- Error_manager t_error;
- //huli
- t_error = check_status();
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- boost::shared_ptr<Database_connect_channel> tp_database_connect_channel;
- t_error = occupancy_connect_channel(tp_database_connect_channel);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- char buf[1024];
- memset(buf, 0, 1024);
- try
- {
- tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name);
- boost::scoped_ptr<sql::Statement> stmt(tp_database_connect_channel->mp_connect_channel->createStatement());
- query_result = boost::shared_ptr< sql::ResultSet >(stmt->executeQuery(sql_str));
- giveback_connect_channel(tp_database_connect_channel);
- // 检查结果
- if(query_result==nullptr)
- {
- return Error_manager(Error_code::DB_QUERY_FAILED, Error_level::MAJOR_ERROR,
- (std::string("数据库查询失败 Database_controller::sql_query error ")+sql_str).c_str());
- }
- }
- catch (sql::SQLException &e)
- {
- /* Use what() (derived from std::runtime_error) to fetch the error message */
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str());
- return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- catch (std::runtime_error &e)
- {
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str());
- return Error_manager(DB_UPDATE_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- return SUCCESS;
- }
- Error_manager Database_controller::sql_control(std::string sql_str)
- {
- #ifdef COUT_SQL_STRING
- LOG(INFO) << " sql_str = "<< sql_str << " --- " << this;
- #endif
- Error_manager t_error;
- //huli
- t_error = check_status();
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- boost::shared_ptr<Database_connect_channel> tp_database_connect_channel;
- t_error = occupancy_connect_channel(tp_database_connect_channel);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- char buf[1024];
- memset(buf, 0, 1024);
- try
- {
- tp_database_connect_channel->mp_connect_channel->setSchema(m_db_param.db_name);
- boost::scoped_ptr<sql::Statement> stmt(tp_database_connect_channel->mp_connect_channel->createStatement());
- stmt->execute(sql_str);
- giveback_connect_channel(tp_database_connect_channel);
- return SUCCESS;
- }
- catch (sql::SQLException &e)
- {
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s\nsql: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str(), sql_str.c_str());
- return Error_manager(DB_CONTROL_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- catch (std::runtime_error &e)
- {
- giveback_connect_channel(tp_database_connect_channel);
- sprintf(buf, "# ERR: %s\n ERR: runtime_error in %s \nsql: %s",e.what(),__FILE__, sql_str.c_str());
- return Error_manager(DB_CONTROL_FAILED, NEGLIGIBLE_ERROR, buf);
- }
- return SUCCESS;
- }
- void Database_controller::database_reconnect_thread()
- {
- //hili 这个用条件变量去控制
- while (m_database_reconnect_condition.is_alive())
- {
- m_database_reconnect_condition.wait_for_ex(std::chrono::seconds(1));
- if (m_database_reconnect_condition.is_alive())
- {
- updata_connect_channel();
- fill_up_connect_channel();
- }
- }
- }
- // 检查连接状态
- Error_manager Database_controller::updata_connect_channel()
- {
- for (auto iter = m_database_connect_channel_list.begin(); iter != m_database_connect_channel_list.end();)
- {
- if ((*iter)->mp_connect_channel == nullptr)
- {
- iter = m_database_connect_channel_list.erase(iter);
- }
- else if ( (*iter)->m_occupancy_lock.try_lock() )
- {
- if( (*iter)->mp_connect_channel->isValid() )
- {
- (*iter)->m_occupancy_lock.unlock();
- ++iter;
- }
- else
- {
- (*iter)->m_occupancy_lock.unlock();
- iter = m_database_connect_channel_list.erase(iter);
- }
- }
- else
- {
- ++iter;
- }
- }
- return Error_code::SUCCESS;
- }
- ////创建补充 数据库连接的通路
- Error_manager Database_controller::fill_up_connect_channel()
- {
- if ( m_conn_pool_size <=0 )
- {
- return Error_manager(Error_code::DB_CONNECT_CHANNEL_NUMBER_ERROR, Error_level::MINOR_ERROR,
- "数据库连接通道数量错误 Database_controller::fill_up_connect_channel() error ");
- }
- // 填充连接池
- int retry_count = DB_RECONNECTION_NUMBER;
- //胡力 建议修改如下
- while (m_database_connect_channel_list.size() < m_conn_pool_size && retry_count > 0)
- {
- try
- {
- //胡力 数据库连接的指针 建议改名 tp_sql_connnect
- boost::shared_ptr<sql::Connection> tp_connect_channel(sql::mysql::get_driver_instance()->connect(m_db_param.connect_string, m_db_param.username, m_db_param.pass));
- if (tp_connect_channel != nullptr && tp_connect_channel->isValid())
- {
- boost::shared_ptr<Database_connect_channel> tp_database_connect_channel(new Database_connect_channel);
- tp_database_connect_channel->mp_connect_channel = tp_connect_channel;
- // tp_database_connect_channel->m_occupancy_lock.unlock();
- m_database_connect_channel_list.push_back(tp_database_connect_channel);
- }
- else
- {
- retry_count--;
- }
- }
- catch (sql::SQLException &e)
- {
- char buf[1024];
- memset(buf, 0, 1024);
- sprintf(buf, "# ERR: %s\n (MySQL error code: %d, SQLState: %s", e.what(), e.getErrorCode(), e.getSQLState().c_str());
- retry_count--;
- }
- }
- if ( retry_count <= 0 )
- {
- m_database_controller_status = E_DISCONNECT;
- //胡力 如果报错 队列中部分成功要回收处理 需要讨论!!!!!!!!!!!!!!!!!!
- m_database_connect_channel_list.clear();
- return Error_manager(Error_code::DB_CONNECT_FAILED, Error_level::MINOR_ERROR,
- "数据库连接失败 Database_controller::fill_up_connect_channel() error ");
- }
- else
- {
- m_database_controller_status = E_READY;
- return Error_code::SUCCESS;
- }
- return SUCCESS;
- }
- //zhan you jieyong
- Error_manager Database_controller::occupancy_connect_channel(boost::shared_ptr<Database_connect_channel> & p_database_connect_channel)
- {
- for (auto iter = m_database_connect_channel_list.begin(); iter != m_database_connect_channel_list.end(); ++iter)
- {
- if ( (*iter)->mp_connect_channel != nullptr)
- {
- if ( (*iter)->m_occupancy_lock.try_lock() )
- {
- if ( (*iter)->mp_connect_channel->isValid() )
- // if ( !(*iter)->mp_connect_channel->isClosed() )
- {
- p_database_connect_channel = (*iter);
- return Error_code::SUCCESS;
- }
- (*iter)->m_occupancy_lock.unlock();
- }
- }
- //
- //
- //
- // if ( (*iter)->mp_connect_channel != nullptr && (*iter)->mp_connect_channel->isValid() )
- // {
- // if ( (*iter)->m_occupancy_lock.try_lock() )
- // {
- // p_database_connect_channel = (*iter);
- // return Error_code::SUCCESS;
- // }
- //
- // }
- }
- //if not find
- return Error_manager(Error_code::DB_CONNECT_CHANNEL_NOT_FOUND, Error_level::MINOR_ERROR,
- "数据库连接通道未找到 Database_controller::occupancy_connect_channel error ");
- }
- //guihuan
- Error_manager Database_controller::giveback_connect_channel(boost::shared_ptr<Database_connect_channel> p_database_connect_channel)
- {
- if ( p_database_connect_channel == nullptr )
- {
- return Error_manager(Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR,
- "指针为空 Database_controller::giveback_connect_channel error ");
- }
- p_database_connect_channel->m_occupancy_lock.unlock();
- return Error_code::SUCCESS;
- }
|