基于python的Paxos算法实现

理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。

这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。

class Message:
  # command
  MSG_ACCEPTOR_AGREE = 0 # 追随者约定
  MSG_ACCEPTOR_ACCEPT = 1 # 追随者接受
  MSG_ACCEPTOR_REJECT = 2 # 追随者拒绝-网络不通
  MSG_ACCEPTOR_UNACCEPT = 3 # 追随者网络通-不同意
  MSG_ACCEPT = 4 # 接受
  MSG_PROPOSE = 5 # 提议
  MSG_EXT_PROPOSE = 6 # 额外提议
  MSG_HEARTBEAT = 7 # 心跳,每隔一段时间同步消息
  def __init__(self, command=None):
    self.command = command
  # 把收到的消息原原路返回,作为应答消息
  def copyAsReply(self, message):
    # 提议ID #当前的ID #发给谁 #谁发的
    self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to
    self.value = message.value # 发的信息

然后是利用socket,线程和队列实现的消息处理器:

# 基于socket传递消息,封装网络传递消息
import threading
import pickle
import socket
import queue
class MessagePump(threading.Thread):
  # 收取消息线程
  class MPHelper(threading.Thread):
    #
    def __init__(self, owner):
      self.owner = owner
      threading.Thread.__init__(self)
    def run(self):
      while not self.owner.abort: # 只要所有者线程没有结束,一直接受消息
        try:
          (bytes, addr) = self.owner.socket.recvfrom(2048) # 收取消息
          msg = pickle.loads(bytes) # 读取二进制数据转化为消息
          msg.source = addr[1]
          self.owner.queue.put(msg) # 队列存入消息
        except Exception as e:
          pass

  def __init__(self, owner, port, timeout=2):
    threading.Thread.__init__(self)
    self.owner = owner
    self.abort = False
    self.timeout = 2
    self.port = port
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP通信
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000) # 通信参数
    self.socket.bind(("localhost", port)) # 通信地址,ip,端口
    self.socket.settimeout(timeout) # 超时设置
    self.queue = queue.Queue() # 队列
    self.helper = MessagePump.MPHelper(self) # 接收消息

  # 运行主线程
  def run(self):
    self.helper.start() # 开启收消息的线程
    while not self.abort:
      message = self.waitForMessage() # 阻塞等待
      self.owner.recvMessage(message) # 收取消息

  # 等待消息
  def waitForMessage(self):
    try:
      msg = self.queue.get(True, 3) # 抓取数据,最多等待3s
      return msg
    except:
      return None

  # 发送消息
  def sendMessage(self, message):
    bytes = pickle.dumps(message) # 转化为二进制
    address = ("localhost", message.to) # 地址ip,端口(ip,port)
    self.socket.sendto(bytes, address)
    return True
  #是否停止收取消息
  def doAbort(self):
    self.abort = True

再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的

from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump): # 类的继承
  # 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序
  def __init__(self, owner, port, timeout=2):
    MessagePump.__init__(self, owner, port, timeout) # 初始化父类
    self.messages = set() # 集合避免重复

  def waitForMessage(self):
    try:
      msg = self.queue.get(True, 0.1) # 从队列抓取数据
      self.messages.add(msg) # 添加消息
    except Exception as e: # 处理异常
      pass
      # print(e)
    if len(self.messages) > 0 and random.random() < 0.95: # Arbitrary!
      msg = random.choice(list(self.messages)) # 随机抓取消息发送
      self.messages.remove(msg) # 删除消息
    else:
      msg = None
    return msg

再来一个是记录类

# InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
  def __init__(self):
    self.protocols = {}
    self.highestID = (-1, -1) # (port,count)
    self.value = None

  def addProtocol(self, protocol):
    self.protocols[protocol.proposalID] = protocol
    #
    if protocol.proposalID[1] > self.highestID[1] or (
        protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):
      self.highestID = protocol.proposalID # 取得编号最大的协议

  def getProtocol(self, protocolID):
    return self.protocols[protocolID]

  def cleanProtocols(self):
    keys = self.protocols.keys()
    for k in keys:
      protocol = self.protocols[k]
      if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
        print("删除协议")
        del self.protocols[k]

下面就是Acceptor的实现:

# 追随者
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxosAcceptorProtocol import PaxosAcceptorProtocol
class PaxosAcceptor:
  def __init__(self, port, leaders):
    self.port = port
    self.leaders = leaders
    self.instances = {} # 接口列表
    self.msgPump = MessagePump(self, self.port) # 消息传递器
    self.failed = False

  # 开始消息传送
  def start(self):
    self.msgPump.start()

  # 停止
  def stop(self):
    self.msgPump.doAbort()

  # 失败
  def fail(self):
    self.failed = True

  def recover(self):
    self.failed = False

  # 发送消息
  def sendMessage(self, message):
    self.msgPump.sendMessage(message)

  # 收消息,只收取为提议的消息
  def recvMessage(self, message):
    if message == None:
      return
    if self.failed: # 失败状态不收取消息
      return

    if message.command == Message.MSG_PROPOSE: # 判断消息是否为提议
      if message.instanceID not in self.instances:
        record = InstanceRecord() # 记录器
        self.instances[message.instanceID] = record
      protocol = PaxosAcceptorProtocol(self) # 创建协议
      protocol.recvProposal(message) # 收取消息
      self.instances[message.instanceID].addProtocol(protocol)
    else:
      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)

  # 通知客户端,
  def notifyClient(self, protocol, message):
    if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: # 提议被接受,通知
      self.instances[protocol.instanceID].value = message.value # 储存信息
      print(u"协议被客户端接受 %s" % message.value)

  # 获取最高同意的建议
  def getHighestAgreedProposal(self, instance):
    return self.instances[instance].highestID # (port,count)

  # 获取接口数据
  def getInstanceValue(self, instance):
    return self.instances[instance].value

那再看下AcceptorProtocol的实现:

from Message import Message
class PaxosAcceptorProtocol(object):
  # State variables
  STATE_UNDEFINED = -1 # 协议没有定义的情况0
  STATE_PROPOSAL_RECEIVED = 0 # 收到消息
  STATE_PROPOSAL_REJECTED = 1 # 拒绝链接
  STATE_PROPOSAL_AGREED = 2 # 同意链接
  STATE_PROPOSAL_ACCEPTED = 3 # 同意请求
  STATE_PROPOSAL_UNACCEPTED = 4 # 拒绝请求

  def __init__(self, client):
    self.client = client
    self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

  # 收取,只处理协议类型的消息
  def recvProposal(self, message):

    if message.command == Message.MSG_PROPOSE: # 协议
      self.proposalID = message.proposalID
      self.instanceID = message.instanceID
      (port, count) = self.client.getHighestAgreedProposal(message.instanceID) # 端口,协议内容的最高编号
      # 检测编号处理消息协议
      # 判断协议是否最高
      if count < self.proposalID[1] or (count == self.proposalID[1] and port < self.proposalID[0]):
        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # 协议同意
        print("同意协议:%s, %s " % (message.instanceID, message.value))
        value = self.client.getInstanceValue(message.instanceID)
        msg = Message(Message.MSG_ACCEPTOR_AGREE) # 同意协议
        msg.copyAsReply(message)
        msg.value = value
        msg.sequence = (port, count)
        self.client.sendMessage(msg) # 发送消息
      else: # 不再接受比最高协议小的提议
        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
      return self.proposalID
    else:
      # 错误重试
      pass
  # 过度
  def doTransition(self, message): # 如果当前协议状态是接受连接,消息类型是接受
    if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
      self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # 接收协议
      msg = Message(Message.MSG_ACCEPTOR_ACCEPT) # 创造消息
      msg.copyAsReply(message) # 拷贝并回复
      for l in self.client.leaders:
        msg.to = l
        self.client.sendMessage(msg) # 给领导发送消息
      self.notifyClient(message) # 通知自己
      return True
    raise Exception("并非预期的状态和命令")

  # 通知 自己客户端
  def notifyClient(self, message):
    self.client.notifyClient(self, message)

接着看下Leader和LeaderProtocol实现:

# 领导者
import threading
import Queue
import time
from Message import Message
from MessagePump import MessagePump
from InstanceRecord import InstanceRecord
from PaxosLeaderProtocol import PaxosLeaderProtocol
class PaxosLeader:
  # 定时监听
  class HeartbeatListener(threading.Thread):
    def __init__(self, leader):
      self.leader = leader
      self.queue = Queue.Queue() # 消息队列
      self.abort = False
      threading.Thread.__init__(self)

    def newHB(self, message):
      self.queue.put(message)

    def doAbort(self):
      self.abort = True

    def run(self): # 读取消息
      elapsed = 0
      while not self.abort:
        s = time.time()
        try:
          hb = self.queue.get(True, 2)
          # 设定规则,谁的端口号比较高,谁就是领导
          if hb.source > self.leader.port:
            self.leader.setPrimary(False)
        except:
          self.leader.setPrimary(True)

  # 定时发送
  class HeartbeatSender(threading.Thread):
    def __init__(self, leader):
      threading.Thread.__init__(self)
      self.leader = leader
      self.abort = False
    def doAbort(self):
      self.abort = True
    def run(self):
      while not self.abort:
        time.sleep(1)
        if self.leader.isPrimary:
          msg = Message(Message.MSG_HEARTBEAT)
          msg.source = self.leader.port
          for leader in self.leader.leaders:
            msg.to = leader
            self.leader.sendMessage(msg)

  def __init__(self, port, leaders=None, acceptors=None):
    self.port = port
    if leaders == None:
      self.leaders = []
    else:
      self.leaders = leaders
    if acceptors == None:
      self.acceptors = []
    else:
      self.acceptors = acceptors
    self.group = self.leaders + self.acceptors # 集合合并
    self.isPrimary = False # 自身是不是领导
    self.proposalCount = 0
    self.msgPump = MessagePump(self, port) # 消息传送器
    self.instances = {}
    self.hbListener = PaxosLeader.HeartbeatListener(self) # 监听
    self.hbSender = PaxosLeader.HeartbeatSender(self) # 发送心跳
    self.highestInstance = -1 # 协议状态
    self.stoped = True # 是否正在运行
    self.lasttime = time.time() # 最后一次时间

  def sendMessage(self, message):
    self.msgPump.sendMessage(message)

  def start(self):
    self.hbSender.start()
    self.hbListener.start()
    self.msgPump.start()
    self.stoped = False

  def stop(self):
    self.hbSender.doAbort()
    self.hbListener.doAbort()
    self.msgPump.doAbort()
    self.stoped = True

  def setPrimary(self, primary): # 设置领导者
    if self.isPrimary != primary:
      # Only print if something's changed
      if primary:
        print(u"我是leader%s" % self.port)
      else:
        print(u"我不是leader%s" % self.port)
    self.isPrimary = primary

  # 获取所有的领导下面的追随者
  def getGroup(self):
    return self.group

  def getLeaders(self):
    return self.leaders

  def getAcceptors(self):
    return self.acceptors

  # 必须获得1/2以上的人支持
  def getQuorumSize(self):
    return (len(self.getAcceptors()) / 2) + 1

  def getInstanceValue(self, instanceID):
    if instanceID in self.instances:
      return self.instances[instanceID].value
    return None

  def getHistory(self): # 历史记录
    return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]

  # 抓取同意的数量
  def getNumAccpted(self):
    return len([v for v in self.getHistory() if v != None])

  # 抓取空白时间处理下事务
  def findAndFillGaps(self):
    for i in range(1, self.highestInstance):
      if self.getInstanceValue(i) == None:
        print("填充空白", i)
        self.newProposal(0, i)
    self.lasttime = time.time()

  # 采集无用信息
  def garbageCollect(self):
    for i in self.instances:
      self.instances[i].cleanProtocols()

  # 通知领导
  def recvMessage(self, message):
    if self.stoped:
      return
    if message == None:
      if self.isPrimary and time.time() - self.lasttime > 15.0:
        self.findAndFillGaps()
        self.garbageCollect()
      return
    #处理心跳信息
    if message.command == Message.MSG_HEARTBEAT:
      self.hbListener.newHB(message)
      return True
    #处理额外的提议
    if message.command == Message.MSG_EXT_PROPOSE:
      print("额外的协议", self.port, self.highestInstance)
      if self.isPrimary:
        self.newProposal(message.value)
      return True

    if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)

    if message.command == Message.MSG_ACCEPTOR_ACCEPT:
      if message.instanceID not in self.instances:
        self.instances[message.instanceID] = InstanceRecord()
      record = self.instances[message.instanceID]
      if message.proposalID not in record.protocols:#创建协议
        protocol = PaxosLeaderProtocol(self)
        protocol.state = PaxosLeaderProtocol.STATE_AGREED
        protocol.proposalID = message.proposalID
        protocol.instanceID = message.instanceID
        protocol.value = message.value
        record.addProtocol(protocol)
      else:
        protocol = record.getProtocol(message.proposalID)

      protocol.doTransition(message)

    return True
  # 新建提议
  def newProposal(self, value, instance=None):
    protocol = PaxosLeaderProtocol(self)
    if instance == None: # 创建协议标号
      self.highestInstance += 1
      instanceID = self.highestInstance
    else:
      instanceID = instance
    self.proposalCount += 1
    id = (self.port, self.proposalCount)
    if instanceID in self.instances:
      record = self.instances[instanceID]
    else:
      record = InstanceRecord()
      self.instances[instanceID] = record
    protocol.propose(value, id, instanceID)
    record.addProtocol(protocol)

  def notifyLeader(self, protocol, message):
    if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
      print("协议接口%s被%s接受" % (message.instanceID, message.value))
      self.instances[message.instanceID].accepted = True
      self.instances[message.instanceID].value = message.value
      self.highestInstance = max(message.instanceID, self.highestInstance)
      return
    if protocol.state == PaxosLeaderProtocol.STATE_REJECTED: # 重新尝试
      self.proposalCount = max(self.proposalCount, message.highestPID[1])
      self.newProposal(message.value)
      return True
    if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
      pass

LeaderProtocol实现:

from Message import Message
class PaxosLeaderProtocol(object):
  STATE_UNDEFINED = -1 # 协议没有定义的情况0
  STATE_PROPOSED = 0 # 协议消息
  STATE_REJECTED = 1 # 拒绝链接
  STATE_AGREED = 2 # 同意链接
  STATE_ACCEPTED = 3 # 同意请求
  STATE_UNACCEPTED = 4 # 拒绝请求
  def __init__(self, leader):
    self.leader = leader
    self.state = PaxosLeaderProtocol.STATE_UNDEFINED
    self.proposalID = (-1, -1)
    self.agreecount, self.acceptcount = (0, 0)
    self.rejectcount, self.unacceptcount = (0, 0)
    self.instanceID = -1
    self.highestseen = (0, 0)
  # 提议
  def propose(self, value, pID, instanceID):
    self.proposalID = pID
    self.value = value
    self.instanceID = instanceID
    message = Message(Message.MSG_PROPOSE)
    message.proposalID = pID
    message.instanceID = instanceID
    message.value = value
    for server in self.leader.getAcceptors():
      message.to = server
      self.leader.sendMessage(message)
    self.state = PaxosLeaderProtocol.STATE_PROPOSED

    return self.proposalID

  # 過度
  def doTransition(self, message):
    # 根據狀態運行協議
    if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
      if message.command == Message.MSG_ACCEPTOR_AGREE:
        self.agreecount += 1
        if self.agreecount >= self.leader.getQuorumSize(): # 选举
          print(u"达成协议的法定人数,最后的价值回答是:%s" % message.value)
          if message.value != None:
            if message.sequence[0] > self.highestseen[0] or (
                message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[
              1]):
              self.value = message.value
              self.highestseen = message.sequence

            self.state = PaxosLeaderProtocol.STATE_AGREED # 同意更新
            # 发送同意消息
            msg = Message(Message.MSG_ACCEPT)
            msg.copyAsReply(message)
            msg.value = self.value
            msg.leaderID = msg.to
            for server in self.leader.getAcceptors():
              msg.to = server
              self.leader.sendMessage(msg)
            self.leader.notifyLeader(self, message)
          return True

        if message.command == Message.MSG_ACCEPTOR_REJECT:
          self.rejectcount += 1
          if self.rejectcount >= self.leader.getQuorumSize():
            self.state = PaxosLeaderProtocol.STATE_REJECTED
            self.leader.notifyLeader(self, message)
          return True

    if self.state == PaxosLeaderProtocol.STATE_AGREED:
      if message.command == Message.MSG_ACCEPTOR_ACCEPT: # 同意协议
        self.acceptcount += 1
        if self.acceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_ACCEPTED # 接受
          self.leader.notifyLeader(self, message)
      if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
        self.unacceptcount += 1
        if self.unacceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
          self.leader.notifyLeader(self, message)

测试模块:

import socket, pickle, time
from Message import Message
from PaxosAcceptor import PaxosAcceptor
from PaxosLeader import PaxosLeader

if __name__ == "__main__":
  # 设定5个客户端
  numclients = 5
  clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
  # 两个领导者
  leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])
  leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])

  # 开启领导者与追随者
  leader1.start()
  leader1.setPrimary(True)
  leader2.setPrimary(True)
  leader2.start()
  for c in clients:
    c.start()

  # 破坏,客户端不链接
  clients[0].fail()
  clients[1].fail()

  # 通信
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp协议
  start = time.time()
  for i in range(1000):
    m = Message(Message.MSG_EXT_PROPOSE) # 消息
    m.value = 0 + i # 消息参数
    m.to = 54322 # 设置传递的端口
    bytes = pickle.dumps(m) # 提取的二进制数据
    s.sendto(bytes, ("localhost", m.to)) # 发送消息

  while leader2.getNumAccpted() < 999:
    print("休眠的这一秒 %d " % leader2.getNumAccpted())
    time.sleep(1)
  print(u"休眠10秒")
  time.sleep(10)
  print(u"停止leaders")
  leader1.stop()
  leader2.stop()
  print(u"停止客户端")
  for c in clients:
    c.stop()
  print(u"leader1历史纪录")
  print(leader1.getHistory())
  print(u"leader2历史纪录")
  print(leader2.getHistory())
  end = time.time()
  print(u"一共用了%f秒" % (end - start))

代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python实现简单层次聚类算法以及可视化

    本文实例为大家分享了Python实现简单层次聚类算法,以及可视化,供大家参考,具体内容如下 基本的算法思路就是:把当前组间距离最小的两组合并成一组. 算法的差异在算法如何确定组件的距离,一般有最大距离,最小距离,平均距离,马氏距离等等. 代码如下: import numpy as np import data_helper np.random.seed(1) def get_raw_data(n): _data=np.random.rand(n,2) #生成数据的格式是n个(x,y) _grou

  • python3实现二叉树的遍历与递归算法解析(小结)

    1.二叉树的三种遍历方式 二叉树有三种遍历方式:先序遍历,中序遍历,后续遍历 即:先中后指的是访问根节点的顺序 eg:先序 根左右 中序 左根右 后序 左右根 遍历总体思路:将树分成最小的子树,然后按照顺序输出 1.1 先序遍历 a 先访问根节点 b 访问左节点 c 访问右节点 a(b ( d ( h ) )( e ( i ) ))( c ( f )( g )) -- abdheicfg 1.2 中序遍历 a 先访问左节点 b 访问根节点 c 访问右节点 ( ( ( h ) d ) b ( (

  • 如何通过雪花算法用Python实现一个简单的发号器

    实现一个简单的发号器 根据snowflake算法的原理实现一个简单的发号器,产生不重复.自增的id. 1.snowflake算法的简单描述 这里的snowflake算法是用二进制的,有64位.其中41位的时间戳表示:当前时间戳减去某个设定的起始时间,10位标识表示:不同的机器.数据库的标识ID等等,序列号为每秒或每毫秒内自增的id. 我做的时候没有用位运算去实现,而是做了一个十进制的,16位的(当时项目要求是16位的).但是实现发号器的基本策略是一样的,通过时间戳和标识来防止重复,通过序列号实现

  • Python实现在某个数组中查找一个值的算法示例

    第一种算法思路: 第一步:随机出来一个数组的下标 第二步:判断下标对应的值是否等于被查找的值,是的话终止,已找到,否的话转第三步. 第三步:判断是否随机完数组的所有下标,是的话终止,没找到,否的话转第一步. 代码如下: #本程序的功能是在字典中查找存在某个值 import random di = {'a':1,'b':2,'c':3,'d':4,'e':5,'f':6} key = 2 di1 = {} while True: tmp = random.choice(di.keys()) #随机

  • python TF-IDF算法实现文本关键词提取

    TF(Term Frequency)词频,在文章中出现次数最多的词,然而文章中出现次数较多的词并不一定就是关键词,比如常见的对文章本身并没有多大意义的停用词.所以我们需要一个重要性调整系数来衡量一个词是不是常见词.该权重为IDF(Inverse Document Frequency)逆文档频率,它的大小与一个词的常见程度成反比.在我们得到词频(TF)和逆文档频率(IDF)以后,将两个值相乘,即可得到一个词的TF-IDF值,某个词对文章的重要性越高,其TF-IDF值就越大,所以排在最前面的几个词就

  • Python实现查找数组中任意第k大的数字算法示例

    本文实例讲述了Python实现查找数组中任意第k大的数字算法.分享给大家供大家参考,具体如下: 模仿partion方法,当high=low小于k的时候,在后半部分搜索,当high=low大于k的时候,在前半部分搜索.与快排不同的是,每次都减少了一半的排序. def partitionOfK(numbers, start, end, k): if k < 0 or numbers == [] or start < 0 or end >= len(numbers) or k > end

  • python实现鸢尾花三种聚类算法(K-means,AGNES,DBScan)

    一.分散性聚类(kmeans) 算法流程: 1.选择聚类的个数k. 2.任意产生k个聚类,然后确定聚类中心,或者直接生成k个中心. 3.对每个点确定其聚类中心点. 4.再计算其聚类新中心. 5.重复以上步骤直到满足收敛要求.(通常就是确定的中心点不再改变. 优点: 1.是解决聚类问题的一种经典算法,简单.快速 2.对处理大数据集,该算法保持可伸缩性和高效率 3.当结果簇是密集的,它的效果较好 缺点 1.在簇的平均值可被定义的情况下才能使用,可能不适用于某些应用 2.必须事先给出k(要生成的簇的数

  • 基于python的Paxos算法实现

    理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下. 这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼.这里环境用的是python2.7. class Message: # command MSG_ACCEPTOR_AGREE = 0 # 追随者约定 MSG_ACCEPTOR_ACCEPT = 1 # 追随者接受 MSG_

  • 基于python实现雪花算法过程详解

    这篇文章主要介绍了基于python实现雪花算法过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Snowflake是Twitter提出来的一个算法,其目的是生成一个64bit的整数: 1bit:一般是符号位,不做处理 41bit:用来记录时间戳,这里可以记录69年,如果设置好起始时间比如今年是2018年,那么可以用到2089年,到时候怎么办?要是这个系统能用69年,我相信这个系统早都重构了好多次了. 10bit:10bit用来记录机器ID

  • 基于Python实现Hash算法

    目录 1 前言 2 一般hash算法 2.1 算法逻辑 2.2 代码实现 2.3 总结 3 一致性hash算法 3.1 算法逻辑 3.2 代码实现 3.3 总结 1 前言 Simhash的算法简单的来说就是,从海量文本中快速搜索和已知simhash相差小于k位的simhash集合,这里每个文本都可以用一个simhash值来代表,一个simhash有64bit,相似的文本,64bit也相似,论文中k的经验值为3.该方法的缺点如优点一样明显,主要有两点,对于短文本,k值很敏感:另一个是由于算法是以空

  • 基于Python实现DIT-FFT算法

    目录 自己写函数实现FFT 使用python的第三方库进行FFT 自己写函数实现FFT 使用递归方法 from math import log, ceil, cos, sin, pi import matplotlib.pyplot as plt import numpy as np # 这两行代码解决 plt 中文显示的问题 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = Fal

  • 基于Python实现迪杰斯特拉和弗洛伊德算法

    图搜索之基于Python的迪杰斯特拉算法和弗洛伊德算法,供大家参考,具体内容如下 Djstela算法 #encoding=UTF-8 MAX=9 ''' Created on 2016年9月28日 @author: sx ''' b=999 G=[[0,1,5,b,b,b,b,b,b],\ [1,0,3,7,5,b,b,b,b],\ [5,3,0,b,1,7,b,b,b],\ [b,7,b,0,2,b,3,b,b],\ [b,5,1,2,0,3,6,9,b],\ [b,b,7,b,3,0,b,5

  • 基于 Python 实践感知器分类算法

    Perceptron是用于二进制分类任务的线性机器学习算法.它可以被认为是人工神经网络的第一种和最简单的类型之一.绝对不是"深度"学习,而是重要的组成部分.与逻辑回归相似,它可以快速学习两类分类任务在特征空间中的线性分离,尽管与逻辑回归不同,它使用随机梯度下降优化算法学习并且不预测校准概率. 在本教程中,您将发现Perceptron分类机器学习算法.完成本教程后,您将知道: Perceptron分类器是一种线性算法,可以应用于二进制分类任务. 如何使用带有Scikit-Learn的Pe

  • 基于Python代码实现Apriori 关联规则算法

    目录 一.关联规则概述 二.应用场景举例 1.股票涨跌预测 2.视频.音乐.图书等推荐 3.打车路线预测(考虑时空) 4.风控策略自动化挖掘 三.3个最重要的概念 1.支持度 2.置信度 3.提升度 4. 频繁项集 四.Python算法介绍 五.挖掘实例 一.关联规则概述 1993年,Agrawal等人在首先提出关联规则概念,迄今已经差不多30年了,在各种算法层出不穷的今天,这算得上是老古董了,比很多人的年纪还大,往往是数据挖掘的入门算法,但深入研究的不多,尤其在风控领域,有着极其重要的应用潜力

  • python基于双向链表实现LFU算法

    本文实例为大家分享了python实现LFU算法的具体代码,供大家参考,具体内容如下 在第一节中实现了双向链表DoubleLinkedList类,上一节中基于双向链表实现了LRU算法,本节课我们继续基于双向链表实现LFU(Least frequently used 最不经常使用)算法. 一.重写Node节点类 构建LFUNode类 继承自第一节中的Node类,添加freq属性用来表示节点使用频率 class LFUNode(Node):     def __init__(self, key, va

  • 详解分布式系统中如何用python实现Paxos

    一致性算法背景 1.Paxos一致性算法解决的问题:分布式系统中数据不能存在单个节点(主机)上,否则可能出现单点故障:多个节点(主机)需要保证具有相同的数据. 2.什么是一致性:一致性就是数据保持一致,在分布式系统中,可以理解为多个节点中数据的值是一致的. 3.一致性模型分类:一般分为强一致性和弱一致性,强一致性保证系统改变提交以后立即改变集群的状态.常见模型包括:Paxos,Raft(muti-paxos),ZAB(muti-paxos): 弱一致性也叫最终一致性,系统不保证改变提交以后立即改

  • 基于Python和Scikit-Learn的机器学习探索

    你好,%用户名%! 我叫Alex,我在机器学习和网络图分析(主要是理论)有所涉猎.我同时在为一家俄罗斯移动运营商开发大数据产品.这是我第一次在网上写文章,不喜勿喷. 现在,很多人想开发高效的算法以及参加机器学习的竞赛.所以他们过来问我:"该如何开始?".一段时间以前,我在一个俄罗斯联邦政府的下属机构中领导了媒体和社交网络大数据分析工具的开发.我仍然有一些我团队使用过的文档,我乐意与你们分享.前提是读者已经有很好的数学和机器学习方面的知识(我的团队主要由MIPT(莫斯科物理与技术大学)和

随机推荐