Go语言实战之实现一个简单分布式系统

目录
  • 引子
  • 思路
  • 实战
    • 节点通信
    • 主节点
    • 工作节点
    • 将它们放在一起
    • 代码效果
  • 总结

引子

如今很多云原生系统、分布式系统,例如 Kubernetes,都是用 Go 语言写的,这是因为 Go 语言天然支持异步编程,而且静态语言能保证应用系统的稳定性。笔者的开源项目 Crawlab 作为爬虫管理平台,也应用到了分布式系统。本篇文章将介绍如何用 Go 语言编写一个简单的分布式系统。

思路

在开始写代码之前,我们先思考一下需要实现些什么。

  • 主节点(Master Node):中控系统,相当于军队中的指挥官,派发任务命令
  • 工作节点(Worker Node):执行者,相当于军队中的士兵,执行任务

除了上面的概念以外,我们需要实现一些简单功能。

  • 上报运行状态(Report Status):工作节点向主节点上报当前状态
  • 分派任务(Assign Task):通过 API 向主节点发起请求,主节点再向工作节点分派任务
  • 运行脚本(Execute Script):工作节点执行任务中的脚本

整个流程示意图如下。

实战

节点通信

节点之间的通信在分布式系统中非常重要,毕竟每个节点或机器如果孤立运行,就失去了分布式系统的意义。因此,节点通信在分布式系统中是核心模块。

gRPC 协议

首先,我们来想一下,如何让节点之间进行相互通信。最常用的通信方式就是 API,不过这个通信方式有个缺点,就是需要将各个节点的 IP 地址及端口显示暴露给其他节点,这在公网中是不太安全的。因此,我们选择了 gRPC,一种流行的远程过程调用(Remote Procedure Call,RPC)框架。这里我们不过多的解释 RPC 或 gRPC 的原理,简而言之,就是能让调用者在远程机器上执行命令的协议方式。

为了使用 gRPC 框架,我们先创建 go.mod 并输入以下内容,并执行 go mod download。注意:对于国内的朋友,或许需要添加代理才能正常下载,可以先执行 export GOPROXY=goproxy.cn,direct 后再执行下载命令。

module go-distributed-system
​
go 1.17
​
require (
  github.com/golang/protobuf v1.5.0
  google.golang.org/grpc v1.27.0
  google.golang.org/protobuf v1.27.1
)

然后,我们创建 Protocol Buffers 文件 node.proto(表示节点对应的 gRPC 协议文件),并输入以下内容。

syntax = "proto3";
​
package core;
option go_package = ".;core";
​
message Request {
  string action = 1;
}
​
message Response {
  string data = 1;
}
​
service NodeService {
  rpc ReportStatus(Request) returns (Response){};       // Simple RPC
  rpc AssignTask(Request) returns (stream Response){};  // Server-Side RPC
}

在这里我们创建了两个 RPC 服务,分别是负责上报状态的 Simple RPC ReportStatus 以及 Server-Side RPC AssignTask。Simple RPC 和 Server-Side RPC 的区别如下图所示,主要区别在于 Server-Side RPC 可以从通过流(Stream)向客户端(Client)主动发送数据,而 Simple RPC 只能从客户端向服务端(Server)发请求。

创建好 .proto 文件后,我们需要将这个 gRPC 协议文件转化为 .go 代码文件,从而能被 Go 程序引用。在命令行窗口中执行如下命令。注意:编译工具 protoc 不是自带的,需要单独下载,具体可以参考文档 https://grpc.io/docs/protoc-installation/

mkdir core
protoc --go_out=./core \
    --go-grpc_out=./core \
    node.proto

执行完后,可以在 core 目录下看到两个 Go 代码文件, node.pb.gonode_grpc.pb.go,这相当于 Go 程序中对应的 gRPC 库。

gRPC 服务端

现在开始编写服务端逻辑。

咱们先创建一个新文件 core/node_service_server.go,输入以下内容。主要逻辑就是实现了之前创建好的 gRPC 协议中的两个调用方法。其中,暴露了 CmdChannel 这个通道(Channel)来获取需要发送到工作节点的命令。

package core
​
import (
  "context"
)
​
type NodeServiceGrpcServer struct {
  UnimplementedNodeServiceServer
​
  // channel to receive command
  CmdChannel chan string
}
​
func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
  return &Response{Data: "ok"}, nil
}
​
func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
  for {
    select {
    case cmd := <-n.CmdChannel:
      // receive command and send to worker node (client)
      if err := server.Send(&Response{Data: cmd}); err != nil {
        return err
      }
    }
  }
}
​
var server *NodeServiceGrpcServer
​
// GetNodeServiceGrpcServer singleton service
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
  if server == nil {
    server = &NodeServiceGrpcServer{
      CmdChannel: make(chan string),
    }
  }
  return server
}

gRPC 客户端

gRPC 客户端不需要具体实现,我们通常只需要调用 gRPC 客户端的方法,程序会自动发起向服务端的请求以及获取后续的响应。

主节点

编写好了节点通信的基础部分,现在我们需要实现主节点了,这是整个中心化分布式系统的核心。

咱们创建一个新的文件 node.go,输入以下内容。

package core
​
import (
  "github.com/gin-gonic/gin"
  "google.golang.org/grpc"
  "net"
  "net/http"
)
​
// MasterNode is the node instance
type MasterNode struct {
  api     *gin.Engine            // api server
  ln      net.Listener           // listener
  svr     *grpc.Server           // grpc server
  nodeSvr *NodeServiceGrpcServer // node service
}
​
func (n *MasterNode) Init() (err error) {
  // TODO: implement me
  panic("implement me")
}
​
func (n *MasterNode) Start() {
  // TODO: implement me
  panic("implement me")
}
​
var node *MasterNode
​
// GetMasterNode returns the node instance
func GetMasterNode() *MasterNode {
  if node == nil {
    // node
    node = &MasterNode{}
​
    // initialize node
    if err := node.Init(); err != nil {
      panic(err)
    }
  }
​
  return node
}

其中,我们创建了两个占位方法 InitStart,我们分别实现。

在初始化方法 Init 中,我们需要做几件事情:

  • 注册 gRPC 服务
  • 注册 API 服务

现在,在 Init 方法中加入如下代码。

func (n *MasterNode) Init() (err error) {
  // grpc server listener with port as 50051
  n.ln, err = net.Listen("tcp", ":50051")
  if err != nil {
    return err
  }
​
  // grpc server
  n.svr = grpc.NewServer()
​
  // node service
  n.nodeSvr = GetNodeServiceGrpcServer()
​
  // register node service to grpc server
  RegisterNodeServiceServer(node.svr, n.nodeSvr)
​
  // api
  n.api = gin.Default()
  n.api.POST("/tasks", func(c *gin.Context) {
    // parse payload
    var payload struct {
      Cmd string `json:"cmd"`
    }
    if err := c.ShouldBindJSON(&payload); err != nil {
      c.AbortWithStatus(http.StatusBadRequest)
      return
    }
​
    // send command to node service
    n.nodeSvr.CmdChannel <- payload.Cmd
​
    c.AbortWithStatus(http.StatusOK)
  })
​
  return nil
}

可以看到,我们新建了一个 gRPC Server,并将之前的 NodeServiceGrpcServer 注册了进去。另外,我们还用 gin 框架创建了一个简单的 API 服务,可以 POST 请求到 /tasksNodeServiceGrpcServer 中的命令通道 CmdChannel 传送命令。这样就将各个部件串接起来了!

启动方法 Start 很简单,就是启动 gRPC Server 以及 API Server。

func (n *MasterNode) Start() {
  // start grpc server
  go n.svr.Serve(n.ln)
​
  // start api server
  _ = n.api.Run(":9092")
​
  // wait for exit
  n.svr.Stop()
}

下一步,我们就要实现实际做任务的工作节点了。

工作节点

现在,我们创建一个新文件 core/worker_node.go,输入以下内容。

package core
​
import (
  "context"
  "google.golang.org/grpc"
  "os/exec"
)
​
type WorkerNode struct {
  conn *grpc.ClientConn  // grpc client connection
  c    NodeServiceClient // grpc client
}
​
func (n *WorkerNode) Init() (err error) {
  // connect to master node
  n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
  if err != nil {
    return err
  }
​
  // grpc client
  n.c = NewNodeServiceClient(n.conn)
​
  return nil
}
​
func (n *WorkerNode) Start() {
  // log
  fmt.Println("worker node started")
​
  // report status
  _, _ = n.c.ReportStatus(context.Background(), &Request{})
​
  // assign task
  stream, _ := n.c.AssignTask(context.Background(), &Request{})
  for {
    // receive command from master node
    res, err := stream.Recv()
    if err != nil {
      return
    }
​
    // log command
    fmt.Println("received command: ", res.Data)
​
    // execute command
    parts := strings.Split(res.Data, " ")
    if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
      fmt.Println(err)
    }
  }
}
​
var workerNode *WorkerNode
​
func GetWorkerNode() *WorkerNode {
  if workerNode == nil {
    // node
    workerNode = &WorkerNode{}
​
    // initialize node
    if err := workerNode.Init(); err != nil {
      panic(err)
    }
  }
​
  return workerNode
}

其中,我们在初始化方法 Init 中创建了gRPC 客户端,并连接了主节点的 gRPC 服务端。

在启动方法 Start 中做了几件事情:

  • 调用上报状态(Report Status)的 Simple RPC 方法
  • 调用分配任务(Assign Task)的 Server-Side RPC 方法,获取到了流(Stream)
  • 通过循环不断接受流传输过来的来自服务端(也就是主节点)的信息,并执行命令

这样,整个包含主节点、工作节点的分布式系统核心逻辑就写好了!

将它们放在一起

最后,我们需要将这些核心逻辑用命令行工具封装一下,以便启用。

创建主程序文件 main.go,并输入以下内容。

package main
​
import (
  "go-distributed-system/core"
  "os"
)
​
func main() {
  nodeType := os.Args[0]
  switch nodeType {
  case "master":
    core.GetMasterNode().Start()
  case "worker":
    core.GetWorkerNode().Start()
  default:
    panic("invalid node type")
  }
}

这样,整个简单的分布式系统就创建好了!

代码效果

下面我们来运行一下代码。

打开两个命令行窗口,其中一个输入 go run main.go master 启动主节点,另一个输入 go run main.go worker 启动工作节点。

如果主节点启动成功,将会看到如下日志信息。

[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
​
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)
​
[GIN-debug] POST   /tasks                    --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers)
[GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
[GIN-debug] Listening and serving HTTP on :9092

如果工作节点启动成功,将会看到如下日志信息。

worker node started

主节点、工作节点都启动成功后,我们在另外一个命令行窗口中输入如下命令来发起 API 请求。

curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"cmd": "touch /tmp/hello-distributed-system"}' \
  http://localhost:9092/tasks

在工作节点窗口应该可以看到日志 received command: touch /tmp/hello-distributed-system

然后查看文件是否顺利生成,执行 ls -l /tmp/hello-distributed-system

-rw-r--r--  1 marvzhang  wheel     0B Oct 26 12:22 /tmp/hello-distributed-system

文件成功生成,表示已经通过工作节点执行成功了!大功告成!

总结

本篇文章通过 RPC 框架 gRPC 以及 Go 语言自带的 Channel,将节点串接起来,开发出了一个简单的分布式系统。所用到的核心库和技术:

整个代码示例仓库在 GitHub 上: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system

到此这篇关于Go语言实战之实现一个简单分布式系统的文章就介绍到这了,更多相关Go语言分布式系统内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Go开源项目分布式唯一ID生成系统

    目录 前言 项目背景 项目使用 HTTP 方式 gRPC 方式 本地开发 项目架构 前言 今天跟大家介绍一个开源项目:id-maker,主要功能是用来在分布式环境下生成唯一 ID.上周停更了一周,也是用来开发和测试这个项目的相关代码. 美团有一个开源项目叫 Leaf,使用 Java 开发.本项目就是在此思路的基础上,使用 Go 开发实现的. 项目整体代码量并不多,不管是想要在实际生产环境中使用,还是想找个项目练手,我觉得都是一个不错的选择. 项目背景 在大部分系统中,全局唯一 ID 都是一个强需

  • Go实现分布式系统高可用限流器实战

    目录 前言 1. 问题描述 2. 信号量限流 2.1 阻塞方式 2.2 非阻塞方式 3. 限流算法 3.1 漏桶算法 3.2 令牌桶算法 3.3 漏桶算法的实现 改进 4. Uber 开源实现 RateLimit 深入解析 4.1 引入方式 4.2 使用 构造限流器 限流器Take() 阻塞方法 第一版本 第二版本 小结 前言 限流器,顾名思义用来对高并发的请求进行流量限制的组件. 限流包括 Nginx 层面的限流以及业务代码逻辑上的限流.流量的限制在众多微服务和 service mesh 中多

  • golang实现简易的分布式系统方法

    本文介绍了golang实现简易的分布式系统方法,分享给大家,具体如下: 功能 能够发送/接收请求和响应 能够连接到集群 如果无法连接到群集(如果它是第一个节点),则可以作为主节点启动节点 每个节点有唯一的标识 能够在节点之间交换json数据包 接受命令行参数中的所有信息(将来在我们系统升级时将会很有用) 源码 package main import ( "fmt" "strconv" "time" "math/rand" &q

  • Go语言实战之实现一个简单分布式系统

    目录 引子 思路 实战 节点通信 主节点 工作节点 将它们放在一起 代码效果 总结 引子 如今很多云原生系统.分布式系统,例如 Kubernetes,都是用 Go 语言写的,这是因为 Go 语言天然支持异步编程,而且静态语言能保证应用系统的稳定性.笔者的开源项目 Crawlab 作为爬虫管理平台,也应用到了分布式系统.本篇文章将介绍如何用 Go 语言编写一个简单的分布式系统. 思路 在开始写代码之前,我们先思考一下需要实现些什么. 主节点(Master Node):中控系统,相当于军队中的指挥官

  • Go语言实现一个简单的并发聊天室的项目实战

    目录 写在前面 并发聊天服务器 具体代码 服务端 客户端 总结 写在前面 Go语言在很多方面天然的具备很多便捷性,譬如网络编程,并发编程.而通道则又是Go语言实现并发编程的重要工具,因为其承担着通道之间互相通信的重任.并且因为其本身就是并发安全的,所以在某些场景下是非常好用的. 并发聊天服务器 这里主要是实现一个简单的并发聊天服务器.首先,客户端可以在服务器中注册自己的信息(登录以及退出),客户端发出的所有的信息由服务器向各个客户端进行转发,或者换句话说是广播. 具体代码 服务端 说的再多,没有

  • go语言实现一个简单的http客户端抓取远程url的方法

    本文实例讲述了go语言实现一个简单的http客户端抓取远程url的方法.分享给大家供大家参考.具体实现方法如下: 复制代码 代码如下: package main import (  "fmt"  "log"  "net/http"  "net/url"  "io/ioutil" ) func main() { resp, err := http.Get("http://www.google.co.

  • Go语言实现的一个简单Web服务器

    Web是基于http协议的一个服务,Go语言里面提供了一个完善的net/http包,通过http包可以很方便的就搭建起来一个可以运行的Web服务.同时使用这个包能很简单地对Web的路由,静态文件,模版,cookie等数据进行设置和操作. http包建立Web服务器 复制代码 代码如下: package main import (     "fmt"     "net/http"     "strings"     "log"

  • 分析C语言一个简单程序

    首先给大家一个简单的例子,让读者有个整体的认识,代码如下: #include <stdio.h> int main() { puts("我们"); return 0; } 函数的概念 先来看第4行代码,这行代码会在显示器上输出"我们".前面我们已经讲过,puts 后面要带( ),字符串也要放在( )中. 在C语言中,有的语句使用时不能带括号,有的语句必须带括号.带括号的称为函数(Function) . C语言提供了很多功能,例如输入输出.获得日期时间.文

  • 用C语言来实现一个简单的虚拟机

    必要的准备工作及注意事项: 在开始之前需要做以下工作: 一个C编译器--我使用了 clang 3.4,也可以用其它支持 c99/c11 的编译器: 文本编辑器--我建议使用基于IDE的文本编辑器,我使用 Emacs; 基础编程知识--最基本的变量,流程控制,函数,数据结构等: Make 脚本--能使程序更快一点. 为什么要写个虚拟机? 有以下原因: 想深入了解计算机工作原理.本文将帮助你了解计算机底层如何工作,虚拟机提供简洁的抽象层,这不就是一个最好的学习它们原理的方法吗? 更深入了解一些编程语

  • 利用 Go 语言编写一个简单的 WebSocket 推送服务

    本文中代码可以在 github.com/alfred-zhong/wserver获取. 背景 最近拿到需求要在网页上展示报警信息.以往报警信息都是通过短信,微信和 App 推送给用户的,现在要让登录用户在网页端也能实时接收到报警推送. 依稀记得以前工作的时候遇到过类似的需求.因为以前的浏览器标准比较陈旧,并且那时用 Java 较多,所以那时候解决这个问题就用了 Comet4J.具体的原理就是长轮询,长链接.但现在毕竟 html5 流行开来了,IE 都被 Edge 接替了,再用以前这种技术就显得过

  • C 语言实现一个简单的 web 服务器的原理解析

    说到 web 服务器想必大多数人首先想到的协议是 http,那么 http 之下则是 tcp,本篇文章将通过 tcp 来实现一个简单的 web 服务器. 本篇文章将着重讲解如何实现,对于 http 与 tcp 的概念本篇将不过多讲解. 一.了解 Socket 及 web 服务工作原理 既然是基于 tcp 实现 web 服务器,很多学习 C 语言的小伙伴可能会很快的想到套接字 socket.socket 是一个较为抽象的通信进程,或者说是主机与主机进行信息交互的一种抽象.socket 可以将数据流

  • C语言实现一个简单的扫雷游戏

    前言 扫雷跟上一篇文章的三子棋一样,是C语言基础知识的综合运用的实例,对于巩固我们的基础知识非常重要,同时扫雷作为C语言的一个小项目,锻炼我们的编程思维,也是一个不可多得的实践. 提示:以下是本篇文章正文内容 一.扫雷的基本思路 1.用C语言实现简单的扫雷,我们需要创建两个数组,一个数组存放雷的信息,另外一个数组存放排雷后结果的信息. 2.在创建数组时候,需要注意的是数组需要大一圈,什么意思?举个例子,比如说我们实现的是9 ×9的扫雷,那么我们的数组就得创建10×10.为什么呢? 原因如下: 因

  • Go语言实现一个简单生产者消费者模型

    目录 一.生产者消费者模型 二.Go语言实现 1.无缓冲channel 2.有缓冲channel 三.实际应用 简介:介绍生产者消费者模型,及go简单实现的demo. 一.生产者消费者模型 生产者消费者模型:某个模块(函数等〉负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.协程.线程.进程等).产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者. 单单抽象出生产者和消费者,还够不上是生产者消费者模型.该模式还需要有一个缓冲区处于生产者和消费者之间

随机推荐