继续上次那个需求的优化
使用MYSQL CONNECTOR/C++ JDBC接口开发MYSQL 应用程序 MYSQL C++接口 用AI和VSCODE开发MYSQLC++(JDBC)应用脚本 如何优化千倍降低加密耗时? 在这三篇文章介绍使用C++以及接口开发应用程序,或者小工具 最后优化是加密,放弃系统调用,直接用OPENSSL函数库.
今天我们继续优化,使用C++多线程 去并行更新MYSQL 下面先拿单线程数据看看
单线程模型
Total Rows:1
TotalRow1: 1538
TotalRow2: 1538
ColName: count(*)
SQLSTRING_VALUE: 1538
Get SQL LIST Elapsed Time:0s, 11ms, 741us, 658ns
ENCRY NEW KEY Elapsed Time:0s, 6ms, 950us, 639ns
UPDATE SQL Elapsed Time:4s, 585ms, 155us, 288ns
TotalUpdateRows=1538
ALL Elapsed Time:4s, 639ms, 383us, 839ns
Performance counter stats
for
'./main.exe':
238.39 msec task-clock
# 0.028 CPUs utilized
3,118 context-switches
# 0.013 M/sec
31 cpu-migrations
# 0.130 K/sec
1,392 page-faults
# 0.006 M/sec
564,511,231 cycles
# 2.368 GHz
274,603,902 instructions
# 0.49 insn per cycle
38,662,453 branches
# 162.179 M/sec
2,288,515 branch-misses
# 5.92% of all branches
8.484480222 seconds time elapsed
0.041685000 seconds user
0.282866000 seconds sys
产生3866万个分支; 分支预测失败率5.92%
多线程模型
SQLSTRING_VALUE: 1538
Get SQL LIST Elapsed Time:0s, 15ms, 676us, 971ns
ENCRY NEW KEY Elapsed Time:0s, 9ms, 639us, 807ns
RUN THREAD:0
RUN THREAD:1
RUN THREAD:2
RUN THREAD:3
Update RemainRow:2 Actaul Update Rows:3
Thread 3:TotalUpdateRows=383
Thread 2:TotalUpdateRows=384
Thread 0:TotalUpdateRows=384
0:线程已经结束执行。
Thread 1:TotalUpdateRows=384
1:线程已经结束执行。
2:线程已经结束执行。
3:线程已经结束执行。
Thread Update Elapsed Time:3s, 148ms, 616us, 18ns
MAIN FREE:
ALL Elapsed Time:3s, 197ms, 509us, 467ns
Performance counter stats
for
'./main.exe':
163.42 msec task-clock
# 0.048 CPUs utilized
1,604 context-switches
# 0.010 M/sec
54 cpu-migrations
# 0.330 K/sec
1,529 page-faults
# 0.009 M/sec
421,362,571 cycles
# 2.578 GHz
292,511,736 instructions
# 0.69 insn per cycle
39,041,804 branches
# 238.906 M/sec
1,662,566 branch-misses
# 4.26% of all branches
3.426073087 seconds time elapsed
0.041813000 seconds user
0.187438000 seconds sys
这么看来多线程下只提升了1秒 25% 有点鸡骨!
CPU耗时163.42毫秒;
发生1604次上下文切换;
54次CPU迁移; 4.2136亿CPU周期;
产生2.9251亿指令;
每周期0.69个指令;
产生39041万个分支;
分支预测失败率4.26% 绿色是提升,红色代表下降
简单的C++ 多线程
#
include
<iostream>
#
include
<thread>
#
include
<vector>
void
threadFunction
(
int id)
{
// 子线程的代码逻辑
std::
cout <<
"子线程 " << id <<
" 执行完毕" <<
std::
endl;
}
int
main
()
{
std::
vector<
std::thread> threads;
// 创建多个子线程并将它们添加到向量中
for (
int i =
0; i <
5; ++i)
{
threads.push_back(
std::thread(threadFunction, i));
}
// 等待所有子线程完成
for (
auto& t : threads)
{
t.join();
}
std::
cout <<
"所有子线程已结束" <<
std::
endl;
return
0;
}
调用线程库<thread.h> 这是C++的库. 另外还有LINUX系统线程库,一般为POSIX线程库 #include<pthread.h> 定义了 线程 数组 std::vector<std::thread> threads; 创建线程并插入到数组后面 threads.push_back(std::thread(threadFunction, i)); 创建线程 并分配工作函数,紧跟着函数的参数i std ::thread(threadFunction, i)
线程创建就立马运行; 下个循环就是主程序空转,等待工作线程全部结束.
编写多线程函数的时候遇到的问题如下 1 线程工作函数的参数不支持引用 报错是无法重载 2 线程参数 无论是引用,指针 都是值COPY方式 3 线程结束后会摧毁一切,包含参数传来的对象 4 JOIN()和 ThreadArray[i].detach(); 区别
2和1有点矛盾...... 3 意味着它会FREE 掉MYSQL的链接,报DUBLE FREE
*** Error in `/home/shark/projects/CPP_Projects/UPDATE_MYSQL_CRYPTO/main.exe': double free or corruption (fasttop): 0x0000000000655150 *** Error in `/home/shark/projects/CPP_Projects/UPDATE_MYSQL_CRYPTO/main.exe': invalid fastbin entry (free): 0x00000000006680a0
通过 GDB 堆栈了解到释放了connect对像
(gdb) bt
#0 0x00007ffff6195989 in raise () from /lib64/libc.so.6
#1 0x00007ffff6197098 in abort () from /lib64/libc.so.6
#2 0x00007ffff61d6197 in __libc_message () from /lib64/libc.so.6
#3 0x00007ffff61dd56d in _int_free () from /lib64/libc.so.6
#4 0x00007ffff75d0bdd in CRYPTO_free () from lib64/libcrypto.so.1.0.0
#5 0x00007ffff76973f7 in X509_VERIFY_PARAM_free () from lib64/libcrypto.so.1.0.0
#6 0x00007ffff79963c9 in SSL_free () from lib64/libssl.so.1.0.0
#7 0x00007ffff6e39fa8 in vio_ssl_delete (vio=0x643c70) at ../../mysql-8.0.11/vio/viossl.cc:350
#8 0x00007ffff6e19807 in end_server (mysql=0x642920) at ../../mysql-8.0.11/sql-common/client.cc:1486
#9 0x00007ffff6e19b0b in cli_safe_read_with_ok (mysql=mysql@entry=0x642920, parse_ok=parse_ok@entry=false, is_data_packet=is_data_packet@entry=0x0) at ../../mysql-8.0.11/sql-common/client.cc:1010
#10 0x00007ffff6e19c9f in cli_safe_read (mysql=mysql@entry=0x642920, is_data_packet=is_data_packet@entry=0x0) at ../../mysql-8.0.11/sql-common/client.cc:1115
#11 0x00007ffff6e1abe9 in cli_read_query_result (mysql=0x642920) at ../../mysql-8.0.11/sql-common/client.cc:5209
#12 0x00007ffff6e24bbf in execute (stmt=stmt@entry=0x743790, packet=packet@entry=0x7ffff0000a30 "", length=length@entry=171) at ../../mysql-8.0.11/libmysql/libmysql.cc:1933
#13 0x00007ffff6e25a15 in cli_stmt_execute (stmt=0x743790) at ../../mysql-8.0.11/libmysql/libmysql.cc:2057
#14 0x00007ffff6e26bb4 in mysql_stmt_execute (stmt=0x743790) at ../../mysql-8.0.11/libmysql/libmysql.cc:2392
#15 0x00007ffff6df9c2a in sql::mysql::MySQL_Prepared_Statement::do_query() () from lib64/libmysqlcppconn.so.7
#16 0x00007ffff6df60de in sql::mysql::MySQL_Prepared_Statement::executeUpdate() () from lib64/libmysqlcppconn.so.7
#17 0x0000000000405379 in ProcessUpdateData (RowLimit=100, prep_stmt=0x7436e0, subDataArray=std::vector of length 384, capacity 384 = {...}) at main.cpp:440
#18 0x000000000040ba35 in std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >))(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >)>::_M_invoke<0ul, 1ul, 2ul>(std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x751d70) at /usr/include/c++/4.8.2/functional:1732
#19 0x000000000040b854 in std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >))(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >)>::operator()() (this=0x751d70) at /usr/include/c++/4.8.2/functional:1720
#20 0x000000000040b7dd in std::thread::_Impl<std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >))(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >)> >::_M_run() (this=0x751d58) at /usr/include/c++/4.8.2/thread:115
#21 0x00007ffff6aedda0 in ?? () from /lib64/libstdc++.so.6
#22 0x00007ffff7bc7df3 in start_thread () from /lib64/libpthread.so.0
#23 0x00007ffff62563dd in clone () from /lib64/libc.so.6
为此不得不NEW一个出来
//传给线程工作函数的必须是NEW出新对象,线程退出它会摧毁所有传过去的对象
sql::mysql::MySQL_Driver *driverThread =
new sql::mysql::MySQL_Driver();
sql::Connection *connectThread = driverThread->connect(URL, config[
"DB_USER"], config[
"DB_PASW"]);
connectThread->setSchema(config[
"DB_NAME"]);
sql::PreparedStatement *prep_stmtThread = connectThread->prepareStatement(config[
"UPDATE_SQL"]);
std::
cout <<
"RUN THREAD:" << i <<
std::
endl;
ThreadArray[i] =
std::thread(ProcessUpdateData, i, prep_stmtThread, subUpdateDataArray);
// 值传给线程函数
4 AI 给了错误提示 循环启动线程,然后马上join() 导致主程序等待线程完成,再创建下个线程. 换成detach() 主程序就不等待了,立马把所有线程启动起来了. 可是这样主线程执行完后面的代码就结束了.主程序一结束立马把所有线程都杀了. 为此使用下面语句进行等待一段时间. 这就没有意思了
std::this_thread::sleep_for(std::chrono::seconds(10));
所以采用双循环模式,先把所有线程启动,运行,然后再另外个循环等待 其实这英文单词取得很绕 join() 应该是wait() detach()应该是nowait()
最后C++相对于C语言来说编程非常便利和友好化 1 动态数组 vercort<> 可以RESIZE() 后插自动RESIZE() 2 数组动态定义大小
std::thread ThreadArray[WorkThreadNum];
3 类型自动推动 auto i=3; 自动推动i是整型 4 智能指针 auto_ptr std::auto_ptr<int> ap1; 使用完不用delete,自动释放。 unique_ptr 独立指针,禁止指针拷贝; shared_ptr 共享指针,通过计数器方式解决释放问题
主程序 多线程调用核心源码
auto start_time_Thread =
std::chrono::high_resolution_clock::now();
u_int32_t RemainRows =
0;
// max 42,9496,7295
u_int32_t PerThreadProcessRow =
0;
// max 42,9496,7295
u_int8_t WorkThreadNum =
std::stoi(config[
"WORK_THEAD"]);
// max 256
int ROWLIMIT =
std::stoi(config[
"ROW_LIMIT"]);
// max 65535
if (Total_row1 % WorkThreadNum >
0)
{
PerThreadProcessRow = Total_row1 / WorkThreadNum;
RemainRows = Total_row1 % WorkThreadNum;
}
else
{
PerThreadProcessRow = Total_row1 / WorkThreadNum;
}
// UpdateDataArray.resize(Old_Data_Map.size());
// subUpdateDataArray.resize(PerThreadProcessRow);
// 两个MAP 对象 合并成一个结构体动态容器
std::
vector<TUpdateData>
UpdateDataArray
(Old_Data_Map.size()),
subUpdateDataArray
(PerThreadProcessRow);
auto Old_it = Old_Data_Map.begin();
auto New_it = New_Data_Map.begin();
for (
int i =
0; Old_it != Old_Data_Map.end(); i++, ++Old_it, ++New_it)
{
UpdateDataArray[i].Card_id = Old_it->first;
UpdateDataArray[i].Old_Encry = Old_it->second;
UpdateDataArray[i].New_Encry = New_it->second;
}
//生成线程数组,分配数据,启动线程
std::thread ThreadArray[WorkThreadNum];
for (
int i =
0; i < WorkThreadNum; i++)
{
for (
int j =
0; j < PerThreadProcessRow; j++)
{
int k = i * PerThreadProcessRow + j;
subUpdateDataArray[j] = UpdateDataArray[k];
}
//传给线程工作函数的必须是NEW出新对象,线程退出它会摧毁所有传过去的对象
sql::mysql::MySQL_Driver *driverThread =
new sql::mysql::MySQL_Driver();
sql::Connection *connectThread = driverThread->connect(URL, config[
"DB_USER"], config[
"DB_PASW"]);
connectThread->setSchema(config[
"DB_NAME"]);
sql::PreparedStatement *prep_stmtThread = connectThread->prepareStatement(config[
"UPDATE_SQL"]);
std::
cout <<
"RUN THREAD:" << i <<
std::
endl;
ThreadArray[i] =
std::thread(ProcessUpdateData, i, prep_stmtThread, subUpdateDataArray);
// 值传给线程函数
// ThreadArray[i].detach();
}
//使用主线程链接语句对象完成剩余数据的更新
sql::PreparedStatement *prep_stmt =
nullptr;
int UpdateCount=
0;
int TotalUpdateRows=
0;
prep_stmt = con->prepareStatement(config[
"UPDATE_SQL"]);
if (RemainRows >
0 )
{
for (
int f=WorkThreadNum*PerThreadProcessRow
-1; f < UpdateDataArray.size();f++)
{
prep_stmt->setString(
1, UpdateDataArray[f].New_Encry);
prep_stmt->setString(
2, UpdateDataArray[f].Card_id);
prep_stmt->setString(
3, UpdateDataArray[f].Old_Encry);
UpdateCount = prep_stmt->executeUpdate();
TotalUpdateRows = UpdateCount + TotalUpdateRows;
}
std::
cout <<
"Update RemainRow:"<<RemainRows<<
"\t Actaul Update Rows:"<<TotalUpdateRows<<
std::
endl;
RemainRows =
0;
}
//等待所有线程工作完成
for (
int i =
0; i < WorkThreadNum; ++i)
{
if (ThreadArray[i].joinable())
{
ThreadArray[i].join();
std::
cout << i <<
":线程已经结束执行。" <<
std::
endl;
}
}
auto end_time_Thread =
std::chrono::high_resolution_clock::now();
std::
cout <<
"Thread Update Elapsed Time:" << Calc_time_diff(end_time_Thread, start_time_Thread) <<
std::
endl;
线程工作函数
void
ProcessUpdateData
(
int RowLimit, sql::PreparedStatement *prep_stmt,
std::
vector<TUpdateData> subDataArray)
{
u_int64_t UpdateCount, TotalUpdateRows;
UpdateCount =
0;
TotalUpdateRows =
0;
try
{
// prep_stmt->getConnection()->setAutoCommit(false);
for (
auto &pArray : subDataArray)
{
prep_stmt->setString(
1, pArray.New_Encry);
prep_stmt->setString(
2, pArray.Card_id);
prep_stmt->setString(
3, pArray.Old_Encry);
UpdateCount = prep_stmt->executeUpdate();
TotalUpdateRows = UpdateCount + TotalUpdateRows;
}
std::
cout <<
"Thread " << RowLimit <<
":TotalUpdateRows=" << TotalUpdateRows <<
std::
endl;
}
catch (sql::SQLException &e)
{
std::
cout <<
"# ERR: SQLException in " << __FILE__;
// 打印异常 文件名 函数名 行号 SQL异常 代码 状态等信息
std::
cout <<
"(" << __FUNCTION__ <<
") on line " << __LINE__ <<
std::
endl;
std::
cout <<
"SQLException: " << e.what() <<
std::
endl;
std::
cout <<
"MySQL error code: " << e.getErrorCode() <<
std::
endl;
std::
cout <<
"SQLState: " << e.getSQLState() <<
std::
endl;
}
}
线程参数不支持引用,否则报错
In file included from /usr/include/c++/4.8.2/thread:39:0,from main.cpp:14:
/usr/include/c++/4.8.2/functional: In instantiation of
‘struct std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData>))
(int&, sql::PreparedStatement*, std::vector<TUpdateData>)>’:/usr/include/c++/4.8.2/thread:137:47:
required from ‘std::thread::thread(_Callable&&, _Args&& ...)
[with _Callable = void (&)(int&, sql::PreparedStatement*, std::vector<TUpdateData>);
_Args = {int&, sql::PreparedStatement*&, std::vector<TUpdateData, std::allocator<TUpdateData> >&}]’
main.cpp:209:99: required from here/usr/include/c++/4.8.2/functional:1697:61:
错误:no type named ‘type’
in ‘class std::result_of<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData>))(int&,
sql::PreparedStatement*, std::vector<TUpdateData>)>’
typedef typename result_of<_Callable(_Args...)>::type result_type; ^/usr/include/c++/4.8.2/functional:1727:9: 错误:no type named ‘type’
in ‘class std::result_of<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData>))(int&, sql::PreparedStatement*, std::vector<TUpdateData>)>’ _M_invoke(_Index_tuple<_Indices...>)
这样定义是不对的
void ProcessUpdateData(int& RowLimit, sql::PreparedStatement *prep_stmt, std::vector<TUpdateData> subDataArray)
日后有空研究线程的同步技术...
