Python通过zookeeper实现分布式服务代码解析

借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.

这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.

zk_server.py

import threading
import json
import socket
import sys
from kazoo.client import KazooClient

# TCP服务端绑定端口开启监听,同时将自己注册到zk
class ZKServer(object):
  def __init__(self, host, port):
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.host = host
    self.port = port
    self.sock.bind((host, port))
    self.zk = None

  def serve(self):
    """
    开始服务,每次获取得到一个信息,都新建一个线程处理
    """
    self.sock.listen(128)
    self.register_zk()
    print("开始监听")
    while True:
      conn, addr = self.sock.accept()
      print("建立链接%s" % str(addr))
      t = threading.Thread(target=self.handle, args=(conn, addr))
      t.start()

  # 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束
  def handle(self, conn, addr):
    while True:
      data=conn.recv(1024)
      if not data or data.decode('utf-8') == 'exit':
        break
      print(data.decode('utf-8'))
    conn.close()
    print('My work is done!!!')

  # 将自己注册到zk,临时节点,所以连接不能中断
  def register_zk(self):
    """
    注册到zookeeper
    """
    self.zk = KazooClient(hosts='127.0.0.1:2181')
    self.zk.start()
    self.zk.ensure_path('/rpc') # 创建根节点
    value = json.dumps({'host': self.host, 'port': self.port})
    # 创建服务子节点
    self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)

if __name__ == '__main__':
  if len(sys.argv) < 3:
    print("usage:python server.py [host] [port]")
    exit(1)
  host = sys.argv[1]
  port = sys.argv[2]
  server = ZKServer(host, int(port))
  server.serve()

zk_client.py

import random
import sys
import time
import json
import socket

from kazoo.client import KazooClient

# 客户端连接zk,并从zk获取可用的服务器列表
class ZKClient(object):
  def __init__(self):
    self._zk = KazooClient(hosts='127.0.0.1:2181')
    self._zk.start()
    self._get_servers()

  def _get_servers(self, event=None):
    """
    从zookeeper获取服务器地址信息列表
    """
    servers = self._zk.get_children('/rpc', watch=self._get_servers)
    # print(servers)
    self._servers = []
    for server in servers:
      data = self._zk.get('/rpc/' + server)[0]
      if data:
        addr = json.loads(data.decode())
        self._servers.append(addr)

  def _get_server(self):
    """
    随机选出一个可用的服务器
    """
    return random.choice(self._servers)

  def get_connection(self):
    """
    提供一个可用的tcp连接
    """
    sock = None
    while True:
      server = self._get_server()
      print('server:%s' % server)
      try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((server['host'], server['port']))
      except ConnectionRefusedError:
        time.sleep(1)
        continue
      else:
        break
    return sock
if __name__ == '__main__':
  # 模拟多个客户端批量生成任务,推送给服务器执行
  client = ZKClient()
  for i in range(40):
    sock = client.get_connection()
    sock.send(bytes(str(i), encoding='utf8'))
    sock.close()
    time.sleep(1)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • windows系统搭建zookeeper服务器的教程

    安装&配置 在apache的官方网站提供了好多镜像下载地址,然后找到对应的版本 下载地址: http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz Windows下安装 把下载的zookeeper的文件解压到指定目录 C:\ZK\zookeeper-3.4.14> 修改conf下增加一个zoo.cfg 内容如下: # The number of milliseconds of each

  • zookeeper python接口实例详解

    本文主要讲python支持zookeeper的接口库安装和使用.zk的python接口库有zkpython,还有kazoo,下面是zkpython,是基于zk的C库的python接口. zkpython安装 前提是zookeeper安装包已经在/usr/local/zookeeper下 cd /usr/local/zookeeper/src/c ./configure make make install wget --no-check-certificate http://pypi.python

  • Zookeeper未授权访问测试问题

    前言 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.组服务等. zookeeper 未授权访问是指安装部署之后默认情况下不需要任何身份验证,从而导致 zookeeper 被远程利用,导致大量服务级别的信息泄露. 默认使用端口:2181.2182. 探测Zookeeper服务开放 如使用nmap探测某个目标

  • Zookeeper连接超时问题与拒绝连接的解决方案

    今天在工作中突然遇到这个问题,开始郁闷得不行,查阅了很多资料才解决.话不多少先上图 ①解决连接超时问题 1:在Linux下输入命令ifconfig -a   看看是否ip地址输入错误 2:关闭Linux防火墙,输入  chkconfig iptables off 命令 ②解决拒绝连接问题 报错图忘截了,不好意思,还是直接说解决方案吧! 将前面的127.0.0.1删掉,输入:wq 命令保存就行了,原因是与输入的地址发生冲入,所以拒绝连接. 总结 以上就是这篇文章的全部内容了,希望本文的内容对大家的

  • viper配置框架的介绍支持zookeeper的读取和监听

    viper作为配置框架,其功能非常的强大,我们没有理由不去了解一下.我们先看官网对它的功能简介: viper是完整配置解决方案,他可以处理所有类型和格式的配置文件,他有如下功能: 设置默认配置 支持读取 JSON TOML YAML HCL 和 Java 属性配置文件 监听配置文件变化,实时读取读取配置文件内容 读取环境变量值 读取远程配置系统 (etcd Consul) 和监控配置变化 读取命令 Flag 值 读取 buffer 值 读取确切值 乍一看,未免有相见恨晚之感,可仔细一想,不免脑袋

  • Netty + ZooKeeper 实现简单的服务注册与发现

    一. 背景 最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理. 二. Netty 的使用 在接收到派单任务之后,通过 Netty 推送到指定门店相关的设备.在我们的系统中 Netty 实现了消息推送.长连接以及心跳机制. 2.1 Netty Server 端: 每个 Netty 服务端通过 ConcurrentHashMap 保存了客户端的 clientId 以及它连接的 SocketChannel. 服务器端向客户端发送消息时,只要获取

  • 基于 ZooKeeper 搭建 Hadoop 高可用集群 的教程图解

    一.高可用简介 Hadoop 高可用 (High Availability) 分为 HDFS 高可用和 YARN 高可用,两者的实现基本类似,但 HDFS NameNode 对数据存储及其一致性的要求比 YARN ResourceManger 高得多,所以它的实现也更加复杂,故下面先进行讲解: 1.1 高可用整体架构 HDFS 高可用架构如下: 图片引用自: https://www.edureka.co/blog/how-to-set-up-hadoop-cluster-with-hdfs-hi

  • Docker搭建Zookeeper&Kafka集群的实现

    最近在学习Kafka,准备测试集群状态的时候感觉无论是开三台虚拟机或者在一台虚拟机开辟三个不同的端口号都太麻烦了(嗯..主要是懒). 环境准备 一台可以上网且有CentOS7虚拟机的电脑 为什么使用虚拟机?因为使用的笔记本,所以每次连接网络IP都会改变,还要总是修改配置文件的,过于繁琐,不方便测试.(通过Docker虚拟网络的方式可以避免此问题,当时实验的时候没有了解到) Docker 安装 如果已经安装Docker请忽略此步骤 Docker支持以下的CentOS版本: CentOS 7 (64

  • Python通过zookeeper实现分布式服务代码解析

    借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去. 这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端. zk_server.py import threading import json import socket import sys from kazoo.client import KazooClient # TCP服务端绑定端口开启监听,同时将自己注册到z

  • Python内建函数之raw_input()与input()代码解析

    这两个均是 python 的内建函数,通过读取控制台的输入与用户实现交互.但他们的功能不尽相同.举两个小例子. >>> raw_input_A = raw_input("raw_input: ") raw_input: abc >>> input_A = input("Input: ") Input: abc Traceback(most recent call last): File "<pyshell#1>

  • Python自定义函数定义,参数,调用代码解析

    函数能提高应用的模块性,和代码的重复利用率.Python提供了许多内建函数,比如print()等.也可以创建用户自定义函数. 函数定义 函数定义的简单规则: 函数代码块以def关键词开头,后接函数标识符名称和圆括号(),任何传入参数和自变量必须放在圆括号中间 函数内容以冒号起始,并且缩进 若有返回值,Return[expression]结束函数:不带return表达式相当于返回None 函数通常使用三个单引号'''...'''来注释说明函数:函数体内容不可为空,可用pass来表示空语句:以下几个

  • Python Django 简单分页的实现代码解析

    这篇文章主要介绍了Python Django 简单分页的实现代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 models.py: from django.db import models class Book(models.Model): title = models.CharField(max_length=32) def __str__(self): return self.title class Meta: db_table =

  • 基于Python获取docx/doc文件内容代码解析

    这篇文章主要介绍了基于Python获取docx/doc文件内容代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 整体思路: 下载文件并修改后缀为zip文件,解压zip文件,所要获取的内容在固定的文件夹下:work/temp/word/document.xml 所用包,全部是python自带,不需要额外下载安装. # encoding:utf-8 import os import re import requests import zipf

  • Dubbo+zookeeper搭配分布式服务的过程详解

    目录 分布式架构: Dubbo 是什么 Dubbo: 思想: 依赖: 分布式架构: 1.当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,前端应用能更快速的响应多变的市场需求. 2.此时,用于提高业务复用及整合的 分布式服务框架(RPC) 是关键. Dubbo 是什么 一款分布式服务框架 高性能和透明化的RPC远程服务调用方案 SOA服务治理方案 Dubbo: 作为分布式架构比较后的框架,同时也是比较容易入手的框架,适合作为分布式的入手框架,下

  • Python中sort和sorted函数代码解析

    本文研究的主要是Python中sort和sorted函数的相关内容,具体如下. 一.sort函数 sort函数是序列的内部函数 函数原型: L.sort(cmp=None, key=None, reverse=False) 函数作用: 它是把L原地排序,也就是使用后并不是返回一个有序的序列副本,而是把当前序列变得有序 参数说明: (1) cmp参数 cmp接受一个函数,拿整形举例,形式为: def f(a,b): return a-b 如果排序的元素是其他类型的,如果a逻辑小于b,函数返回负数:

  • python微信跳一跳游戏辅助代码解析

    这个代码实现的是   手动点击起点 和 终点  ,程序自动判断距离.触屏时间  完成跳跃 原理(摘自项目说明页面): 1. 将手机点击到"跳一跳"小程序界面: 2. 用Adb 工具获取当前手机截图,并用adb将截图pull上来: adb shell screencap -p /sdcard/1.png adb pull /sdcard/1.png . 3. 用matplot显示截图: 4. 用鼠标点击起始点和目标位置,计算像素距离: 5. 根据像素距离,计算按压时间: 6. 用Adb工

  • Python  word实现读取及导出代码解析

    2个简单的代码,帮你实现word的导出和word的读取 功能一:导出word,word中的内容为 代码: from docx import Document from docx.enum.text import WD_PARAGRAPH_ALIGNMENT #设置对象居中.对齐等. from docx.enum.text import WD_TAB_ALIGNMENT,WD_TAB_LEADER #设置制表符等 from docx.shared import Inches #设置图像大小 fro

  • Python实现快速大文件比较代码解析

    问题 假如,在有两个大文件分别存储了大量的数据,数据其实很简单就是一堆字符串,每行存储一条,如何快速筛选出两个文件的异同之处么,或者如何筛选出两个文件中不同的元素呢? 刚开始我是通过最简单的方法,利用for循环去一个个的判断,时间复杂度为m的n次幂,当然当文件数量级为十万或者百万时,速率简直慢到了极点. 解决方法 利用set()的different(方法)可快速比较,两个set集合的不同之处,也就是对集合进行数学运算 假设:数据1拥有858882条记录,数据2有360029条记录,快速挑选出数据

随机推荐