Django+Vue实现WebSocket连接的示例代码

近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波;发现 WebSocket 最适合做这件事。

效果

测试 ping www.baidu.com 效果

点击连接建立ws连接

后端实现

所需软件包

后端主要借助Django Channels 实现socket连接,官网文档链接

这里想实现每个连接进来加入组进行广播,所以还需要引入 channels-redis

pip

channels==2.2.0
channels-redis==2.4.0

引入

settings.py

INSTALLED_APPS = [
  'django.contrib.admin',
  'django.contrib.auth',
  'django.contrib.contenttypes',
  'django.contrib.sessions',
  'django.contrib.messages',
  'django.contrib.staticfiles',
  'rest_framework.authtoken',
  'rest_framework',
        ...
  'channels',
]

# Redis配置
REDIS_HOST = ENV_DICT.get('REDIS_HOST', '127.0.0.1')
REDIS_PORT = ENV_DICT.get('REDIS_PORT', 6379)
CHANNEL_LAYERS = {
  "default": {
    "BACKEND": "channels_redis.core.RedisChannelLayer",
    "CONFIG": {
      "hosts": [(REDIS_HOST, REDIS_PORT)],
    },
  },
}

代码

apps/consumers.py

新建一个消费处理

实现: 默认连接加入组,发送信息时的处理。

from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
  def connect(self):
    """
    每个任务作为一个频道
    默认进入对应任务执行频道
    """
    self.job_name = self.scope['url_route']['kwargs']['job_name']
    self.job_group_name = 'job_%s' % self.job_name
    async_to_sync(self.channel_layer.group_add)(
      self.job_group_name,
      self.channel_name
    )
    self.accept()

  def disconnect(self, close_code):
    async_to_sync(self.channel_layer.group_discard)(
      self.job_group_name,
      self.channel_name
    )

  # job.message类型处理
  def job_message(self, event):

    # 默认发送收到信息
    self.send(text_data=event["text"])

apps/routing.py

ws类型路由

实现:ws/job/<job_name>由 MyConsumer 去处理。

from . import consumers
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.sessions import SessionMiddlewareStack

application = ProtocolTypeRouter({
  'websocket': SessionMiddlewareStack(
    URLRouter(
     [
       path('ws/job/<str:job_name>', consumers.MyConsumer)
     ]
    )
  ),
})

apps/views.py

在执行命令中获取 webSocket 消费通道,进行异步推送

  • 使用异步推送async_to_sync是因为在连接的时候采用的异步连接,所以推送必须采用异步推送。
  • 因为执行任务时间过长,启动触发运行时加入多线程,直接先返回ok,后端运行任务。
from subprocess import Popen,PIPE
import threading

def runPopen(job):
  """
  执行命令,返回popen
  """
  path = os.path
  Path = path.abspath(path.join(BASE_DIR, path.pardir))
  script_path = path.abspath(path.join(Path,'run.sh'))
  cmd = "sh %s %s" % (script_path, job)
  return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)

def runScript(job):
  channel_layer = get_channel_layer()
  group_name = "job_%s" % job

  popen = runPopen(job)
  while True:
    output = popen.stdout.readline()
    if output == '' and popen.poll() is not None:
      break

    if output:
      output_text = str(output.strip())
      async_to_sync(
        channel_layer.group_send
        )(
          group_name,
          {"type": "job.message", "text": output_text}
        )
    else:
      err = popen.stderr.readline()
      err_text = str(err.strip())
      async_to_sync(
        channel_layer.group_send
        )(
          group_name,
          {"type": "job.message", "text": err_text}
        )
      break

class StartJob(APIView):
  def get(self, request, job=None):
    run = threading.Thread(target=runScript, args=(job,))
    run.start()
    return HttpResponse('ok')

apps/urls.py

get请求就能启动任务

urlpatterns = [
        ...
  path('start_job/<str:job>', StartJob.as_view())
]

前端实现

所需软件包

vue-native-websocket 

代码实现

plugins/vueNativeWebsocket.js

import Vue from 'vue'
import VueNativeSock from '../utils/socket/Main.js'

export default function ({ store }) {
 Vue.use(VueNativeSock, 'http://localhost:8000/ws/job', {connectManually: true,});
}

nuxt.config.js

配置文件引入, 这里我使用的是 nuxt 框架

 plugins: [
   {
    src: '@/plugins/vueNativeWebsocket.js',
    ***: false
   },
  ],

封装 socket

export default (connection_url, option) => {
  // 事件
  let event = ['message', 'close', 'error', 'open'];

  // 拷贝选项字典
  let opts = Object.assign({}, option);

  // 定义实例字典
  let instance = {

   // socket实例
   socket: '',

   // 是否连接状态
   is_conncet: false,

   // 具体连接方法
   connect: function() {
    if(connection_url) {
     let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'
     connection_url = scheme + '://' + connection_url.split('://')[1];
     this.socket = new WebSocket(connection_url);
     this.initEvent();
    }else{
     console.log('wsurl為空');
    }
   },

   // 初始化事件
   initEvent: function() {
    for(let i = 0; i < event.length; i++){
     this.addListener(event[i]);
    }
   },

   // 判断事件
   addListener: function(event) {
    this.socket.addEventListener(event, (e) => {
     switch(event){
      case 'open':
       this.is_conncet = true;
       break;
      case 'close':
       this.is_conncet = false;
       break;
     }
     typeof opts[event] == 'function' && opts[event](e);
    });
   },

   // 发送方法,失败则回调
   send: function(data, closeCallback) {
    console.log('socket ---> ' + data)
    if(this.socket.readyState >= 2) {
     console.log('ws已经关闭');
     closeCallback && closeCallback();
    }else{
     this.socket.send(data);
    }
   }

  };

  // 调用连接方法
  instance.connect();
  return instance;
 }

index.vue

具体代码

x2Str 方法,因为后端返回的是bytes,格式 b'xxx' ,编写了方法对其进行转换。

<template>
    <div>

        <el-button type="primary" @click="runFunction" >执行</el-button>
        <el-button type="primary" @click="connectWebSock" >显示</el-button>

  <div class="socketView">
   <span v-for="i in socketMessage" :key="i">{{i}}</span>
  </div>
 </div>
</template>
<script>
 import R from '@/plugins/axios';
 import ws from '@/plugins/socket'
 export default {
  data() {
   return {
    webSocket: '',
    socketMessage: [],
   }
  },

    methods: {
     // 打开连接的处理
   openSocket(e) {
    if (e.isTrusted) {
     const h = this.$createElement;
     this.$notify({
      title: '提示',
      message: h('i', { style: 'color: teal'}, '已建立Socket连接')
     });
    }
   },

  // 连接时的处理
  listenSocket(e) {
   if (e.data){
    this.socketMessage.push(this.x2Str(e.data))
   }
  },

  // 连接webSocket
        connectWebSock() {
   let wsuri = process.env.BACKEND_URL + '/ws/job/' + this.selectFunctions
   this.webSocket = ws(wsuri, {
    open: e => this.openSocket(e),
    message: e => this.listenSocket(e),
    close: e => this.closeSocket(e)
   })
  },

     // 转码
  x2Str(str) {
   if (str) {
    let reg = new RegExp("(?<=^b').*(?='$)")
    let result = str.replace(/(?:\\x[\da-fA-F]{2})+/g, m =>
     decodeURIComponent(m.replace(/\\x/g, '%'))
    )
    return reg.exec(result)[0]
   }
  },

  // 执行方法
  runFunction() {
   R.myRequest('GET','api/start_job/' + this.selectFunctions, {}, {}).then((response) => {
    if (response.hasOwnProperty('response')){
      this.$message({
      type: 'error',
      message: '服务端返回错误,返回码:' + response.response.status
      });
    };
    if (response.data == 'ok') {
      this.$message({
       type: 'success',
       message: '开始执行[' + this.selectFunctions + ']'
      });
    }
   });
  }
  }
}
</script>

至此,实现前后端 websocket 通讯。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 基于django channel实现websocket的聊天室的方法示例

    websocket 网易聊天室? ​ web微信? ​ 直播? 假如你工作以后,你的老板让你来开发一个内部的微信程序,你需要怎么办?我们先来分析一下里面的技术难点 消息的实时性? 实现群聊 现在有这样一个需求,老板给到你了,关乎你是否能转正?你要怎么做? 我们先说消息的实时性,按照我们目前的想法是我需要用http协议来做,那么http协议怎么来做那? 是不是要一直去访问我们的服务器,问服务器有没有人给我发消息,有没有人给我发消息?那么大家认为我多长时间去访问一次服务比较合适那? 1分钟1次?1分

  • 在vue中使用SockJS实现webSocket通信的过程

    最近接到一个业务需求,需要做一个聊天信息的实时展示的界面,这就需要和服务器端建立webSocket连接,从而实现数据的实时获取和视图的实时刷新.在此将我的实现记录下来,希望可以给有同样需求的人一些帮助.废话少说,下面我就来讲一下我的实现过程: socksjs •客户端和服务器端api尽可能简洁,尽量靠近websocket api •支持服务端扩展和负载均衡技术 •传输层应该全面支持跨域通信 •如果受到代理服务器的限制,传输层能优雅地从一种方式回退到另一种方式 •尽可能快地建立连接 •客户端只是纯

  • Django+Vue实现WebSocket连接的示例代码

    近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波:发现 WebSocket 最适合做这件事. 效果 测试 ping www.baidu.com 效果 点击连接建立ws连接 后端实现 所需软件包 后端主要借助Django Channels 实现socket连接,官网文档链接 这里想实现每个连接进来加入组进行广播,所以还需要引入 channels-redis . pip channels==2.2.0 channels-redis==2.4.0 引入 settings.py INS

  • django+vue实现注册登录的示例代码

    注册 前台利用vue中的axios进行传值,将获取到的账号密码以form表单的形式发送给后台. form表单的作用就是采集数据,也就是在前台页面中获取用户输入的值.numberValidateForm:前台定义的表单 $axios使用时需要在main.js中全局注册,.then代表成功后进行的操作,.catch代表失败后进行的操作 submitForm(formName) { let data = new FormData() data.append('username',this.number

  • django+vue实现跨域的示例代码

    目录 版本 django实现跨域 1.安装django-cors-headers库 2.修改项目配置文件项目/settings.py 3.前端vue使用axios访问后端django提供的数据接口,安装axios 4.前端vue配置axios插件,修改src/main.js 5.在XX.vue中跨域请求数据 版本 Django 2.2.3Python 3.8.8djangorestframework 3.13.1django-cors-headers 3.11.0 django实现跨域 说明:此

  • vue 运用mock数据的示例代码

    本文介绍了vue 运用mock数据的示例代码,分享给大家,具体如下: 初始化你的项目 话不用啰嗦,首先初始化你的项目,最简单的就是使用vue-cli啦 vue init webpack 引入mock.js 安装 mockjs npm install --save-dev mockjs 引入到Vue原型上,方便使用 import mockjs from 'mockjs' Vue.prototype.$mock = Vue.$mock = mockjs.mock 以上引入到Vue原型上,可以使用 t

  • 如何在vue中使用ts的示例代码

    本文介绍了如何在vue中使用ts的示例代码,分享给大家,具体如下: 注意:此文并不是把vue改为全部替换为ts,而是可以在原来的项目中植入ts文件,目前只是实践阶段,向ts转化过程中的过渡. ts有什么用? 类型检查.直接编译到原生js.引入新的语法糖 为什么用ts? TypeScript的设计目的应该是解决JavaScript的"痛点":弱类型和没有命名空间,导致很难模块化,不适合开发大型程序.另外它还提供了一些语法糖来帮助大家更方便地实践面向对象的编程. typescript不仅可

  • goland 实现websocket server的示例代码

    采用go 实现的websocket,已经调试通过在此记录. 测试工具网址:https://www.idcd.com/tool/socket 话不多说上全部代码: package main import ( "fmt" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "net/http" uuid "github.com/satori/go.uuid&

  • vue使用websocket概念及示例

    目录 概念部分: 使用示例 概念部分: 1,WebSocket 是 HTML5 提供的 TCP 连接上进行全双工通讯的协议.一次握手之后,服务器和客户端可以互相主动通信,双向传输数据. 2,浏览器想服务器发送请求,建立连接之后,可通过send()方法想服务器发送数据,并通过message事件接受服务器返回的数据. 使用示例 <script> export default { mounted() { this.connectWebsocket(); }, methods: { connectWe

  • vue生成随机验证码的示例代码

    本文介绍了vue生成随机验证码的示例代码,分享给大家,具体如下: 样式自调,最终效果如图: 实现效果: 点击右边input框会自动切换,如果输入的值与字不同,则清空换一串随机数 HTML <input type="text" placeholder="请输入验证码" class="yanzhengma_input" @blur="checkLpicma" v-model="picLyanzhengma"

  • vue实现图片滚动的示例代码(类似走马灯效果)

    上次写了一个简单的图片轮播,这个相当于在上面的一些改进.这个组件除了可以进行图片滚动外,也可以嵌入任何内容的标签进行滚动,里面用了slot进行封装. 父: <template> <div id="app"> <er-carousel-index :typeNumber=2 :pageNumber=3 :timeSpace=2 :duration=2 :isOrNotCircle="true" url="/src/js/inde

  • 2分钟实现一个Vue实时直播系统的示例代码

    前言 我们在不敲代码的时候可能会去看游戏直播,那么是前台怎么实现的呢?下面我们来讲一下. 第一步,购买云直播服务 首先,你必须去阿里云或者腾讯云注册一个直播服务.也花不了几个钱,练手的话,几十块钱就够了. 这里我拿阿里云举例,购买完了,配置好推流域名跟播流域名,下面我们将进行地址生成.记住下面生成的地址,下面会用到. 第二步,下载本地推流工具 https://obsproject.com/ 第三步,设置OBS 在第一步中图片底部有推流地址,需要注意,分为两部分填入下方图所示. 在AppName字

随机推荐