MySQL源码上理解下commit_order writeset writeset_session

来源:这里教程网 时间:2026-03-01 16:59:54 作者:

本文尝试从代码层面梳理 sequence_number 与 last_committed 的关系,以及二者各自是如何实现的。每个事务有 2 个逻辑时钟:Transaction_ctx::last_committed 和 Transaction_ctx::sequence_number。并行事务的依赖跟踪有 3 种模式:

/**
  Transaction dependency tracking modes. The numeric values are spelled out
  explicitly so that they stay stable if enumerators are ever reordered.
*/
enum enum_binlog_transaction_dependency_tracking
{
  /* Dependencies computed from commit order. */
  DEPENDENCY_TRACKING_COMMIT_ORDER     = 0,
  /* Dependencies computed with the help of transaction writesets. */
  DEPENDENCY_TRACKING_WRITESET         = 1,
  /* Writeset-based tracking combined with the per-session tracker. */
  DEPENDENCY_TRACKING_WRITESET_SESSION = 2
};
事务依赖跟踪使用了单例模式:统一由 Transaction_dependency_tracker 类对外提供接口,它内部聚合了三种模式各自的跟踪器。
class Transaction_dependency_tracker
{
public:
Transaction_dependency_tracker():
m_opt_tracking_mode(DEPENDENCY_TRACKING_COMMIT_ORDER), m_writeset(25000) {}
void get_dependency(THD *thd, int64 &sequence_number, int64 &commit_parent);
void tracking_mode_changed();
void update_max_committed(THD *thd);
int64 get_max_committed_timestamp();
int64 step();
void rotate();
public:
  /* option opt_binlog_transaction_dependency_tracking */
long m_opt_tracking_mode;
Writeset_trx_dependency_tracker *get_writeset()
{
return &m_writeset;
}
private:
Writeset_trx_dependency_tracker m_writeset;
Commit_order_trx_dependency_tracker m_commit_order;
Writeset_session_trx_dependency_tracker m_writeset_session;
};

commit_order 模式

class Commit_order_trx_dependency_tracker
{
public:
  /**
    Main function that gets the dependencies using the COMMIT_ORDER tracker.
    @param [in]     thd             THD of the caller.
    @param [in,out] sequence_number sequence_number initialized and returned.
    @param [in,out] commit_parent   commit_parent to be returned.
   */
void get_dependency(THD *thd, int64 &sequence_number, int64 &commit_parent);
void update_max_committed(int64 sequence_number);
Logical_clock get_max_committed_transaction()
{
return m_max_committed_transaction;
}
int64 step();
void rotate();
private:
  /* Committed transactions timestamp */
Logical_clock m_max_committed_transaction;
  /* "Prepared" transactions timestamp */
Logical_clock m_transaction_counter;
};

根据tracking_mode设置,分别执行不同的get_dependency方法

void
Transaction_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent)
{
sequence_number= commit_parent= 0;
switch(m_opt_tracking_mode)
{
case DEPENDENCY_TRACKING_COMMIT_ORDER:
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
break;
case DEPENDENCY_TRACKING_WRITESET:
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
m_writeset.get_dependency(thd, sequence_number, commit_parent);
break;
case DEPENDENCY_TRACKING_WRITESET_SESSION:
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
m_writeset.get_dependency(thd, sequence_number, commit_parent);
m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
break;
default:
DBUG_ASSERT(0);  // blow up on debug
      /*
        Fallback to commit order on production builds.
       */
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
}
}

这里可以看到:writeset 模式先调用 commit_order 的 get_dependency,再调用自己的 get_dependency;writeset_session 模式先调用 commit_order,再调用 writeset,最后再调用自己的 get_dependency;如果模式不是以上三种(default 分支),debug 版本会触发断言,正式版本则退回 commit_order。下面是 commit_order 的计算方式:

Commit_order_trx_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent)
{
Transaction_ctx *trn_ctx= thd->get_transaction();
DBUG_ASSERT(trn_ctx->sequence_number
> m_max_committed_transaction.get_offset());
  /*
    Prepare sequence_number and commit_parent relative to the current
    binlog.  This is done by subtracting the binlog's clock offset
    from the values.
    A transaction that commits after the binlog is rotated, can have a
    commit parent in the previous binlog. In this case, subtracting
    the offset from the sequence number results in a negative
    number. The commit parent dependency gets lost in such
    case. Therefore, we log the value SEQ_UNINIT in this case.
  */
sequence_number=
trn_ctx->sequence_number - m_max_committed_transaction.get_offset();
commit_parent=
trn_ctx->last_committed <= m_max_committed_transaction.get_offset()
? SEQ_UNINIT
: trn_ctx->last_committed - m_max_committed_transaction.get_offset();
}

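writeset 方式:可以看到使用上有一些约束(例如事务不能涉及外键关联,会话级与全局的 transaction_write_set_extraction 必须一致);不满足约束,或者 writeset 历史超过容量上限(m_opt_max_history_size)时,会清空历史记录。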

void
Writeset_trx_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent)
{
Rpl_transaction_write_set_ctx *write_set_ctx=
thd->get_transaction()->get_transaction_write_set_ctx();
std::set<uint64> *writeset= write_set_ctx->get_write_set();
#ifndef DBUG_OFF
  /* The writeset of an empty transaction must be empty. */
if (is_empty_transaction_in_binlog_cache(thd))
DBUG_ASSERT(writeset->size() == 0);
#endif
  /*
    Check if this transaction has a writeset, if the writeset will overflow the
    history size, if the transaction_write_set_extraction is consistent
    between session and global or if changes in the tables referenced in this
    transaction cascade to other tables. If that happens revert to using the
    COMMIT_ORDER and clear the history to keep data consistent.
  */
bool can_use_writesets=
    // empty writeset implies DDL or similar, except if there are missing keys
(writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
     /*
       The empty transactions do not need to clear the writeset history, since
       they can be executed in parallel.
     */
is_empty_transaction_in_binlog_cache(thd)) &&
    // hashing algorithm for the session must be the same as used by other rows in history
(global_system_variables.transaction_write_set_extraction ==
thd->variables.transaction_write_set_extraction) &&
    // must not use foreign keys
!write_set_ctx->get_has_related_foreign_keys();
bool exceeds_capacity= false;
if (can_use_writesets)
{
    /*
     Check if adding this transaction exceeds the capacity of the writeset
     history. If that happens, m_writeset_history will be cleared only after
     using its information for current transaction.
    */
exceeds_capacity=
m_writeset_history.size() + writeset->size() > m_opt_max_history_size;
    /*
     Compute the greatest sequence_number among all conflicts and add the
     transaction's row hashes to the history.
    */
int64 last_parent= m_writeset_history_start;
for (std::set<uint64>::iterator it= writeset->begin();
it != writeset->end(); ++it)
{
Writeset_history::iterator hst= m_writeset_history.find(*it);
if (hst != m_writeset_history.end())
{
if (hst->second > last_parent && hst->second < sequence_number)
last_parent= hst->second;
hst->second= sequence_number;
}
else
{
if (!exceeds_capacity)
m_writeset_history.insert(std::pair<uint64, int64>(*it, sequence_number));
}
}
    /*
      If the transaction references tables with missing primary keys revert to
      COMMIT_ORDER, update and not reset history, as it is unnecessary because
      any transaction that refers this table will also revert to COMMIT_ORDER.
    */
if (!write_set_ctx->get_has_missing_keys())
{
      /*
       The WRITESET commit_parent then becomes the minimum of largest parent
       found using the hashes of the row touched by the transaction and the
       commit parent calculated with COMMIT_ORDER.
      */
commit_parent= std::min(last_parent, commit_parent);
}
}
if (exceeds_capacity || !can_use_writesets)
{
m_writeset_history_start= sequence_number;
m_writeset_history.clear();
}
}

bool MYSQL_BIN_LOG::write_gtid(THD *thd, binlog_cache_data *cache_data,
Binlog_event_writer *writer)
{
DBUG_ENTER("MYSQL_BIN_LOG::write_gtid");
  /*
    The GTID for the THD was assigned at
    assign_automatic_gtids_to_flush_group()
  */
DBUG_ASSERT(thd->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS ||
thd->owned_gtid.sidno > 0);
int64 sequence_number, last_committed;
  /* Generate logical timestamps for MTS */
m_dependency_tracker.get_dependency(thd, sequence_number, last_committed);
  /*
    In case both the transaction cache and the statement cache are
    non-empty, both will be flushed in sequence and logged as
    different transactions. Then the second transaction must only
    be executed after the first one has committed. Therefore, we
    need to set last_committed for the second transaction equal to
    last_committed for the first transaction. This is done in
    binlog_cache_data::flush. binlog_cache_data::flush uses the
    condition trn_ctx->last_committed==SEQ_UNINIT to detect this
    situation, hence the need to set it here.
  */
thd->get_transaction()->last_committed= SEQ_UNINIT;
  /*
    Generate and write the Gtid_log_event.
  */
Gtid_log_event gtid_event(thd, cache_data->is_trx_cache(),
last_committed, sequence_number,
cache_data->may_have_sbr_stmts());
uchar buf[Gtid_log_event::MAX_EVENT_LENGTH];
uint32 buf_len= gtid_event.write_to_memory(buf);
bool ret= writer->write_full_event(buf, buf_len);
DBUG_RETURN(ret);
}

两个逻辑时钟的说明

/* Binlog-specific logical timestamps. */
  /*
    Store for the sequence_number of this transaction's commit parent.
    The value expresses the dependency of this transaction on a "parent"
    transaction.
    The member is assigned, when the transaction is about to commit
    in binlog, to the value of the last committed transaction's
    sequence_number.
    This and sequence_number as numbers are kept ever incremented
    regardless of binary logs being rotated or when transaction
    is logged in multiple pieces.
    However the logger to the binary log may convert them
    according to its specification.
  */
int64 last_committed;
  /*
    The transaction's private logical timestamp assigned at the
    transaction prepare phase. The timestamp enumerates transactions
    in the binary log. The value is gained through incrementing (stepping) a
    global clock.
    Eventually, once the transaction has committed, the value is considered
    for advancing the max_committed_transaction system clock.
  */
int64 sequence_number;

mts 调度事件处理

Mts_submode_logical_clock::schedule_next_event(Relay_log_info* rli,
Log_event *ev)
{
longlong last_sequence_number= sequence_number;
bool gap_successor= false;
DBUG_ENTER("Mts_submode_logical_clock::schedule_next_event");
  // We should check if the SQL thread was already killed before we schedule
  // the next transaction
if (sql_slave_killed(rli->info_thd, rli))
DBUG_RETURN(0);
Slave_job_group *ptr_group=
rli->gaq->get_job_group(rli->gaq->assigned_group_index);
  /*
    A group id updater must satisfy the following:
    - A query log event ("BEGIN" ) or a GTID EVENT
    - A DDL or an implicit DML commit.
  */
switch (ev->get_type_code())
{
case binary_log::GTID_LOG_EVENT:
case binary_log::ANONYMOUS_GTID_LOG_EVENT:
    // TODO: control continuity
ptr_group->sequence_number= sequence_number=
static_cast<Gtid_log_event*>(ev)->sequence_number;
ptr_group->last_committed= last_committed=
static_cast<Gtid_log_event*>(ev)->last_committed;
break;
default:
sequence_number= last_committed= SEQ_UNINIT;
break;
}
DBUG_PRINT("info", ("sequence_number %lld, last_committed %lld",
sequence_number, last_committed));
if (first_event)
{
first_event= false;
}
else
{
if (unlikely(clock_leq(sequence_number, last_committed) &&
last_committed != SEQ_UNINIT))
{
      /* inconsistent (buggy) timestamps */
sql_print_error("Transaction is tagged with inconsistent logical "
"timestamps: "
"sequence_number (%lld) <= last_committed (%lld)",
sequence_number, last_committed);
DBUG_RETURN(ER_MTS_CANT_PARALLEL);
}
if (unlikely(clock_leq(sequence_number, last_sequence_number) &&
sequence_number != SEQ_UNINIT))
{
      /* inconsistent (buggy) timestamps */
sql_print_error("Transaction's sequence number is inconsistent with that "
"of a preceding one: "
"sequence_number (%lld) <= previous sequence_number (%lld)",
sequence_number, last_sequence_number);
DBUG_RETURN(ER_MTS_CANT_PARALLEL);
}
    /*
      Being scheduled transaction sequence may have gaps, even in
      relay log. In such case a transaction that succeeds a gap will
      wait for all ealier that were scheduled to finish. It's marked
      as gap successor now.
    */
compile_time_assert(SEQ_UNINIT == 0);
if (unlikely(sequence_number > last_sequence_number + 1))
{
DBUG_PRINT("info", ("sequence_number gap found, "
"last_sequence_number %lld, sequence_number %lld",
last_sequence_number, sequence_number));
DBUG_ASSERT(rli->replicate_same_server_id || true /* TODO: account autopositioning */);
gap_successor= true;
}
}
  /*
    The new group flag is practically the same as the force flag
    when up to indicate syncronization with Workers.
  */
is_new_group=
(/* First event after a submode switch; */
first_event ||
     /* Require a fresh group to be started; */
     // todo: turn `force_new_group' into sequence_number == SEQ_UNINIT condition
force_new_group ||
     /* Rewritten event without commit point timestamp (todo: find use case) */
sequence_number == SEQ_UNINIT ||
     /*
       undefined parent (e.g the very first trans from the master),
       or old master.
     */
last_committed == SEQ_UNINIT ||
     /*
       When gap successor depends on a gap before it the scheduler has
       to serialize this transaction execution with previously
       scheduled ones. Below for simplicity it's assumed that such
       gap-dependency is always the case.
     */
gap_successor ||
     /*
       previous group did not have sequence number assigned.
       It's execution must be finished until the current group
       can be assigned.
       Dependency of the current group on the previous
       can't be tracked. So let's wait till the former is over.
     */
last_sequence_number == SEQ_UNINIT);
  /*
    The coordinator waits till all transactions on which the current one
    depends on are applied.
  */
if (!is_new_group) //此处的比较重要,如果不是新组,根据if中的判断,如果last_committed > lwm_estimate会
进入等待,不能并行执行
{
longlong lwm_estimate= estimate_lwm_timestamp();
if (!clock_leq(last_committed, lwm_estimate) &&
rli->gaq->assigned_group_index != rli->gaq->entry)
{
      /*
        "Unlikely" branch.
        The following block improves possibly stale lwm and when the
        waiting condition stays, recompute min_waited_timestamp and go
        waiting.
        At awakening set min_waited_timestamp to commit_parent in the
        subsequent GAQ index (could be NIL).
      */
if (wait_for_last_committed_trx(rli, last_committed, lwm_estimate))
{
        /*
          MTS was waiting for a dependent transaction to finish but either it
          has failed or the applier was requested to stop. In any case, this
          transaction wasn't started yet and should not warn about the
          coordinator stopping in a middle of a transaction to avoid polluting
          the server error log.
        */
rli->reported_unsafe_warning= true;
DBUG_RETURN(-1);
}
      /*
        Making the slave's max last committed (lwm) to satisfy this
        transaction's scheduling condition.
      */
if (gap_successor)
last_lwm_timestamp= sequence_number - 1;
DBUG_ASSERT(!clock_leq(sequence_number, estimate_lwm_timestamp()));
}
delegated_jobs++;
DBUG_ASSERT(!force_new_group);
}
else
{
DBUG_ASSERT(delegated_jobs >= jobs_done);
DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == 1 + delegated_jobs));
DBUG_ASSERT(rli->mts_group_status == Relay_log_info::MTS_IN_GROUP);
    /*
      Under the new group fall the following use cases:
      - events from an OLD (sequence_number unaware) master;
      - malformed (missed BEGIN or GTID_NEXT) group incl. its
        particular form of CREATE..SELECT..from..@user_var (or rand- and
        int- var in place of @user- var).
        The malformed group is handled exceptionally each event is executed
        as a solitary group yet by the same (zero id) worker.
    */
if (-1 == wait_for_workers_to_finish(rli))
DBUG_RETURN (ER_MTS_INCONSISTENT_DATA);
rli->mts_group_status= Relay_log_info::MTS_IN_GROUP; //wait set it to NOT
DBUG_ASSERT(min_waited_timestamp == SEQ_UNINIT);
    /*
      the instant last lwm timestamp must reset when force flag is up.
    */
rli->gaq->lwm.sequence_number= last_lwm_timestamp= SEQ_UNINIT;
delegated_jobs= 1;
jobs_done= 0;
force_new_group= false;
    /*
      Not sequenced event can be followed with a logically relating
      e.g User var to be followed by CREATE table.
      It's supported to be executed in one-by-one fashion.
      Todo: remove with the event group parser worklog.
    */
if (sequence_number == SEQ_UNINIT && last_committed == SEQ_UNINIT)
rli->last_assigned_worker= *rli->workers.begin();
}
#ifndef DBUG_OFF
mysql_mutex_lock(&rli->mts_gaq_LOCK);
DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == delegated_jobs));
mysql_mutex_unlock(&rli->mts_gaq_LOCK);
#endif
DBUG_RETURN(0);
}

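在等待 last_committed 的逻辑中(对应上面调用的 wait_for_last_committed_trx),只要 last_committed 大于 lwm,即 !clock_leq(last_committed_arg, estimate_lwm_timestamp()) 成立,就一直等待。这里的 lwm 对应 last_lwm_timestamp;上面遇到 gap_successor 时会执行 last_lwm_timestamp= sequence_number - 1。等待循环如下: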

do
{
mysql_cond_wait(&rli->logical_clock_cond, &rli->mts_gaq_LOCK);
}
while ((!rli->info_thd->killed && !is_error) &&
!clock_leq(last_committed_arg, estimate_lwm_timestamp()));
my_atomic_store64(&min_waited_timestamp, SEQ_UNINIT);  // reset waiting flag
mysql_mutex_unlock(&rli->mts_gaq_LOCK);
thd->EXIT_COND(&old_stage);
set_timespec_nsec(&ts[1], 0);
my_atomic_add64(&rli->mts_total_wait_overlap, diff_timespec(&ts[1], &ts[0]));
}

相关推荐