MySQL 半同步 与Raft对比

来源:这里教程网 时间:2026-03-01 16:01:47 作者:

MySQL的after_sync半同步与raft 保证一致性的方式有些类似。 after_sync是master在sync binlog后等待从库把事务写到relay log后的ack,拿到ack后,在commit,然后在返回给客户端提交成功的信息。 raft中的日志有commit和applied 两个列表,commited 代表日志写入了文件,applied代表日志应用到了状态机。 raft也是leader先写日志,然后在等待大部分的节点信息都写入了日志,提交了,然后leader在提交,提交日志后,leader在应用到状态机,然后在返回给客户端提交成功的信息, 给其他节点提交信息,其他节点应用日志到状态机,其他节点网络慢的情况下,leader会不停重试传输。 针对leader1宕机的几种状态下的故障。参考

https://www.cnblogs.com/mindwind/p/5231986.html


场景3中,新主会提交之前的日志,客户端在重试,不是重复执行了吗?状态机执行了2次命令。 这个状态机不会重复执行,因为之前的leader已经挂了,还没有apply log,所以不会发送apply 的请求出去。如果leader apply log,返回给客户端确认,但是follower没有收到apply的信号,leader就挂了,虽然新主上有commit的日志,但是不会apply,怎么办?这个是会应用的,新的leader,在接受用户请求之前会执行

r. dispatchLogs([]* logFuture{ noop})

然后会执行

func (r *Raft) processLogs(index uint64, future *logFuture) {
// Reject logs we've applied already
lastApplied := r.getLastApplied()
if index <= lastApplied {
r.logger.Printf("[WARN] raft: Skipping application of old log: %d", index)
return
}
// Apply all the preceding logs
for idx := r.getLastApplied() + 1; idx <= index; idx++ {
// Get the log, either from the future or from our log store
if future != nil && future.log.Index == idx {
r.processLog(&future.log, future, false)
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", idx, err)
panic(err)
}
r.processLog(l, nil, false)
}
// Update the lastApplied index and term
r.setLastApplied(idx)
}

在进行

for idx := r.getLastApplied() + 1; idx <= index; idx++

这个判断的时候,lastapplied一定比index小,index就是lastindex,这样已经是提交状态的日志会被applied。  raft实现了cap中的cp ,没有保证a ,a代表的是用户的请求一定有响应,在出现脑裂的情况下,如果一个leader的请求没有被大多数节点接受,那么就没有办法提交,没法给客户响应,如果3个节点中有2个节点挂掉,就剩一个节点,其实这个时候请求过来后,判断不是主,不会执行,也会返回给客户端not leader,所以这里的响应,是指的不会处理请求,进行业务逻辑处理。在raft代码中,我们可以看到是在apply log后才向客户端发送的响应

case commitEntry := <-r.fsmCommitCh:--通过这个channel能找到commit后向这个channel发送的数据
// Apply the log if a command
var resp interface{}
if commitEntry.log.Type == LogCommand {
start := time.Now()
resp = r.fsm.Apply(commitEntry.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
}
// Update the indexes
lastIndex = commitEntry.log.Index
lastTerm = commitEntry.log.Term
// Invoke the future if given。--给客户端返回信息
if commitEntry.future != nil {
commitEntry.future.response = resp
commitEntry.future.respond(nil)
}

follower的幂等实现,就是判断entries的第一个index,是否小于等于当前follower的最后一个logindex,如果是,把这之间的删除掉

// Process any new entries
if n := len(a.Entries); n > 0 {
start := time.Now()
first := a.Entries[0]
last := a.Entries[n-1]
// Delete any conflicting entries
lastLogIdx, _ := r.getLastLog()
if first.Index <= lastLogIdx {
r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", first.Index, lastLogIdx)
if err := r.logs.DeleteRange(first.Index, lastLogIdx); err != nil {
r.logger.Printf("[ERR] raft: Failed to clear log suffix: %v", err)
return
}
}

相关推荐