使用GO实现Paxos共识算法的方法

什么是Paxos共识算法

最初的服务往往都是通过单体架构对外提供的,即单Server-单Database模式。随着业务的不断扩展,用户和请求数都在不断上升,如何应对大量的请求就成了每个服务都需要解决的问题,这也就是我们常说的高并发。为了解决单台服务器面对高并发的苍白无力,可以通过增加服务器数量来解决,即多Server-单Database(Master-Slave)模式,此时的压力就来到了数据库一方,数据库的IO效率决定了整个服务的效率,继续增加Server数量将无法提升服务性能。这就衍生出了当前火热的微服务架构。当用户请求经由负载均衡分配到某一服务实例上后,如何保证该服务的其他实例最终能够得到相同的数据变化呢?这就要用到Paxos分布式共识协议,Paxos解决的就是共识问题,也就是一段时间后,无论get哪一个服务实例,都能获取到相同的数据。目前国内外的分布式产品很多都使用了Paxos协议,可以说Paxos几乎就是共识协议的标准和代名词。

Paxos有两种协议,我们常常提到的其实是Basic Paxos,另一种叫Multi Paxos,如无特殊说明,本文中提到的Paxos协议均为Basic Paxos。

Paxos协议是由图灵奖获得者Leslie Lamport于1998年在其论文《The Part-Time Parliament》中首次提出的,讲述了一个希腊小岛Paxos是如何通过决议的。但由于该论文晦涩艰深,当时的计算机界大牛们也没几个人能理解。于是Lamport2001年再次发表了《Paxos Made Simple》,摘要部分是这么写的:

The Paxos algorithm, when presented in plain English, is very simple.

翻译过来就是:不会吧,不会吧,这么简单的Paxos算法不会真的有人弄不懂吧?然而事实却是很多人对Paxos都望而却步,理解Paxos其实并不难,但是Paxos的难点在于工程化,如何利用Paxos协议写出一个能过够真正在生产环境中跑起来的服务才是Paxos最难的地方,关于Paxos的工程化可以参考微信后台团队撰写的《微信自研生产级paxos类库PhxPaxos实现原理介绍》

Paxos如何保证一致性的

Paxos协议一共有两个阶段:Prepare和Propose,两种角色:Proposer和Acceptor,每一个服务实例既是Proposer,同时也是Acceptor,Proposer负责提议,Acceptor决定是否接收来自Proposer的提议,一旦提议被多数接受,那么我们就可以宣称对该提议包含的值达成了一致,而且不会再改变。

阶段一:Prepare 准备

  • Proposer生成全局唯一ProposalID(时间戳+ServerID)
  • Proposer向所有Acceptor(包括Proposer自己)发送Prepare(n = ProposalID)请求
  • Acceptor比较n和minProposal, if n > minProposal, minProposal = n,Acceptor返回已接受的提议(acceptedProposal, acceptedValue)
  • 承诺1:不再接受n <= minProposal的Prepare请求
  • 承诺2:不再接受n < minProposal的Propose请求
  • 应答1:返回此前已接受的提议
  • 当Proposer收到大于半数的返回后
  • Prepare请求被拒绝,重新生成ProposalID并发送Prepare请求
  • Prepare请求被接受且有已接受的提议,选择最大的ProposalID对应的值作为提议的值
  • Prepare请求被接受且没有已接受的提议,可选择任意提议值

    阶段二:Propose 提议
  • Proposer向所有Acceptor(包括Proposer自己)发送Accept(n=ProposalID,value=ProposalValue)请求
  • Acceptor比较n和minProposal, if n >= minProposal, minProposal = n, acceptedValue = value,返回已接受的提议(minProposal,acceptedValue)
  • 当Proposer收到大于半数的返回后
  • Propose请求被拒绝,重新生成ProposalID并发送Prepare请求
  • Propose请求被接受,则数据达成一致性

一旦提议被半数以上的服务接受,那么我们就可以宣称整个服务集群在这一提议上达成了一致。

需要注意的是,在一个服务集群中以上两个阶段是很有可能同时发生的。 例如:实例A已完成Prepare阶段,并发送了Propose请求。同时实例B开始了Prepare阶段,并生成了更大的ProposalID发送Prepare请求,可能导致实例A的Propose请求被拒绝。 每个服务实例也是同时在扮演Proposer和Acceptor角色,向其他服务发送请求的同时,可能也在处理别的服务发来的请求。

使用GO语言实现Paxos协议

服务注册与发现

由于每个服务实例都是在执行相同的代码,那我们要如何知晓其他服务实例的入口呢(IP和端口号)?方法之一就是写死在代码中,或者提供一份配置文件。服务启动后可以读取该配置文件。但是这种方法不利于维护,一旦我们需要移除或添加服务则需要在每个机器上重新休息配置文件。

除此之外,我们可以通过一个第三方服务:服务的注册与发现来注册并获知当前集群的总服务实例数,即将本地的配置文件改为线上的配置服务。

服务注册:Register函数,服务实例启动后通过调用这个RPC方法将自己注册在服务管理中

func (s *Service) Register(args *RegisterArgs, reply *RegisterReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()

 server := args.ServerInfo
 for _, server := range s.Servers {
  if server.IPAddress == args.ServerInfo.IPAddress && server.Port == args.ServerInfo.Port {
   reply.Succeed = false
   return nil
  }
 }
 reply.ServerID = len(s.Servers)
 reply.Succeed = true
 s.Servers = append(s.Servers, server)

 fmt.Printf("Current registerd servers:\n%v\n", s.Servers)

 return nil
}

服务发现:GetServers函数,服务通过调用该RPC方法获取所有服务实例的信息(IP和端口号)

func (s *Service) GetServers(args *GetServersArgs, reply *GetServersReply) error {
 // return all servers
 reply.ServerInfos = s.Servers

 return nil
}

Prepare阶段

Proposer,向所有的服务发送Prepare请求,并等待直到半数以上的服务返回结果,这里也可以等待所有服务返回后再处理,但是Paxos协议可以容忍小于半数的服务宕机,因此我们只等待大于N/2个返回即可。当返回的结果有任何一个请求被拒绝,那Proposer即认为这次的请求被拒绝,返回重新生成ProposalID并发送新一轮的Prepare请求。

func (s *Server) CallPrepare(allServers []ServerInfo, proposal Proposal) PrepareReply {
 returnedReplies := make([]PrepareReply, 0)
 for _, otherS := range allServers {
  // use a go routine to call every server
  go func(otherS ServerInfo) {
   delay := rand.Intn(10)
   time.Sleep(time.Second * time.Duration(delay))
   args := PrepareArgs{s.Info, proposal.ID}
   reply := PrepareReply{}
   fmt.Printf("【Prepare】Call Prepare on %v:%v with proposal id %v\n", otherS.IPAddress, otherS.Port, args.ProposalID)
   if Call(otherS, "Server.Prepare", &args, &reply) {
    if reply.HasAcceptedProposal {
     fmt.Printf("【Prepare】%v:%v returns accepted proposal: %v\n", otherS.IPAddress, otherS.Port, reply.AcceptedProposal)
    } else {
     fmt.Printf("【Prepare】%v:%v returns empty proposal\n", otherS.IPAddress, otherS.Port)
    }
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   // three possible response
   // 1. deny the prepare, and return an empty/accepted proposal
   // as the proposal id is not higher than minProposalID on server (proposal id <= server.minProposalID)
   // 2. accept the prepare, and return an empty proposal as the server has not accept any proposal yet
   // 3. accept the prepare, and return an accepted proposal
   // check responses from majority
   // find the response with max proposal id
   acceptedProposal := NewProposal()
   for _, r := range checkReplies {
    // if any response refused the prepare, this server should resend prepare
    if !r.PrepareAccepted {
     return r
    }
    if r.HasAcceptedProposal && r.AcceptedProposal.ID > acceptedProposal.ID {
     acceptedProposal = r.AcceptedProposal
    }
   }
   // if some other server has accepted proposal, return that proposal with max proposal id
   // if no other server has accepted proposal, return an empty proposal
   return PrepareReply{HasAcceptedProposal: !acceptedProposal.IsEmpty(), AcceptedProposal: acceptedProposal, PrepareAccepted: true}
  }
  //fmt.Printf("Waiting for response from majority...\n")
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通过比较ProposalID和minProposal,如果ProposalID小于等于minProposal,则拒绝该Prepare请求,否则更新minProposal为ProposalID。最后返回已接受的提议

func (s *Server) Prepare(args *PrepareArgs, reply *PrepareReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()
 // 2 promises and 1 response
 // Promise 1
 // do not accept prepare request which ProposalID <= minProposalID
 // Promise 2
 // do not accept propose request which ProposalID < minProposalID
 // Response 1
 // respond with accepted proposal if any
 if reply.PrepareAccepted = args.ProposalID > s.minProposalID; reply.PrepareAccepted {
  // ready to accept the proposal with Id s.minProposalID
  s.minProposalID = args.ProposalID
 }
 reply.HasAcceptedProposal = s.readAcceptedProposal()
 reply.AcceptedProposal = s.Proposal
 return nil
}

Propose阶段

Proposer,同样首先向所有的服务发送Propose请求,并等待知道半数以上的服务返回结果。如果返回的结果有任何一个请求被拒绝,则Proposer认为这次的请求被拒绝,返回重新生成ProposalID并发送新一轮的Prepare请求

func (s *Server) CallPropose(allServers []ServerInfo, proposal Proposal) ProposeReply {
 returnedReplies := make([]ProposeReply, 0)
 for _, otherS := range allServers {
  go func(otherS ServerInfo) {
   delay := rand.Intn(5000)
   time.Sleep(time.Millisecond * time.Duration(delay))
   args := ProposeArgs{otherS, proposal}
   reply := ProposeReply{}
   fmt.Printf("【Propose】Call Propose on %v:%v with proposal: %v\n", otherS.IPAddress, otherS.Port, args.Proposal)
   if Call(otherS, "Server.Propose", &args, &reply) {
    fmt.Printf("【Propose】%v:%v returns: %v\n", otherS.IPAddress, otherS.Port, reply)
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   for _, r := range checkReplies {
    if !r.ProposeAccepted {
     return r
    }
   }
   return checkReplies[0]
  }
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通过比较ProposalID和minProposal,如果ProposalID小于minProposal,则拒绝该Propose请求,否则更新minProposal为ProposalID,并将提议持久化到本地磁盘中。

func (s *Server) Propose(args *ProposeArgs, reply *ProposeReply) error {
 if s.minProposalID <= args.Proposal.ID {
  s.mu.Lock()
  s.minProposalID = args.Proposal.ID
  s.Proposal = args.Proposal
  s.SaveAcceptedProposal()
  s.mu.Unlock()

  reply.ProposeAccepted = true
 }

 reply.ProposalID = s.minProposalID

 return nil
}

运行

运行结果:

这里我一共开启了3个服务实例,并在每次请求之前加入了随机的延迟,模拟网络通信中的延迟,因此每个服务的每个请求并不是同时发出的

动图一张:

静态结果一张:

可以看到3个服务尽管一开始会尝试以他们自己的端口号(5001,5002,5003)作为提议值,在Prepare/Propose失败后,都会重新生成更大的ProposalID并开启新一轮的提议过程(Prepare,Propose),且最后都以5003达成一致。

小结

至此,我们就用GO实现了Paxos协议的核心逻辑。但显而易见的是,这段代码仍然存在很多问题,完全无法满足生产环境的需求

  • 通过channel而不是mutex锁来共享数据
  • 如何处理服务实例的移除和增加
  • 如何避免陷入活锁

到此这篇关于使用GO实现Paxos共识算法的文章就介绍到这了,更多相关GO实现Paxos共识算法内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 图解Golang的GC垃圾回收算法

    虽然Golang的GC自打一开始,就被人所诟病,但是经过这么多年的发展,Golang的GC已经改善了非常多,变得非常优秀了. 以下是Golang GC算法的里程碑: v1.1 STW v1.3 Mark STW, Sweep 并行 v1.5 三色标记法 v1.8 hybrid write barrier 经典的GC算法有三种: 引用计数(reference counting) . 标记-清扫(mark & sweep) . 复制收集(Copy and Collection) . Golang的G

  • Golang实现拓扑排序(DFS算法版)

    问题描述:有一串数字1到5,按照下面的关于顺序的要求,重新排列并打印出来.要求如下:2在5前出现,3在2前出现,4在1前出现,1在3前出现. 该问题是一个非常典型的拓扑排序的问题,一般解决拓扑排序的方案是采用DFS-深度优先算法,对于DFS算法我的浅薄理解就是递归,因拓扑排序问题本身会有一些前置条件(本文不过多介绍拓扑算法的定义),所以解决该问题就有了以下思路. 先将排序要求声明成map(把map的key,value看作对顺序的要求,key应在value前出现),然后遍历1-5这几个数,将每次遍

  • golang实现分页算法实例代码

    前言 本文主要给大家介绍了关于golang分页算法的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧 示例代码如下: //分页方法,根据传递过来的页数,每页数,总数,返回分页的内容 7个页数 前 1,2,3,4,5 后 的格式返回,小于5页返回具体页数 func Paginator(page, prepage int, nums int64) map[string]interface{} { var firstpage int //前一页地址 var lastpage i

  • C++实现分水岭算法(Watershed Algorithm)

    分水岭分割方法(Watershed Segmentation),是一种基于拓扑理论的数学形态学的分割方法,其基本思想是把图像看作是测地学上的拓扑地貌,图像中每一点像素的灰度值表示该点的海拔高度,每一个局部极小值及其影响区域称为集水盆,而集水盆的边界则形成分水岭.分水岭的概念和形成可以通过模拟浸入过程来说明.在每一个局部极小值表面,刺穿一个小孔,然后把整个模型慢慢浸入水中,随着浸入的加深,每一个局部极小值的影响域慢慢向外扩展,在两个集水盆汇合处构筑大坝,即形成分水岭. 分水岭的计算过程是一个迭代标

  • 使用GO实现Paxos共识算法的方法

    什么是Paxos共识算法 最初的服务往往都是通过单体架构对外提供的,即单Server-单Database模式.随着业务的不断扩展,用户和请求数都在不断上升,如何应对大量的请求就成了每个服务都需要解决的问题,这也就是我们常说的高并发.为了解决单台服务器面对高并发的苍白无力,可以通过增加服务器数量来解决,即多Server-单Database(Master-Slave)模式,此时的压力就来到了数据库一方,数据库的IO效率决定了整个服务的效率,继续增加Server数量将无法提升服务性能.这就衍生出了当前

  • javascript笛卡尔积算法实现方法

    本文实例讲述了javascript笛卡尔积算法实现方法.分享给大家供大家参考.具体分析如下: 这里可根据给的对象或者数组生成笛卡尔积 //笛卡儿积组合 function descartes(list) { //parent上一级索引;count指针计数 var point = {}; var result = []; var pIndex = null; var tempCount = 0; var temp = []; //根据参数列生成指针对象 for(var index in list)

  • 使用C++实现全排列算法的方法详解

    复制代码 代码如下: <P>不论是哪种全排列生成算法,都遵循着"原排列"→"原中介数"→"新中介数"→"新排列"的过程.</P><P>其中中介数依据算法的不同会的到递增进位制数和递减进位制数.</P><P>关于排列和中介数的一一对应性的证明我们不做讨论,这里仅仅给出了排列和中介数的详细映射方法.</P> · 递增进位制和递减进位制数  所谓递增进位制和递减

  • C#折半插入排序算法实现方法

    本文实例讲述了C#折半插入排序算法实现方法.分享给大家供大家参考.具体实现方法如下: public static void BinarySort (int[] list) { for (int i = 1; i < list.Length; i+ +) { int low = 0; int high = i - 1; int Temp = list [i]; //Find while (low <= high) { int mid = (low + high) / 2; IF (Temp &l

  • php实现希尔排序算法的方法分析

    本文实例讲述了php实现希尔排序算法的方法.分享给大家供大家参考,具体如下: 虽然现在各种程序语言都有其各自强大的排序库函数,但是这些底层实现也都是利用这些基础或高级的排序算法. 理解这些复杂的排序算法还是很有意思的,体会这些排序算法的精妙~ 希尔排序(shell sort):希尔排序是基于插入排序的,区别在于插入排序是相邻的一个个比较(类似于希尔中h=1的情形),而希尔排序是距离h的比较和替换. 希尔排序中一个常数因子n,原数组被分成各个小组,每个小组由h个元素组成,很可能会有多余的元素.当然

  • Java实现SHA算法的方法详解

    本文实例讲述了Java实现SHA算法的方法.分享给大家供大家参考,具体如下: 一 简介 安全散列算法 固定长度摘要信息 二 SHA算法 SHA-1.SHA-2(SHA-224.SHA-256.SHA384.SHA-512) 三 SHA算法实现 package com.imooc.security.sha; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.s

  • Java实现RSA算法的方法详解

    本文实例讲述了Java实现RSA算法的方法.分享给大家供大家参考,具体如下: 一 介绍 唯一广泛接受并实现 用于数据加密和数字签名 公钥加密.私钥解密 私钥加密.公钥解密 二 RSA参数说明 三 实现 package com.imooc.security.rsa; import java.security.KeyFactory; import java.security.KeyPair; import java.security.KeyPairGenerator; import java.sec

  • python二分法查找算法实现方法【递归与非递归】

    本文实例讲述了python二分法查找算法实现方法.分享给大家供大家参考,具体如下: 二分法查找 二分查找又称折半查找,优点是比较次数少,查找速度快,平均性能好:其缺点是要求待查表为有序表,且插入删除困难.因此,折半查找方法适用于不经常变动而查找频繁的有序列表.首先,假设表中元素是按升序排列,将表中间位置记录的关键字与查找关键字比较,如果两者相等,则查找成功:否则利用中间位置记录将表分成前.后两个子表,如果中间位置记录的关键字大于查找关键字,则进一步查找前一子表,否则进一步查找后一子表.重复以上过

  • python聚类算法选择方法实例

    说明 1.如果数据集是高维度的,选择谱聚类是子空间的一种. 2.如果数据量是中小型的,比如在100W条以内,K均值会是更好的选择:如果数据量超过100W条,可以考虑使用MiniBatchKMeans. 3.如果数据集中有噪声(离群点),使用基于密度的DBSCAN可以有效解决这个问题. 4.若追求更高的分类准确性,则选择谱聚类比K均值准确性更好. 实例 import numpy as np import matplotlib.pyplot as plt # 数据准备 raw_data = np.l

  • java实现LRU缓存淘汰算法的方法

    LRU算法:最近最少使用淘汰算法(Least Recently Used).LRU是淘汰最长时间没有被使用的缓存(即使该缓存被访问的次数最多). 如何实现LRU缓存淘汰算法 场景: 我们现在有这么个真实场景,我在爬取某个网站时,控制该网站的代理IP并发数,太多会搞垮对方网站的对吧,要蹲号子的呢.这里我需要维护一个代理IP代理池,而且这些IP肯定不是一直都很稳定的,但是又不能取一个就丢一个,这样太浪费资源.所以我会将这些IP缓存起来,进行按需提取,采用LRU最近最少使用的策略去管理代理IP. 代码

随机推荐