主要是尝试从代码上看下 sequence_number 与 last_committed 的关系,以及各自分别是怎么实现的。每个事务有 2 个逻辑时钟:@c Transaction_ctx::last_committed 和 @c Transaction_ctx::sequence_number。并行事务依赖跟踪有 3 种模式:
/*
  Modes for tracking transaction dependencies in the binary log, i.e. how
  the commit_parent (last_committed) of a transaction is computed for
  multi-threaded replica (MTS) scheduling.
*/
enum enum_binlog_transaction_dependency_tracking
{
  // Use only the commit order (logical clock) of transactions.
  DEPENDENCY_TRACKING_COMMIT_ORDER= 0,
  // Refine COMMIT_ORDER using the set of row hashes (writeset) touched.
  DEPENDENCY_TRACKING_WRITESET= 1,
  // Like WRITESET, plus an extra per-session dependency step.
  DEPENDENCY_TRACKING_WRITESET_SESSION= 2
};
事务依赖跟踪使用了单例模式
/*
  Facade that computes a transaction's logical timestamps
  (sequence_number, commit_parent) by delegating to one or more of the
  three concrete trackers below, as selected by m_opt_tracking_mode.
*/
class Transaction_dependency_tracker
{
public:
  // Default mode is COMMIT_ORDER; 25000 is presumably the writeset
  // history capacity passed to Writeset_trx_dependency_tracker —
  // TODO confirm against that class's constructor (not visible here).
  Transaction_dependency_tracker():
      m_opt_tracking_mode(DEPENDENCY_TRACKING_COMMIT_ORDER), m_writeset(25000) {}

  // Compute sequence_number and commit_parent for thd (see definition below).
  void get_dependency(THD *thd, int64 &sequence_number, int64 &commit_parent);

  // Declarations only — definitions not visible in this excerpt.
  void tracking_mode_changed();
  void update_max_committed(THD *thd);
  int64 get_max_committed_timestamp();
  int64 step();
  void rotate();

public:
  /* option opt_binlog_transaction_dependency_tracking */
  long m_opt_tracking_mode;

  // Non-owning accessor to the writeset tracker.
  Writeset_trx_dependency_tracker *get_writeset()
  {
    return &m_writeset;
  }

private:
  Writeset_trx_dependency_tracker m_writeset;
  Commit_order_trx_dependency_tracker m_commit_order;
  Writeset_session_trx_dependency_tracker m_writeset_session;
};
commit_order 模式
/*
  COMMIT_ORDER dependency tracker: derives logical timestamps from the
  order in which transactions prepare and commit, using two
  Logical_clock instances.
*/
class Commit_order_trx_dependency_tracker
{
public:
  /**
    Main function that gets the dependencies using the COMMIT_ORDER tracker.

    @param [in]     thd             THD of the caller.
    @param [in,out] sequence_number sequence_number initialized and returned.
    @param [in,out] commit_parent   commit_parent to be returned.
  */
  void get_dependency(THD *thd, int64 &sequence_number, int64 &commit_parent);

  // Presumably advances m_max_committed_transaction up to sequence_number
  // when a transaction commits — definition not visible; TODO confirm.
  void update_max_committed(int64 sequence_number);

  // Returns a copy of the committed-transactions clock.
  Logical_clock get_max_committed_transaction()
  {
    return m_max_committed_transaction;
  }

  // Declarations only — definitions not visible in this excerpt.
  int64 step();
  void rotate();

private:
  /* Committed transactions timestamp */
  Logical_clock m_max_committed_transaction;

  /* "Prepared" transactions timestamp */
  Logical_clock m_transaction_counter;
};
根据tracking_mode设置,分别执行不同的get_dependency方法
/**
  Compute the logical timestamps (sequence_number, commit_parent) for a
  transaction, delegating to the tracker(s) selected by
  m_opt_tracking_mode.  WRITESET runs after (and refines) COMMIT_ORDER;
  WRITESET_SESSION additionally runs the per-session tracker after both.

  @param [in]     thd             THD of the caller.
  @param [in,out] sequence_number sequence_number initialized and returned.
  @param [in,out] commit_parent   commit_parent to be returned.
*/
void
Transaction_dependency_tracker::get_dependency(THD *thd,
                                               int64 &sequence_number,
                                               int64 &commit_parent)
{
  sequence_number= commit_parent= 0;

  const long mode= m_opt_tracking_mode;

  if (mode != DEPENDENCY_TRACKING_COMMIT_ORDER &&
      mode != DEPENDENCY_TRACKING_WRITESET &&
      mode != DEPENDENCY_TRACKING_WRITESET_SESSION)
  {
    DBUG_ASSERT(0);  // blow up on debug
    /*
      Fallback to commit order on production builds.
    */
    m_commit_order.get_dependency(thd, sequence_number, commit_parent);
    return;
  }

  /* Every valid mode starts from the COMMIT_ORDER timestamps. */
  m_commit_order.get_dependency(thd, sequence_number, commit_parent);

  /* WRITESET and WRITESET_SESSION refine the commit parent further. */
  if (mode != DEPENDENCY_TRACKING_COMMIT_ORDER)
    m_writeset.get_dependency(thd, sequence_number, commit_parent);

  /* WRITESET_SESSION adds the per-session dependency on top. */
  if (mode == DEPENDENCY_TRACKING_WRITESET_SESSION)
    m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
}
这里可以看到 writeset 模式是先用了 commit_order,然后用自己的 get_dependency;writeset_session 先用了 commit_order,然后用了 writeset,然后再用自己的 get_dependency;没有指定模式时,默认就是 commit_order。下面是 commit_order 的计算方式:
Commit_order_trx_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent)
{
Transaction_ctx *trn_ctx= thd->get_transaction();
DBUG_ASSERT(trn_ctx->sequence_number
> m_max_committed_transaction.get_offset());
/*
Prepare sequence_number and commit_parent relative to the current
binlog. This is done by subtracting the binlog's clock offset
from the values.
A transaction that commits after the binlog is rotated, can have a
commit parent in the previous binlog. In this case, subtracting
the offset from the sequence number results in a negative
number. The commit parent dependency gets lost in such
case. Therefore, we log the value SEQ_UNINIT in this case.
*/
sequence_number=
trn_ctx->sequence_number - m_max_committed_transaction.get_offset();
commit_parent=
trn_ctx->last_committed <= m_max_committed_transaction.get_offset()
? SEQ_UNINIT
: trn_ctx->last_committed - m_max_committed_transaction.get_offset();
}
writeset 方式,可以看到使用上有些约束,超过后会清理
/**
  WRITESET dependency tracker: refines the commit_parent already computed
  by the COMMIT_ORDER tracker using a history of row hashes (writesets)
  of recently committed transactions.  Two transactions that touch
  disjoint row sets get an older commit_parent and can thus be applied
  in parallel on the replica.

  @param [in]     thd             THD of the caller.
  @param [in,out] sequence_number sequence_number of the transaction
                                  (already set by the COMMIT_ORDER pass).
  @param [in,out] commit_parent   possibly lowered to the newest
                                  conflicting transaction in the history.
*/
void
Writeset_trx_dependency_tracker::get_dependency(THD *thd,
                                                int64 &sequence_number,
                                                int64 &commit_parent)
{
  Rpl_transaction_write_set_ctx *write_set_ctx=
    thd->get_transaction()->get_transaction_write_set_ctx();
  std::set<uint64> *writeset= write_set_ctx->get_write_set();

#ifndef DBUG_OFF
  /* The writeset of an empty transaction must be empty. */
  if (is_empty_transaction_in_binlog_cache(thd))
    DBUG_ASSERT(writeset->size() == 0);
#endif

  /*
    Check if this transaction has a writeset, if the writeset will overflow the
    history size, if the transaction_write_set_extraction is consistent
    between session and global or if changes in the tables referenced in this
    transaction cascade to other tables. If that happens revert to using the
    COMMIT_ORDER and clear the history to keep data consistent.
  */
  bool can_use_writesets=
    // empty writeset implies DDL or similar, except if there are missing keys
    (writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
     /*
       The empty transactions do not need to clear the writeset history, since
       they can be executed in parallel.
     */
     is_empty_transaction_in_binlog_cache(thd)) &&
    // hashing algorithm for the session must be the same as used by other rows in history
    (global_system_variables.transaction_write_set_extraction ==
     thd->variables.transaction_write_set_extraction) &&
    // must not use foreign keys
    !write_set_ctx->get_has_related_foreign_keys();

  bool exceeds_capacity= false;

  if (can_use_writesets)
  {
    /*
      Check if adding this transaction exceeds the capacity of the writeset
      history. If that happens, m_writeset_history will be cleared only after
      using its information for current transaction.
    */
    exceeds_capacity=
      m_writeset_history.size() + writeset->size() > m_opt_max_history_size;

    /*
      Compute the greatest sequence_number among all conflicts and add the
      transaction's row hashes to the history.
    */
    int64 last_parent= m_writeset_history_start;

    for (std::set<uint64>::iterator it= writeset->begin();
         it != writeset->end(); ++it)
    {
      Writeset_history::iterator hst= m_writeset_history.find(*it);
      if (hst != m_writeset_history.end())
      {
        // Conflict: a transaction in the history touched the same row
        // hash.  Keep the largest such conflicting sequence_number as the
        // commit-parent candidate (must still be older than ourselves).
        if (hst->second > last_parent && hst->second < sequence_number)
          last_parent= hst->second;

        // The current transaction becomes the latest writer of this row
        // hash, even when the history is about to be cleared below.
        hst->second= sequence_number;
      }
      else
      {
        // New row hash: record it unless the history is already full.
        if (!exceeds_capacity)
          m_writeset_history.insert(std::pair<uint64, int64>(*it, sequence_number));
      }
    }

    /*
      If the transaction references tables with missing primary keys revert to
      COMMIT_ORDER, update and not reset history, as it is unnecessary because
      any transaction that refers this table will also revert to COMMIT_ORDER.
    */
    if (!write_set_ctx->get_has_missing_keys())
    {
      /*
        The WRITESET commit_parent then becomes the minimum of largest parent
        found using the hashes of the row touched by the transaction and the
        commit parent calculated with COMMIT_ORDER.
      */
      commit_parent= std::min(last_parent, commit_parent);
    }
  }

  if (exceeds_capacity || !can_use_writesets)
  {
    // Reset the history window: transactions before this point can no
    // longer be used for dependency computation.
    m_writeset_history_start= sequence_number;
    m_writeset_history.clear();
  }
}
/**
  Generate the logical timestamps for a transaction and write its
  Gtid_log_event to the binary log through the given writer.

  @param thd        THD owning the transaction (GTID already assigned).
  @param cache_data binlog cache (statement or transaction) being flushed.
  @param writer     destination event writer.

  @return the result of write_full_event — presumably true on error,
          following the surrounding code's convention; TODO confirm.
*/
bool MYSQL_BIN_LOG::write_gtid(THD *thd, binlog_cache_data *cache_data,
                               Binlog_event_writer *writer)
{
  DBUG_ENTER("MYSQL_BIN_LOG::write_gtid");

  /*
    The GTID for the THD was assigned at
    assign_automatic_gtids_to_flush_group()
  */
  DBUG_ASSERT(thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS ||
              thd->owned_gtid.sidno > 0);

  int64 sequence_number, last_committed;
  /* Generate logical timestamps for MTS */
  m_dependency_tracker.get_dependency(thd, sequence_number, last_committed);

  /*
    In case both the transaction cache and the statement cache are
    non-empty, both will be flushed in sequence and logged as
    different transactions. Then the second transaction must only
    be executed after the first one has committed. Therefore, we
    need to set last_committed for the second transaction equal to
    last_committed for the first transaction. This is done in
    binlog_cache_data::flush. binlog_cache_data::flush uses the
    condition trn_ctx->last_committed==SEQ_UNINIT to detect this
    situation, hence the need to set it here.
  */
  thd->get_transaction()->last_committed= SEQ_UNINIT;

  /*
    Generate and write the Gtid_log_event.
  */
  Gtid_log_event gtid_event(thd, cache_data->is_trx_cache(),
                            last_committed, sequence_number,
                            cache_data->may_have_sbr_stmts());
  uchar buf[Gtid_log_event::MAX_EVENT_LENGTH];
  uint32 buf_len= gtid_event.write_to_memory(buf);
  bool ret= writer->write_full_event(buf, buf_len);

  DBUG_RETURN(ret);
}
两个逻辑时钟的说明
/* Binlog-specific logical timestamps. */
/*
  Store for the transaction's commit parent sequence_number.
  The value specifies this transaction dependency with a "parent"
  transaction.
  The member is assigned, when the transaction is about to commit
  in binlog to a value of the last committed transaction's
  sequence_number.  This and last_committed as numbers are kept
  ever incremented regardless of binary logs being rotated or
  when transaction is logged in multiple pieces.
  However the logger to the binary log may convert them
  according to its specification.
*/
int64 last_committed;
/*
  The transaction's private logical timestamp assigned at the
  transaction prepare phase.  The timestamp enumerates transactions
  in the binary log.  The value is gained through incrementing
  (stepping) a global clock.
  Eventually the value is considered to increase
  max_committed_transaction system clock when the transaction has
  committed.
*/
int64 sequence_number;
mts 调度事件处理
Mts_submode_logical_clock::schedule_next_event(Relay_log_info* rli,
Log_event *ev)
{
longlong last_sequence_number= sequence_number;
bool gap_successor= false;
DBUG_ENTER("Mts_submode_logical_clock::schedule_next_event");
// We should check if the SQL thread was already killed before we schedule
// the next transaction
if (sql_slave_killed(rli->info_thd, rli))
DBUG_RETURN(0);
Slave_job_group *ptr_group=
rli->gaq->get_job_group(rli->gaq->assigned_group_index);
/*
A group id updater must satisfy the following:
- A query log event ("BEGIN" ) or a GTID EVENT
- A DDL or an implicit DML commit.
*/
switch (ev->get_type_code())
{
case binary_log::GTID_LOG_EVENT:
case binary_log::ANONYMOUS_GTID_LOG_EVENT:
// TODO: control continuity
ptr_group->sequence_number= sequence_number=
static_cast<Gtid_log_event*>(ev)->sequence_number;
ptr_group->last_committed= last_committed=
static_cast<Gtid_log_event*>(ev)->last_committed;
break;
default:
sequence_number= last_committed= SEQ_UNINIT;
break;
}
DBUG_PRINT("info", ("sequence_number %lld, last_committed %lld",
sequence_number, last_committed));
if (first_event)
{
first_event= false;
}
else
{
if (unlikely(clock_leq(sequence_number, last_committed) &&
last_committed != SEQ_UNINIT))
{
/* inconsistent (buggy) timestamps */
sql_print_error("Transaction is tagged with inconsistent logical "
"timestamps: "
"sequence_number (%lld) <= last_committed (%lld)",
sequence_number, last_committed);
DBUG_RETURN(ER_MTS_CANT_PARALLEL);
}
if (unlikely(clock_leq(sequence_number, last_sequence_number) &&
sequence_number != SEQ_UNINIT))
{
/* inconsistent (buggy) timestamps */
sql_print_error("Transaction's sequence number is inconsistent with that "
"of a preceding one: "
"sequence_number (%lld) <= previous sequence_number (%lld)",
sequence_number, last_sequence_number);
DBUG_RETURN(ER_MTS_CANT_PARALLEL);
}
/*
Being scheduled transaction sequence may have gaps, even in
relay log. In such case a transaction that succeeds a gap will
wait for all ealier that were scheduled to finish. It's marked
as gap successor now.
*/
compile_time_assert(SEQ_UNINIT == 0);
if (unlikely(sequence_number > last_sequence_number + 1))
{
DBUG_PRINT("info", ("sequence_number gap found, "
"last_sequence_number %lld, sequence_number %lld",
last_sequence_number, sequence_number));
DBUG_ASSERT(rli->replicate_same_server_id || true /* TODO: account autopositioning */);
gap_successor= true;
}
}
/*
The new group flag is practically the same as the force flag
when up to indicate syncronization with Workers.
*/
is_new_group=
(/* First event after a submode switch; */
first_event ||
/* Require a fresh group to be started; */
// todo: turn `force_new_group' into sequence_number == SEQ_UNINIT condition
force_new_group ||
/* Rewritten event without commit point timestamp (todo: find use case) */
sequence_number == SEQ_UNINIT ||
/*
undefined parent (e.g the very first trans from the master),
or old master.
*/
last_committed == SEQ_UNINIT ||
/*
When gap successor depends on a gap before it the scheduler has
to serialize this transaction execution with previously
scheduled ones. Below for simplicity it's assumed that such
gap-dependency is always the case.
*/
gap_successor ||
/*
previous group did not have sequence number assigned.
It's execution must be finished until the current group
can be assigned.
Dependency of the current group on the previous
can't be tracked. So let's wait till the former is over.
*/
last_sequence_number == SEQ_UNINIT);
/*
The coordinator waits till all transactions on which the current one
depends on are applied.
*/
if (!is_new_group) //此处的比较重要,如果不是新组,根据if中的判断,如果last_committed > lwm_estimate会
进入等待,不能并行执行
{
longlong lwm_estimate= estimate_lwm_timestamp();
if (!clock_leq(last_committed, lwm_estimate) &&
rli->gaq->assigned_group_index != rli->gaq->entry)
{
/*
"Unlikely" branch.
The following block improves possibly stale lwm and when the
waiting condition stays, recompute min_waited_timestamp and go
waiting.
At awakening set min_waited_timestamp to commit_parent in the
subsequent GAQ index (could be NIL).
*/
if (wait_for_last_committed_trx(rli, last_committed, lwm_estimate))
{
/*
MTS was waiting for a dependent transaction to finish but either it
has failed or the applier was requested to stop. In any case, this
transaction wasn't started yet and should not warn about the
coordinator stopping in a middle of a transaction to avoid polluting
the server error log.
*/
rli->reported_unsafe_warning= true;
DBUG_RETURN(-1);
}
/*
Making the slave's max last committed (lwm) to satisfy this
transaction's scheduling condition.
*/
if (gap_successor)
last_lwm_timestamp= sequence_number - 1;
DBUG_ASSERT(!clock_leq(sequence_number, estimate_lwm_timestamp()));
}
delegated_jobs++;
DBUG_ASSERT(!force_new_group);
}
else
{
DBUG_ASSERT(delegated_jobs >= jobs_done);
DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == 1 + delegated_jobs));
DBUG_ASSERT(rli->mts_group_status == Relay_log_info::MTS_IN_GROUP);
/*
Under the new group fall the following use cases:
- events from an OLD (sequence_number unaware) master;
- malformed (missed BEGIN or GTID_NEXT) group incl. its
particular form of CREATE..SELECT..from..@user_var (or rand- and
int- var in place of @user- var).
The malformed group is handled exceptionally each event is executed
as a solitary group yet by the same (zero id) worker.
*/
if (-1 == wait_for_workers_to_finish(rli))
DBUG_RETURN (ER_MTS_INCONSISTENT_DATA);
rli->mts_group_status= Relay_log_info::MTS_IN_GROUP; //wait set it to NOT
DBUG_ASSERT(min_waited_timestamp == SEQ_UNINIT);
/*
the instant last lwm timestamp must reset when force flag is up.
*/
rli->gaq->lwm.sequence_number= last_lwm_timestamp= SEQ_UNINIT;
delegated_jobs= 1;
jobs_done= 0;
force_new_group= false;
/*
Not sequenced event can be followed with a logically relating
e.g User var to be followed by CREATE table.
It's supported to be executed in one-by-one fashion.
Todo: remove with the event group parser worklog.
*/
if (sequence_number == SEQ_UNINIT && last_committed == SEQ_UNINIT)
rli->last_assigned_worker= *rli->workers.begin();
}
#ifndef DBUG_OFF
mysql_mutex_lock(&rli->mts_gaq_LOCK);
DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == delegated_jobs));
mysql_mutex_unlock(&rli->mts_gaq_LOCK);
#endif
DBUG_RETURN(0);
}
在等待 last_committed 的过程中,只要 last_committed 大于 lwm 就会一直等待。这里的 lwm 即 last_lwm_timestamp(上面可以看到对 gap 后继事务有 last_lwm_timestamp= sequence_number - 1 的赋值)。
/*
  NOTE(review): this is a fragment — the enclosing function's header is
  not visible here.  Judging by last_committed_arg and
  rli->logical_clock_cond it appears to belong to
  Mts_submode_logical_clock::wait_for_last_committed_trx — TODO confirm.
*/
do
{
  // Block on the logical-clock condition; the loop re-checks the
  // predicate, which also tolerates spurious wakeups.
  mysql_cond_wait(&rli->logical_clock_cond, &rli->mts_gaq_LOCK);
}
// Keep waiting while last_committed_arg > estimated lwm, unless the SQL
// thread was killed or an error was flagged.
while ((!rli->info_thd->killed && !is_error) &&
       !clock_leq(last_committed_arg, estimate_lwm_timestamp()));
my_atomic_store64(&min_waited_timestamp, SEQ_UNINIT);  // reset waiting flag
mysql_mutex_unlock(&rli->mts_gaq_LOCK);
thd->EXIT_COND(&old_stage);
set_timespec_nsec(&ts[1], 0);
// Accumulate the time spent in this wait into the MTS wait statistic.
my_atomic_add64(&rli->mts_total_wait_overlap, diff_timespec(&ts[1], &ts[0]));
}
