mysql的thread_running数量分析


本篇内容主要讲解“mysql的thread_running数量分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“mysql的thread_running数量分析”吧!thread pool的原理:已在server层完成解析;层创建多组常驻线程,用于接收客户端连接发送的query并代为执行,而不是为每个连接单独创建一个线程。层进行running thread数量判断,如果达到阈值则直接报错或sleep状态变量记录了当前并发执行stmt/command的数量,执行前加1执行后减1突然飙高的诱因:客户端连接暴增;系统性能瓶颈,如CPU,IO或者mem swap异常sql会表现出hang住的假象。执行,为此引入两个阈值low_watermarkhigh_watermark,以及变量threads_running_ctl_mode(selects或者all )前,检查thread_running若其已达high_watermark阈值则直接拒绝执行并返回错误:mysql
server is too busy若其位于lowhigh之间,则sleep 5ms,然后继续尝试,累计等待100ms后则执行对于已经开启事务和super用户,不做限制控制query类型:SELECTS/ALL,默认为SELECTS,表示只影响SELECT语句源码见注1优化为基于FIFOcond-wait/signal(实现8FIFO)高水位限流(这点保持不变)低水位优化;其他解决方案:mariadb开发thread poolpercona在其上实现了优先队列;优势:思路与thread pool一致,但代码更简洁(不到1000);而且增加了特定query的过滤;代码见注2

新增thread_active记录并发线程数,位于mysql_execute_command(sql解析之后),高水位则在query解析之前判断只统计select/DML,而commit/rollback则放过。采用FIFO,当thread_active >=
thread_running_low_watermark时进程进入FIFO等待,其他线程执行完sql后唤醒FIFO内,同时引入threads_running_wait_timeout控制线程在FIFO最大等待时间,超时则直接报错返回。引入8FIFO,降低了进出FIFO的锁竞争,线程采用RR分配到不同fifo,每个队列限制并发运行线程为threads_running_low_watermark/8,开始执行query[解析后进行低水位判断,若通过则执行],执行当前sql完毕后,thread可能发起新query,则重复[]过程。:进入FIFO排队最长时间,等待超时后sql被拒,默认100,单位为毫秒ms当前并发SELECT/INSERT/UPDATE/DELETE执行的线程数目;:当前进入到FIFO中等待的线程数目;未打补丁版本,设置innodb_thread_concurrency=0未打补丁版本,innodb_thread_concurrency=32低水位限流补丁版本(活跃线程数不超过641http://www.gpfeng.com/wp-content/uploads/2013/09/threads_running_control.txt
+static my_bool thread_running_control(THD *thd, ulong tr)
+{
+ int slept_cnt= 0;
+ ulong tr_low, tr_high;
+ DBUG_ENTER(“thread_running_control”);
+
+ /*
+ Super user/slave thread will not be affected at any time,
+ transactions that have already started will continue.
+ */
+ if ( thd->security_ctx->master_access & SUPER_ACL|| –对于super权限的用户和已经开启的事务不做限制
+ thd->in_active_multi_stmt_transaction() ||
+ thd->slave_thread)
+ DBUG_RETURN(FALSE);
+
+ /*
+ To promise that tr_low will never be greater than tr_high,
+ as values may be changed between these two statements.
+ eg.
+ (low, high) = (200, 500)
+ 1. read low = 200
+ 2. other sessions: set low = 20; set high = 80
+ 3. read high = 80
+ Don’t take a lock here to avoid lock contention.
+ */
+ do
+ {
+ tr_low= thread_running_low_watermark;
+ tr_high= thread_running_high_watermark;
+
+ } while (tr_low > tr_high);
+
+check_buzy:
+
+ /* tr_high is promised to be non-zero.*/
+ if ((tr_low == 0 && tr + DBUG_RETURN(FALSE);
+
+ if (tr >= tr_high)
+ {
+ int can_reject= 1;
+
+ /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */
+ if (thread_running_ctl_mode == 0)
+ {
+ int query_is_select= 0;
+ if (thd->query_length() >= 8)
+ {
+ char *p= thd->query(); –读取query text的前6个字符,以判断是否为select
+ if (my_toupper(system_charset_info, p[0]) == ‘S’ &&
+ my_toupper(system_charset_info, p[1]) == ‘E’ &&
+ my_toupper(system_charset_info, p[2]) == ‘L’ &&
+ my_toupper(system_charset_info, p[3]) == ‘E’ &&
+ my_toupper(system_charset_info, p[4]) == ‘C’ &&
+ my_toupper(system_charset_info, p[5]) == ‘T’)
+
+ query_is_select= 1;
+ }
+
+ if (!query_is_select)
+ can_reject= 0;
+ }
+
+ if (can_reject)
+ {
+ inc_thread_rejected();
+ DBUG_RETURN(TRUE);
+ }
+ else
+ DBUG_RETURN(FALSE);
+ }
+
+ if (tr_low != 0 && tr >= tr_low)
+ {
+ /*
+ If total slept time exceed 100ms and thread running does not
+ reach high watermark, let it in.
+ */
+ if (slept_cnt >= 20)
+ DBUG_RETURN(FALSE);
+
+ dec_thread_running()
+
+ /* wait for 5ms. */
+ my_sleep(5000UL);
+
+ slept_cnt++;
+ tr= inc_thread_running() – 1;
+
+ goto check_buzy;
+ }
+
+ DBUG_RETURN(FALSE);
+}
+
+/**
Perform one connection-level (COM_XXXX) command.
@param command type of command to perform
@@ -1016,7 +1126,8 @@
thd->set_query_id(get_query_id());
if (!(server_command_flags[command] & CF_SKIP_QUERY_ID))
next_query_id();
– inc_thread_running();
+ /* remember old value of thread_running for *thread_running_control*. */
+ int32 tr= inc_thread_running() – 1;
if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))
statistic_increment(t开发云主机域名hd->status_var.questions, &LOCK_status);

@@ -1129,6 +1240,13 @@
{
if (alloc_query(thd, packet, packet_length))
break; // fatal error is set
+
+ if (thread_running_control(thd, (ulong)tr))
+ {
+ my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));
+ break;
+ }
+
MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : “”), &thd->security_ctx->priv_user[0])

注2
http://www.gpfeng.com/wp-content/uploads/2014/01/tr-control.diff_.txt
+/**
Perform one connection-level (COM_XXXX) command.

@param command type of command to perform
@@ -1177,7 +1401,7 @@
command= COM_SHUTDOWN;
}
thd->set_query_id(next_query_id());
– inc_thread_running();
+ int32 tr= inc_thread_running();

if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))
statistic_increment(thd->status_var.questions, &LOCK_status);
@@ -1209,6 +1433,15 @@
goto done;
}

+ if (command == COM_QUERY && alloc_query(thd, packet, packet_length))
+ goto endof_case; // fatal error is set
+
+ if (thread_running_control_high(thd, tr))
+ {
+ my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));
+ goto endof_case;
+ }
+
switch (command) {
case COM_INIT_DB:
{
@@ -1311,8 +1544,6 @@
}
case COM_QUERY:
{
– if (alloc_query(thd, packet, packet_length))
– break; // fatal error is set
MYSQL_QUERY_START(thd->query(), thd->thread_id,
(char *) (thd->db ? thd->db : “”),
&thd->security_ctx->priv_user[0],
@@ -1751,6 +1982,7 @@
my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0));
break;
}
+endof_case:

done:
DBUG_ASSERT(thd->derived_tables == NULL &&
@@ -2502,12 +2734,37 @@
Opt_trace_array trace_command_steps(&thd->opt_trace, “steps”);

DBUG_ASSERT(thd->transaction.stmt.cannot_safely_rollback() == FALSE);
+ bool count_active= false;

if (need_traffic_control(thd, lex->sql_command))
{
thd->killed = THD::KILL_QUERY;
goto error;
}
+
+ switch (lex->sql_command) {
+
+ case SQLCOM_SELECT:
+ case SQLCOM_UPDATE:
+ case SQLCOM_UPDATE_MULTI:
+ case SQLCOM_DELETE:
+ case SQLCOM_DELETE_MULTI:
+ case SQLCOM_INSERT:
+ case SQLCOM_INSERT_SELECT:
+ case SQLCOM_REPLACE:
+ case SQLCOM_REPLACE_SELECT:
+ count_active= true;
+ break;
+ default:
+ break;
+ }
+
+ if (count_active && thread_running_control_low_enter(thd))
+ {
+ my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, myf(0));
+ goto error;
+ }
+
status_var_increment(thd->status_var.com_stat[lex->sql_command]);

switch (gtid_pre_statement_checks(thd))
@@ -4990,6 +5247,9 @@

finish:

+ if (count_active)
+ thread_running_control_low_exit(thd);
+
DBUG_ASSERT(!thd->in_active_multi_stmt_transaction() ||
thd->in_multi_stmt_transaction_mode());

+static my_bool thread_running_control_high(THD *thd, int32 tr)
+{
+ int32 tr_high;
+ DBUG_ENTER(“thread_running_control_high”);
+
+ tr_high= (int32)thread_running_high_watermark;
+
+ /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */
+ if ((!tr_high || tr + thd->transaction.is_active() ||
+ thd->get_command() != COM_QUERY ||
+ thd->security_ctx->master_access & SUPER_ACL ||
+ thd->slave_thread)
+ DBUG_RETURN(FALSE);
+
+ const char *query= thd->query();
+ uint32 len= thd->query_length();
+
+ if ((!has_prefix(query, len, “SELECT”, 6) && thread_running_ctl_mode == 0) || –不再是逐个字符判断
+ has_prefix(query, len, “COMMIT”, 6) ||
+ has_prefix(query, len, “ROLLBACK”, 8))
+ DBUG_RETURN(FALSE);
+
+ /* confirm again*/
+ if (tr > tr_high && get_thread_running() > tr_high)
+ {
+ __sync_add_and_fetch(&thread_rejected, 1);
+ DBUG_RETURN(TRUE);
+ }
+
+ DBUG_RETURN(FALSE);
+}
+

+static my_bool thread_running_control_low_enter(THD *thd)
+{
+ int res= 0;
+ int32 tr_low;
+ my_bool ret= FALSE;
+ my_bool slept= FALSE;
+ struct timespec timeout;
+ Thread_conc_queue *queue;
+ DBUG_ENTER(“thread_running_control_low_enter”);
+
+ /* update global status */
+ __sync_add_and_fetch(&thread_active, 1);
+
+ tr_low= (int32)queue_tr_low_watermark;
+ queue= thread_conc_queues + thd->query_id % N_THREAD_CONC_QUEUE;
+
+ queue->lock();–问1:在进行低水位判断前,先锁定FIFO,避免低水位验证失败时无法获取FIFO锁进而不能放入FIFO;
+
+retry:
+
+ if ((!tr_low || queue->thread_active + (thd->lex->sql_command != SQLCOM_SELECT && thread_running_ctl_mode == 0) ||
+ (!slept && (thd->transaction.is_active() ||
+ thd->security_ctx->master_access & SUPER_ACL || thd->slave_thread)))
+ {
+ queue->thread_active++; –判断是否满足进入FIFO条件,如不满足则立即更新thread_active++,解锁queue并退出;
+ queue->unlock();
+ DBUG_RETURN(ret);
+ }
+
+ if (!slept)
+ {
+ queue->unlock();
+
+ /* sleep for 500 us */
+ my_sleep(500);
+ slept= TRUE;
+ queue->lock();
+
+ goto retry;
+ }
+
+ /* get a free wait-slot */
+ Thread_wait_slot *slot= queue->pop_free();
+
+ /* can’t find a free wait slot, must let the query enter */
+ if (!slot)– 当FIFO都满了,即无法把当前线程放入,则必须放行让该sql正常执行
+ {
+ queue->thread_active++;
+ queue->unlock();
+ DBUG_RETURN(ret);
+ }
+
+ slot->signaled= false;
+ slot->wait_ended= false;
+
+ /* put slot into waiting queue. */
+ queue->push_back_wait(slot);
+ queue->thread_wait++;
+
+ queue->unlock();
+
+ /* update global status */
+ thd_proc_info(thd, “waiting in server fifo”);
+ __sync_sub_and_fetch(&thread_active, 1);
+ __sync_add_and_fetch(&thread_wait, 1);
+
+ /* cond-wait for at most thread_running_wait_timeout(ms). */
+ set_timespec_nsec(timeout, thread_running_wait_timeout_ns);
+
+ mysql_mutex_lock(&slot->mutex);
+ while (!slot->signaled)
+ {
+ res= mysql_cond_timedwait(&slot->cond, &slot->mutex, &timeout);
+ /* no need to signal if cond-wait timedout */
+ slot->signaled= true;
+ }
+ mysql_mutex_unlock(&slot->mutex);
+
+ queue->lock();
+ queue->thread_wait–;
+ queue->thread_active++;
+
+ /* remove slot from waiting queue. */
+ queue->remove_wait(slot);
+ /* put slot into the free queue for reuse. */
+ queue->push_back_free(slot);
+
+ queue->unlock();
+
+ /* update global status */
+ __sync_sub_and_fetch(&thread_wait, 1);
+ __sync_add_and_fetch(&thread_active, 1);
+ thd_proc_info(thd, 0);
+
+ if (res == ETIMEDOUT || res == ETIME)
+ {
+ ret= TRUE; // indicate that query is rejected.
+ __sync_add_and_fetch(&thread_rejected, 1);
+ }
+
+ DBUG_RETURN(ret);
+}
到此,相信大家对“mysql的thread_running数量分析”有了更深的了解,不妨来实际操作一番吧!这里是开发云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

相关推荐: MySQL中InnoDB MRR优化的示例分析

这篇文章将为大家详细讲解有关MySQL中InnoDB MRR优化的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。前言MRR 是 Multi-Range Read 的简写,目的是减少磁盘随机访问,将随机访问转化为较为顺…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 06/28 08:37
下一篇 06/28 08:37

相关推荐