基于asyncio 异步协程框架实现收集B站直播弹幕

前言

虽然标题是全站,但目前只做了等级 top 100 直播间的全天弹幕收集。

弹幕收集系统基于之前的B 站直播弹幕姬 Python 版修改而来。具体协议分析可以看上一篇文章。

直播弹幕协议是直接基于 TCP 协议,所以如果 B 站对类似我这种行为做反制措施,比较困难。应该有我不知道的技术手段来检测类似我这种恶意行为。

我试过同时连接 100 个房间,和连接单个房间 100 次的实验,都没有问题。>150 会被关闭链接。

直播间的选取

现在弹幕收集系统在选取直播间上比较简单,直接选取了等级 top100。

以后会修改这部分,改成定时去 http://live.bilibili.com/all 查看新开播的直播间,并动态添加任务。

异步任务和弹幕存储

收集系统仍旧使用了 asyncio 异步协程框架,对于每一个直播间都使用如下方法来加进 loop 中。

danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())

其实若将心跳任务 HeartbeatLoop 放入 connectorServer 中去启动,代码看起来更优雅一些。但这么做是因为我需要维护一个任务列表,后面会有描述。

在弹幕存储上我花了些时间选择。

数据库存储是一个同步 IO 的过程,Insert 的时候会阻塞弹幕收集的任务。虽然有 aiomysql 这种异步接口,但配置数据库太麻烦,我的设想是这个小系统能够方便地部署。

最终我选择使用自带的 sqlite3。但 sqlite3 无法做并行操作,故开了一个线程单独进行数据库存储。在另一个线程中,100 * 2 个任务搜集所有的弹幕、人数信息,并塞进队列 commentq, numq 中。存储线程每隔 10s 唤醒一次,将队列中的数据写进 sqlite3 中,并清空队列。

在多线程和异步的配合下,网络流量没有被阻塞。

可能的连接失败场景处理

弹幕协议是直接基于 TCP,位与位直接关联性较强,一旦解析错误,很容易就抛 Exception(个人感觉,虽然 TCP 是可靠传输,但B站服务器自身发生错误也是有可能的)。所以有必要设计一个自动重连机制。

在 asyncio 文档中提到,

Done means either that a result / exception are available, or that the future was cancelled.

函数正常返回、抛出异常或者是被 cancel,都会退出当前任务。可以使用 done() 来判断。

每一个直播间对应两个任务,解析任务是最容易挂的,但并不会影响心跳任务,所以必须找出并将对应心跳任务结束。
在创建任务的时候使用字典记录每个房间的两个任务,

self.tasks[url] = [task1, task2]

在运行过程中,每隔 10s 做一次检查,

for url in self.tasks:
  item = self.tasks[url]
  task1 = item[0]
  task2 = item[1]
  if task1.done() == True or task2.done() == True:
    if task1.done() == False:
      task1.cancel()
    if task2.done() == False:
      task2.cancel()
    danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
    task11 = asyncio.ensure_future(danmuji.connectServer())
    task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
    self.tasks[url] = [task11, task22]

实际我只见过一次任务失败的场景,是因为主播房间被封了,导致无法进入直播间。

结论

  1. B站人数是按照连接弹幕服务器的链接数量统计的。通过操纵链接量,可以瞬间增加任意人数观看,有商机?
  2. 运行的这几天中,发现即使大部分房间不在直播,也能有 >5 的人数,包括凌晨。我只能猜测也有和我一样的人在 24h 收集弹幕。
  3. top100 平均一天 40M 弹幕数据。
  4. 收集的弹幕能做什么?还没想好,可能可以拿来做用户行为分析 -_^

最后附上本源码的GITHUB地址 https://github.com/lyyyuna/bilibili_danmu_colloector

(0)

相关推荐

  • 又一枚精彩的弹幕效果jQuery实现

    简易弹幕效果:将发布的内容随机显示在弹幕右侧,逐渐左移最后消失. 涉及知识点:val().random().height().css().append().remove()等,主要是元素的操作 html代码: <a href="#">弹幕技术</a> <div class="mask"> <a href="#" class="button">X</a> </di

  • IOS 实现简单的弹幕功能

    前言 简单实现弹幕功能,表跟我谈效率,但也有用队列控制同时弹的数量. 正文 代码实现: let DANMAKU_SPEED: CGFloat = 150 // 弹幕每秒移动速度 let DANMAKU_SPACE_TIME: NSTimeInterval = 1 // 弹幕之间的时间间隔 let DANMAKU_MAX_ROW = 3 // 最多同时弹幕行数 let danmakuFont = UIFont.systemFontOfSize(18) // 弹幕字体大小 var rowArray

  • JavaScript直播评论发弹幕切图功能点集合效果代码

    一.代码 html+js <!DOCTYPE HTML> <html> <head> <meta charset="utf-8"> <title>数发直播平台</title> <link rel="stylesheet" type="text/css" href="css/common.css"> <link rel="styl

  • 实例解析如何在Android应用中实现弹幕动画效果

    在B站或者其他视频网站看视频时,常常会打开弹幕效果,边看节目边看大家的吐槽.弹幕看起来很有意思,今天我们就来实现一个简单的弹幕效果. 从直观上,弹幕效果就是在一个ViewGroup上增加一些View,然后让这些View移动起来.所以,整体的实现思路大概是这样的: 1.定义一个RelativeLayout,在里面动态添加TextView. 2.这些TextView的字体大小.颜色.移动速度.初始位置都是随机的. 3.将TextView添加到RelativeLayout的右边缘,每隔一段时间添加一个

  • Android实现自定义的弹幕效果

    一.效果图 先来看看效果图吧~~ 二.实现原理方案 1.自定义ViewGroup-XCDanmuView,继承RelativeLayout来实现,当然也可以继承其他三大布局类哈 2.初始化若干个TextView(弹幕的item View,这里以TextView 为例,当然也可以其他了~),然后通过addView添加到自定义View中 3.通过addView添加到XCDanmuView中,位置在坐标,为了实现 从屏幕外移动进来的效果 我们还需要修改添加进来TextView的位置,以从右向左移动方向

  • bilibili弹幕转ass程序制作思路及过程

    b站的弹幕,线下播放还是挺麻烦的,专用的弹幕播放器对其他格式的视频支持不好.我也试着弄个弹幕转字幕的小程序出来. 抓取xml文件的工作就不多说了,很简单的事,只要在播放页面看看源文件就能确定xml文件的地址进行抓取了. 本文主要是讲述xml内的弹幕转字幕的过程. 除去xml文件开头结尾的一些七七八八的东西,弹幕主体是这样的: <d p="51.593,5,25,16711680,1408852480,0,7fa769b4,576008622">怒求 up 自己配音!<

  • 终于实现了!精彩的jquery弹幕效果

    本文实例为大家分享了jquery弹幕效果,供大家参考,具体内容如下 页面效果如下: html页面如下: <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"

  • 基于jquery实现弹幕效果

    用js写的一个弹幕 效果图: 源码: <html> <head> <meta charset="utf-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1"&

  • 基于asyncio 异步协程框架实现收集B站直播弹幕

    前言 虽然标题是全站,但目前只做了等级 top 100 直播间的全天弹幕收集. 弹幕收集系统基于之前的B 站直播弹幕姬 Python 版修改而来.具体协议分析可以看上一篇文章. 直播弹幕协议是直接基于 TCP 协议,所以如果 B 站对类似我这种行为做反制措施,比较困难.应该有我不知道的技术手段来检测类似我这种恶意行为. 我试过同时连接 100 个房间,和连接单个房间 100 次的实验,都没有问题.>150 会被关闭链接. 直播间的选取 现在弹幕收集系统在选取直播间上比较简单,直接选取了等级 to

  • python 中的 asyncio 异步协程

    目录 一.定义协程 二.运行协程 三.协程回调 四.运行多个协程 五.run_forever 六.多协程中关闭run_forever 一.定义协程 asyncio 执行的任务,称为协程,但是Asyncio 并不能带来真正的并行 Python 的多线程因为 GIL(全局解释器锁)的存在,也不能带来真正的并行 import asyncio # 通过 async 定义一个协程 async def task(): print('这是一个协程') # 判断是否是一个协程,返回True print(asyn

  • 关于Python核心框架tornado的异步协程的2种方法详解

    什么是异步? 含义 :双方不需要共同的时钟,也就是接收方不知道发送方什么时候发送,所以在发送的信息中就要有提示接收方开始接收的信息,如开始位,同时在结束时有停止位 现象:没有共同的时钟,不考虑顺序来了就处理 直观感受:就是不用等了,效率高 同步 含义:指两个或两个以上随时间变化的量在变化过程中保持一定的相对关系 现象:有一个共同的时钟,按来的顺序一个一个处理 直观感受 :就是需要等候,效率低下 那么今天我们看怎么用2种方法用代码实现tornado的异步? 这些是导入的包: 2种方法用代码实现to

  • Python 异步协程函数原理及实例详解

    这篇文章主要介绍了Python 异步协程函数原理及实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一. asyncio 1.python3.4开始引入标准库之中,内置对异步io的支持 2.asyncio本身是一个消息循环 3.步骤: (1)创建消息循环 (2)把协程导入 (3)关闭 4.举例: import threading # 引入异步io包 import asyncio # 使用协程 @ asyncio.coroutine def

  • python 单线程和异步协程工作方式解析

    在python3.4之后新增了asyncio模块,可以帮我们检测IO(只能是网络IO[HTTP连接就是网络IO操作]),实现应用程序级别的切换(异步IO).注意:asyncio只能发tcp级别的请求,不能发http协议. 异步IO:所谓「异步 IO」,就是你发起一个 网络IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知. 实现方式:单线程+协程实现异步IO操作. 异步协程用法 接下来让我们来了解下协程的实现,从 Python 3.4 开始,Python 中加入了协程的概

  • python3爬虫中异步协程的用法

    1. 前言 在执行一些 IO 密集型任务的时候,程序常常会因为等待 IO 而阻塞.比如在网络爬虫中,如果我们使用 requests 库来进行请求的话,如果网站响应速度过慢,程序一直在等待网站响应,最后导致其爬取效率是非常非常低的. 为了解决这类问题,本文就来探讨一下 Python 中异步协程来加速的方法,此种方法对于 IO 密集型任务非常有效.如将其应用到网络爬虫中,爬取效率甚至可以成百倍地提升. 注:本文协程使用 async/await 来实现,需要 Python 3.5 及以上版本. 2.

  • 详解Python生成器和基于生成器的协程

    一.什么是生成器 Generator 1.生成器就是可以生成值的函数 2.当一个函数里有了 yield关键字就成了生成器 3.生成器可以挂起执行并且保持当前执行的状态 代码示例: def simple_gen(): yield 'hello' yield 'world' gen = simple_gen() print(type(gen)) # 'generator' object print(next(gen)) # 'hello' print(next(gen)) # 'world' 二.基

  • java协程框架quasar和kotlin中的协程对比分析

    目录 前言 快速体验 添加依赖 添加javaagent 线程VS协程 协程代码 多线程代码 协程完胜 后记 前言 早就听说Go语言开发的服务不用任何架构优化,就可以轻松实现百万级别的qps.这得益于Go语言级别的协程的处理效率.协程不同于线程,线程是操作系统级别的资源,创建线程,调度线程,销毁线程都是重量级别的操作.而且线程的资源有限,在java中大量的不加限制的创建线程非常容易将系统搞垮.接下来要分享的这个开源项目,正是解决了在java中只能使用多线程模型开发高并发应用的窘境,使得java也能

  • java基于quasar实现协程池的方法示例

    业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍.所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!在csdn中基本都是对它的基本使用,用法和线程差不多.不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~) 一个线程可以多个协程,一个进程也可以单独拥有多个协程. 线程进程都是同步机制,而协程则是异步. 协程能保留上一次调用时的状态,每次过程重入时,就相当于进

  • Python利用yield form实现异步协程爬虫

    目录 1.什么是yield 2.yield于列表的区别 3.yield from 实现协程 很古老的用法了,现在大多用的aiohttp库实现,这篇记录仅仅用做个人的协程底层实现的学习. 争取用看得懂的字来描述问题. 1.什么是yield 如果还没有怎么用过的话,直接把yield看做成一种特殊的return(PS:本质 generator(生成器))return是返回一个值然后就终断函数了,而yield返回的是一个生成器(PS:不知道的直接看作特殊列表,看下面的代码案例) # -*- coding

随机推荐