python实现多线程行情抓取工具的方法

思路

借助python当中threading模块与Queue模块组合可以方便的实现基于生产者-消费者模型的多线程模型。Jimmy大神的tushare一直是广大python数据分析以及业余量化爱好者喜爱的免费、开源的python财经数据接口包。

平时一直有在用阿里云服务器通过tushare的接口自动落地相关财经数据,但日复权行情数据以往在串行下载的过程当中,速度比较慢,有时遇到网络原因还需要重下。每只股票的行情下载过程中都需要完成下载、落地2个步骤,一个可能需要网络开销、一个需要数据库mysql的存取开销。2者原本就可以独立并行执行,是个典型的“生产者-消费者”模型。

基于queue与threading模块的线程使用一般采用以下的套路:

producerQueue=Queue()
consumerQueue=Queue()
lock = threading.Lock()
class producerThead(threading.Thread):
 def __init__(self, producerQueue,consumerQueue):
 self.producerQueue=producerQueue
 self.consumerQueue=consumerQueue

 def run(self):
 while not self.thread_stop:
  try:
  #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行
  item=self.producerQueue.get(block=True, timeout=20)
  #阻塞调用进程直到有数据可用。如果timeout是个正整数,
  #阻塞调用进程最多timeout秒,
  #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用)
  except Queue.Empty:
  print("Nothing to do!thread exit!")
  self.thread_stop=True
  break
  #实现生产者逻辑,生成消费者需要处理的内容 consumerQueue.put(someItem)
  #还可以边处理,边生成新的生产任务
  doSomethingAboutProducing()
  self.producerQueue.task_done()
 def stop(self):
 self.thread_stop = True

class consumerThead(threading.Thread):
 def __init__(self,lock, consumerQueue):
 self.consumerQueue=consumerQueue
 def run(self):
 while true:
  try:
  #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行
  item=self.consumerQueue.get(block=True, timeout=20)
  #阻塞调用进程直到有数据可用。如果timeout是个正整数,
  #阻塞调用进程最多timeout秒,
  #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用)
  except Queue.Empty:
  print("Nothing to do!thread exit!")
  self.thread_stop=True
  break
  doSomethingAboutConsuming(lock)# 处理消费者逻辑,必要时使用线程锁 ,如文件操作等
  self.consumerQueue.task_done()
#定义主线程
def main():
 for i in range(n):#定义n个i消费者线程
 t = ThreadRead(producerQueue, consumerQueue)
 t.setDaemon(True)
 t.start()
 producerTasks=[] #定义初始化生产者任务队列
 producerQueue.put(producerTasks)
 for i in range(n):#定义n个生产者钱程
 t = ThreadWrite(consumerQueue, lock)
 t.setDaemon(True)
 t.start()
 stock_queue.join()
 data_queue.join()

相关接口

1,股票列表信息接口

作用

获取沪深上市公司基本情况。属性包括:

code,代码
name,名称
industry,所属行业
area,地区
pe,市盈率
outstanding,流通股本(亿)
totals,总股本(亿)
totalAssets,总资产(万)
liquidAssets,流动资产
fixedAssets,固定资产
reserved,公积金
reservedPerShare,每股公积金
esp,每股收益
bvps,每股净资
pb,市净率
timeToMarket,上市日期
undp,未分利润
perundp, 每股未分配
rev,收入同比(%)
profit,利润同比(%)
gpr,毛利率(%)
npr,净利润率(%)
holders,股东人数

调用方法

import tushare as ts
ts.get_stock_basics()

返回效果

  name industry area  pe outstanding  totals totalAssets
code
600606 金丰投资  房产服务 上海  0.00  51832.01 51832.01 744930.44
002285 世联行  房产服务 深圳 71.04  76352.17 76377.60 411595.28
000861 海印股份  房产服务 广东 126.20  83775.50 118413.84 730716.56
000526 银润投资  房产服务 福建 2421.16  9619.50 9619.50  20065.32
000056 深国商  房产服务 深圳  0.00  14305.55 26508.14 787195.94
600895 张江高科  园区开发 上海 171.60 154868.95 154868.95 1771040.38
600736 苏州高新  园区开发 江苏 48.68 105788.15 105788.15 2125485.75
600663 陆家嘴  园区开发 上海 47.63 135808.41 186768.41 4562074.50
600658 电子城  园区开发 北京 19.39  58009.73 58009.73 431300.19
600648 外高桥  园区开发 上海 65.36  81022.34 113534.90 2508100.75
600639 浦东金桥  园区开发 上海 57.28  65664.88 92882.50 1241577.00
600604 市北高新  园区开发 上海 692.87  33352.42 56644.92 329289.50

2,日复权行情接口

作用

提供股票上市以来所有历史数据,默认为前复权,读取后存到本地,作为后续分析的基础

调用方法

ts.get_h_data('002337', start='2015-01-01', end='2015-03-16') #两个日期之间的前复权数据

parameter:
code:string,股票代码 e.g. 600848
start:string,开始日期 format:YYYY-MM-DD 为空时取当前日期
end:string,结束日期 format:YYYY-MM-DD 为空时取去年今日
autype:string,复权类型,qfq-前复权 hfq-后复权 None-不复权,默认为qfq
index:Boolean,是否是大盘指数,默认为False
retry_count : int, 默认3,如遇网络等问题重复执行的次数
pause : int, 默认 0,重复请求数据过程中暂停的秒数,防止请求间隔时间太短出现的问题

return:
date : 交易日期 (index)
open : 开盘价
high : 最高价
close : 收盘价
low : 最低价
volume : 成交量
amount : 成交金额

返回结果

   open high close low  volume  amount
date
2015-03-16 13.27 13.45 13.39 13.00 81212976 1073862784
2015-03-13 13.04 13.38 13.37 13.00 40548836 532739744
2015-03-12 13.29 13.95 13.28 12.96 71505720 962979904
2015-03-11 13.35 13.48 13.15 13.00 59110248 780300736
2015-03-10 13.16 13.67 13.59 12.72 105753088 1393819776
2015-03-09 13.77 14.73 14.13 13.70 139091552 1994454656
2015-03-06 12.17 13.39 13.39 12.17 89486704 1167752960
2015-03-05 12.79 12.80 12.17 12.08 26040832 966927360
2015-03-04 13.96 13.96 13.30 12.58 26636174 1060270720
2015-03-03 12.17 13.10 13.10 12.05 19290366 733336768

实现

废话不多说,直接上代码,

生产者线程,读取行情

class ThreadRead(threading.Thread):
 def __init__(self, queue, out_queue):
  '''
  用于根据股票代码、需要读取的日期,读取增量的日行情数据,
  :param queue:用于保存需要读取的股票代码、起始日期的列表
  :param out_queue:用于保存需要写入到数据库表的结果集列表
  :return:
  '''
  threading.Thread.__init__(self)
  self.queue = queue
  self.out_queue = out_queue
 def run(self):
  while true:
   item = self.queue.get()
   time.sleep(0.5)
   try:
    df_h_data = ts.get_h_data(item['code'], start=item['startdate'], retry_count=10, pause=0.01)
    if df_h_data is not None and len(df_h_data)>0:
     df_h_data['secucode'] = item['code']
     df_h_data.index.name = 'date'
     print df_h_data.index,item['code'],item['startdate']
     df_h_data['tradeday'] = df_h_data.index.strftime('%Y-%m-%d')
     self.out_queue.put(df_h_data)
   except Exception, e:
    print str(e)
    self.queue.put(item) # 将没有爬取成功的数据放回队列里面去,以便下次重试。
    time.sleep(10)
    continue
   self.queue.task_done()

消费者线程,本地存储

class ThreadWrite(threading.Thread):
 def __init__(self, queue, lock, db_engine):
  '''
  :param queue: 某种形式的任务队列,此处为tushare为每个股票返回的最新日复权行情数据
  :param lock: 暂时用连接互斥操作,防止mysql高并发,后续可尝试去掉
  :param db_engine: mysql数据库的连接对象
  :return:no
  '''
  threading.Thread.__init__(self)
  self.queue = queue
  self.lock = lock
  self.db_engine = db_engine

 def run(self):
  while True:
   item = self.queue.get()
   self._save_data(item)
   self.queue.task_done()

 def _save_data(self, item):
   with self.lock:
    try:
     item.to_sql('cron_dailyquote', self.db_engine, if_exists='append', index=False)
    except Exception, e: # 如果是新股,则有可能df_h_data是空对象,因此需要跳过此类情况不处理
     print str(e)

定义主线程

from Queue import Queue
stock_queue = Queue()
data_queue = Queue()
lock = threading.Lock()
def main():
 '''
 用于测试多线程读取数据
 :return:
 '''
 #获取环境变量,取得相应的环境配置,上线时不需要再变更代码
 global stock_queue
 global data_queue
 config=os.getenv('FLASK_CONFIG')
 if config == 'default':
  db_url='mysql+pymysql://root:******@localhost:3306/python?charset=utf8mb4'
 else:
  db_url='mysql+pymysql://root:******@localhost:3306/test?charset=utf8mb4'
 db_engine = create_engine(db_url, echo=True)
 conn = db_engine.connect()
 #TODO 增加ts.get_stock_basics()报错的处理,如果取不到信息则直接用数据库中的股票代码信息,来获取增量信息
 #TODO 增加一个标志,如果一个股票代码的最新日期不是最新日期,则需标记该代码不需要重新获取数据,即记录该股票更新日期到了最新工作日,
 df = ts.get_stock_basics()
 df.to_sql('stock_basics',db_engine,if_exists='replace',dtype={'code': CHAR(6)})
 # 计算距离当前日期最大的工作日,以便每日定时更新
 today=time.strftime('%Y-%m-%d',time.localtime(time.time()))
 s1=("select max(t.date) from cron_tradeday t where flag=1 and t.date <='"+ today+"'")
 selectsql=text(s1)
 maxTradeay = conn.execute(selectsql).first()
 # 计算每只股票当前加载的最大工作日期,支持重跑
 s = ("select secucode,max(t.tradeday) from cron_dailyquote t group by secucode ")
 selectsql = text(s)
 result = conn.execute(selectsql) # 执行查询语句
 df_result = pd.DataFrame(result.fetchall())
 df_result.columns=['stockcode','max_tradeday']
 df_result.set_index(df_result['stockcode'],inplace=True)
 # 开始归档前复权历史行情至数据库当中,以便可以方便地计算后续选股模型

 for i in range(3):#使用3个线程
  t = ThreadRead(stock_queue, data_queue)
  t.setDaemon(True)
  t.start()
 for code in set(list(df.index)):
  try:
   #如果当前股票已经是最新的行情数据,则直接跳过,方便重跑。
   #print maxTradeay[0],df_result.loc[code].values[1]
   if df_result.loc[code].values[1] == maxTradeay[0]:
    continue
   startdate=getLastNdate(df_result.loc[code].values[1],1)
  except Exception, e:
   #如果某只股票没有相关的行情,则默认开始日期为2015年1月1日
   startdate='2015-01-01'
  item={}
  item['code']=code
  item['startdate']=startdate
  stock_queue.put(item) # 生成生产者任务队列
 for i in range(3):
  t = ThreadWrite(data_queue, lock, db_engine)
  t.setDaemon(True)
  t.start()
 stock_queue.join()
 data_queue.join()

执行效果

原本需要2,3个小时才能执行完成的每日复权行情增量落地,有效缩短至了1小时以内,这里线程数并不上越多越好,由于复权行情读的是新浪接口,在高并发情况下会返回HTTP 503服务器过载的错误,另外高并发下可能需要使用IP代理池,下载的时段也需要尝试多个时段进行。初次尝试,如果有更好的方法或者哪里有考虑不周的地方欢迎留言建议或者指正。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

您可能感兴趣的文章:

  • Python使用代理抓取网站图片(多线程)
  • Python代理抓取并验证使用多线程实现
  • Python实现多线程抓取妹子图
  • python多线程抓取天涯帖子内容示例
  • Python 多线程抓取图片效率对比
  • 尝试使用Python多线程抓取代理服务器IP地址的示例
  • python实现多线程抓取知乎用户
  • Python实现多线程抓取网页功能实例详解
  • Python之多线程爬虫抓取网页图片的示例代码
(0)

相关推荐

  • Python之多线程爬虫抓取网页图片的示例代码

    目标 嗯,我们知道搜索或浏览网站时会有很多精美.漂亮的图片. 我们下载的时候,得鼠标一个个下载,而且还翻页. 那么,有没有一种方法,可以使用非人工方式自动识别并下载图片.美美哒. 那么请使用python语言,构建一个抓取和下载网页图片的爬虫. 当然为了提高效率,我们同时采用多线程并行方式. 思路分析 Python有很多的第三方库,可以帮助我们实现各种各样的功能.问题在于,我们弄清楚我们需要什么: 1)http请求库,根据网站地址可以获取网页源代码.甚至可以下载图片写入磁盘. 2)解析网页源代码,

  • Python代理抓取并验证使用多线程实现

    没有使用队列,也没有线程池还在学习只是多线程 复制代码 代码如下: #coding:utf8 import urllib2,sys,re import threading,os import time,datetime ''''' 这里没有使用队列 只是采用多线程分发对代理量不大的网页还行但是几百几千性能就很差了 ''' def get_proxy_page(url): '''''解析代理页面 获取所有代理地址''' proxy_list = [] p = re.compile(r'''''<d

  • Python使用代理抓取网站图片(多线程)

    一.功能说明:1. 多线程方式抓取代理服务器,并多线程验证代理服务器ps 代理服务器是从http://www.cnproxy.com/ (测试只选择了8个页面)抓取2. 抓取一个网站的图片地址,多线程随机取一个代理服务器下载图片二.实现代码 复制代码 代码如下: #!/usr/bin/env python#coding:utf-8 import urllib2import reimport threadingimport timeimport random rawProxyList = []ch

  • 尝试使用Python多线程抓取代理服务器IP地址的示例

    这里以抓取 http://www.proxy.com.ru 站点的代理服务器为例,代码如下: #!/usr/bin/env python #coding:utf-8 import urllib2 import re import threading import time import MySQLdb rawProxyList = [] checkedProxyList = [] #抓取代理网站 targets = [] for i in xrange(1,42): target = r"htt

  • Python 多线程抓取图片效率对比

    目的: 是学习python 多线程的工作原理,及通过抓取400张图片这种IO密集型应用来查看多线程效率对比 import requests import urlparse import os import time import threading import Queue path = '/home/lidongwei/scrapy/owan_img_urls.txt' #path = '/home/lidongwei/scrapy/cc.txt' fetch_img_save_path =

  • python实现多线程抓取知乎用户

    需要用到的包: beautifulsoup4 html5lib image requests redis PyMySQL pip安装所有依赖包: pip install \ Image \ requests \ beautifulsoup4 \ html5lib \ redis \ PyMySQL 运行环境需要支持中文 测试运行环境python3.5,不保证其他运行环境能完美运行 需要安装mysql和redis 配置 config.ini 文件,设置好mysql和redis,并且填写你的知乎帐号

  • Python实现多线程抓取网页功能实例详解

    本文实例讲述了Python实现多线程抓取网页功能.分享给大家供大家参考,具体如下: 最近,一直在做网络爬虫相关的东西. 看了一下开源C++写的larbin爬虫,仔细阅读了里面的设计思想和一些关键技术的实现. 1.larbin的URL去重用的很高效的bloom filter算法: 2.DNS处理,使用的adns异步的开源组件: 3.对于url队列的处理,则是用部分缓存到内存,部分写入文件的策略. 4.larbin对文件的相关操作做了很多工作 5.在larbin里有连接池,通过创建套接字,向目标站点

  • python多线程抓取天涯帖子内容示例

    使用re, urllib, threading 多线程抓取天涯帖子内容,设置url为需抓取的天涯帖子的第一页,设置file_name为下载后的文件名 复制代码 代码如下: #coding:utf-8 import urllibimport reimport threadingimport os, time class Down_Tianya(threading.Thread):    """多线程下载"""    def __init__(sel

  • Python实现多线程抓取妹子图

    心血来潮写了个多线程抓妹子图,虽然代码还是有一些瑕疵,但是还是记录下来,分享给大家. Pic_downloader.py # -*- coding: utf-8 -*- """ Created on Fri Aug 07 17:30:58 2015 @author: Dreace """ import urllib2 import sys import time import os import random from multiprocessi

  • python实现多线程行情抓取工具的方法

    思路 借助python当中threading模块与Queue模块组合可以方便的实现基于生产者-消费者模型的多线程模型.Jimmy大神的tushare一直是广大python数据分析以及业余量化爱好者喜爱的免费.开源的python财经数据接口包. 平时一直有在用阿里云服务器通过tushare的接口自动落地相关财经数据,但日复权行情数据以往在串行下载的过程当中,速度比较慢,有时遇到网络原因还需要重下.每只股票的行情下载过程中都需要完成下载.落地2个步骤,一个可能需要网络开销.一个需要数据库mysql的

  • Python基于多线程实现抓取数据存入数据库的方法

    本文实例讲述了Python基于多线程实现抓取数据存入数据库的方法.分享给大家供大家参考,具体如下: 1. 数据库类 """ 使用须知: 代码中数据表名 aces ,需要更改该数据表名称的注意更改 """ import pymysql class Database(): # 设置本地数据库用户名和密码 host = "localhost" user = "root" password = "&quo

  • Python打印scrapy蜘蛛抓取树结构的方法

    本文实例讲述了Python打印scrapy蜘蛛抓取树结构的方法.分享给大家供大家参考.具体如下: 通过下面这段代码可以一目了然的知道scrapy的抓取页面结构,调用也非常简单 #!/usr/bin/env python import fileinput, re from collections import defaultdict def print_urls(allurls, referer, indent=0): urls = allurls[referer] for url in urls

  • python使用自定义user-agent抓取网页的方法

    本文实例讲述了python使用自定义user-agent抓取网页的方法.分享给大家供大家参考.具体如下: 下面python代码通过urllib2抓取指定的url的内容,并且使用自定义的user-agent,可防止网站屏蔽采集器 import urllib2 req = urllib2.Request('http://192.168.1.2/') req.add_header('User-agent', 'Mozilla 5.10') res = urllib2.urlopen(req) html

  • Python实现登录人人网并抓取新鲜事的方法

    本文实例讲述了Python实现登录人人网并抓取新鲜事的方法.分享给大家供大家参考.具体如下: 这里演示了Python登录人人网并抓取新鲜事的方法(抓取后的排版不太美观~~) from sgmllib import SGMLParser import sys,urllib2,urllib,cookielib class spider(SGMLParser): def __init__(self,email,password): SGMLParser.__init__(self) self.h3=F

  • Python实现周期性抓取网页内容的方法

    本文实例讲述了Python实现周期性抓取网页内容的方法.分享给大家供大家参考,具体如下: 1.使用sched模块可以周期性地执行指定函数 2.在周期性执行指定函数中抓取指定网页,并解析出想要的网页内容,代码中是六维论坛的在线人数 论坛在线人数统计代码: #coding=utf-8 import time,sched,os,urllib2,re,string #初始化sched模块的scheduler类 #第一个参数是一个可以返回时间戳的函数,第二个参数可以在定时未到达之前阻塞. s = sche

  • Python 通过requests实现腾讯新闻抓取爬虫的方法

    最近也是学习了一些爬虫方面的知识.以我自己的理解,通常我们用浏览器查看网页时,是通过浏览器向服务器发送请求,然后服务器响应以后返回一些代码数据,再经过浏览器解析后呈现出来.而爬虫则是通过程序向服务器发送请求,并且将服务器返回的信息,通过一些处理后,就能得到我们想要的数据了. 以下是前段时间我用python写的一个爬取TX新闻标题及其网址的一个简单爬虫: 首先需要用到python中requests(方便全面的http请求库)和 BeautifulSoup(html解析库). 通过pip来安装这两个

  • python面向对象多线程爬虫爬取搜狐页面的实例代码

    首先我们需要几个包:requests, lxml, bs4, pymongo, redis 1. 创建爬虫对象,具有的几个行为:抓取页面,解析页面,抽取页面,储存页面 class Spider(object): def __init__(self): # 状态(是否工作) self.status = SpiderStatus.IDLE # 抓取页面 def fetch(self, current_url): pass # 解析页面 def parse(self, html_page): pass

  • python制作微博图片爬取工具

    有小半个月没有发博客了,因为一直在研究python的GUI,买了一本书学习了一些基础,用我所学做了我的第一款GUI--微博图片爬取工具.本软件源代码已经放在了博客中,另外软件已经打包好上传到网盘中以供下载学习. 一.准备工作 本次要用到以下依赖库:re json os random tkinter threading requests PIL 其中后两个需要安装后使用 二.预览 1.启动 2.运行中 3.结果 这里只将拿一张图片作为展示. 三.设计流程 设计流程分为总体设计和详细设计,这里我会使

随机推荐