python使用adbapi实现MySQL数据库的异步存储

之前一直在写有关scrapy爬虫的事情,今天我们看看使用scrapy如何把爬到的数据放在MySQL数据库中保存。

有关python操作MySQL数据库的内容,网上已经有很多内容可以参考了,但都是在同步的操作MySQL数据库。在数据量不大的情况下,这种方法固然可以,但是一旦数据量增长后,MySQL就会出现崩溃的情况,因为网上爬虫的速度要远远高过往数据库中插入数据的速度。为了避免这种情况发生,我们就需要使用异步的方法来存储数据,爬虫与数据存储互不影响。

为了显示方便,我们把程序设计的简单一点,只是爬一页的数据。我们今天选择伯乐在线这个网站来爬取,只爬取第一页的数据。

首先我们还是要启动一个爬虫项目,然后自己建了一个爬虫的文件jobbole.py。我们先来看看这个文件中的代码

# -*- coding: utf-8 -*-
import io
import sys
import scrapy
import re
import datetime
from scrapy.http import Request
from urllib import parse
from ArticleSpider.items import JobboleArticleItem, ArticleItemLoader
from scrapy.loader import ItemLoader
sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf-8')

class JobboleSpider(scrapy.Spider):
 """docstring for JobboleSpider"""
 name = "jobbole"
 allowed_domain = ["blog.jobbole.com"]
 start_urls = ['http://blog.jobbole.com/all-posts/']

 def parse(self, response):
 """
 1.获取列表页中的文章url
 """
 # 解析列表汇中所有文章url并交给scrapy下载器并进行解析
 post_nodes = response.css("#archive .floated-thumb .post-thumb a")
 for post_node in post_nodes:
 image_url = post_node.css("img::attr(src)").extract_first("")# 这里取出每篇文章的封面图,并作为meta传入Request
 post_url = post_node.css("::attr(href)").extract_first("")
 yield Request(url = parse.urljoin(response.url, post_url), meta = {"front_image_url":image_url}, callback = self.parse_detail)

 def parse_detail(self, response):
 article_item = JobboleArticleItem()
 # 通过ItemLoader加载Item
 # 通过add_css后的返回值都是list型,所有我们再items.py要进行处理
 item_loader = ArticleItemLoader(item = JobboleArticleItem(), response = response)
 item_loader.add_css("title", ".entry-header h1::text")
 item_loader.add_value("url", response.url)
 # item_loader.add_value("url_object_id", get_md5(response.url))
 item_loader.add_value("url_object_id", response.url)
 item_loader.add_css("create_date", "p.entry-meta-hide-on-mobile::text")
 item_loader.add_value("front_image_url", [front_image_url])
 item_loader.add_css("praise_nums", ".vote-post-up h10::text")
 item_loader.add_css("comment_nums", "a[href='#article-comment'] span::text")
 item_loader.add_css("fav_nums", ".bookmark-btn::text")
 item_loader.add_css("tags", "p.entry-meta-hide-on-mobile a::text")
 item_loader.add_css("content", "div.entry")

 article_item = item_loader.load_item()
 print(article_item["tags"])

 yield article_item
 pass

这里我把代码进行了简化,首先对列表页发出请求,这里只爬取一页数据,然后分析每一页的url,并且交给scrapy对每一个url进行请求,得到每篇文章的详情页,把详情页的相关内容放在MySQL数据库中。
这里使用itemloader来进行页面的解析,这样解析有个最大的好处就是可以把解析规则存放在数据库中,实现对解析规则的动态加载。但是要注意一点是使用itemloader中css方式和xpath方式得到的数据都是list型,因此还需要在items.py中再对相对应的数据进行处理。

接下来我们就来看看items.py是如何处理list数据的。

# -*- coding: utf-8 -*-

# Define here the models for your scraped items
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html
import datetime
import re

import scrapy
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose, TakeFirst,Join
from ArticleSpider.utils.common import get_md5

def convert_date(value):
 try:
 create_date = datetime.datetime.strptime(create_date, "%Y/%m/%d").date()
 except Exception as e:
 create_date = datetime.datetime.now().date()
 return create_date

def get_nums(value):
 match_re = re.match(".*?(\d+).*", value)
 if match_re:
 nums = int(match_re.group(1))
 else:
 nums = 0

 return nums

def remove_comment_tags(value):
 # 去掉tags中的评论内容
 if "评论" in value:
 # 这里做了修改,如果返回"",则在list中仍然会占位,会变成类似于["程序员",,"解锁"]这样
 # return ""
 return None
 else:
 return value

def return_value(value):
 return 

class ArticleItemLoader(ItemLoader):
 """docstring for AriticleItemLoader"""
 # 自定义ItemLoader
 default_output_processor = TakeFirst()

class ArticlespiderItem(scrapy.Item):
 # define the fields for your item here like:
 # name = scrapy.Field()
 pass

class JobboleArticleItem(scrapy.Item):
 """docstring for ArticlespiderItem"""
 title = scrapy.Field()
 create_date = scrapy.Field(
 input_processor = MapCompose(convert_date)
 )
 url = scrapy.Field()
 url_object_id = scrapy.Field(
 output_processor = MapCompose(get_md5)
 )
 # 这里注意front_image_url还是一个list,在进行sql语句时还需要处理
 front_image_url = scrapy.Field(
 output_processor = MapCompose(return_value)
 )
 front_image_path = scrapy.Field()
 praise_nums = scrapy.Field(
 input_processor = MapCompose(get_nums)
 )
 comment_nums = scrapy.Field(
 input_processor = MapCompose(get_nums)
 )
 fav_nums = scrapy.Field(
 input_processor = MapCompose(get_nums)
 )
 # tags要做另行处理,因为tags我们需要的就是list
 tags = scrapy.Field(
 input_processor = MapCompose(remove_comment_tags),
 output_processor = Join(",")
 )
 content = scrapy.Field()

首先我们看到定义了一个类ArticleItemloader,在这个类中只有一句话,就是对于每个items都默认采用list中的第一个元素,这样我们就可以把每个items中的第一个元素取出来。但是要注意,有些items我们是必须要用list型的,比如我们给ImagePipeline的数据就要求必须是list型,这样我们就需要对front_image_url单独进行处理。这里我们做了一个小技巧,对front_image_url什么都不错,因为我们传过来的front_image_url就是list型
在items的Field中有两个参数,一个是input_processor,另一个是output_processor,这两个参数可以帮助我们对items的list中的每个元素进行处理,比如有些需要用md5进行加密,有些需要用正则表达式进行筛选或者排序等等。

在进行mysql的pipeline之前,我们需要设计数据库,下面是我自己设计的数据库的字段,仅供参考

这里我把url_object_id作为该表的主键,由于它不会重复,所以适合做主键。

下面我们来看看数据库的pipeline。

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import codecs
import json
from twisted.enterprise import adbapi
import MySQLdb
import MySQLdb.cursors

class MysqlTwistedPipeline(object):
 """docstring for MysqlTwistedPipeline"""
 #采用异步的机制写入mysql
 def __init__(self, dbpool):
 self.dbpool = dbpool

 @classmethod
 def from_settings(cls, settings):
 dbparms = dict(
 host = settings["MYSQL_HOST"],
 db = settings["MYSQL_DBNAME"],
 user = settings["MYSQL_USER"],
 passwd = settings["MYSQL_PASSWORD"],
 charset='utf8',
 cursorclass=MySQLdb.cursors.DictCursor,
 use_unicode=True,
 )
 dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

 return cls(dbpool)

 def process_item(self, item, spider):
 #使用twisted将mysql插入变成异步执行
 query = self.dbpool.runInteraction(self.do_insert, item)
 query.addErrback(self.handle_error, item, spider) #处理异常
 return item

 def handle_error(self, failure, item, spider):
 # 处理异步插入的异常
 print (failure)

 def do_insert(self, cursor, item):
 #执行具体的插入
 #根据不同的item 构建不同的sql语句并插入到mysql中
 # insert_sql, params = item.get_insert_sql()
 # print (insert_sql, params)
 # cursor.execute(insert_sql, params)
 insert_sql = """
 insert into jobbole_article(title, url, create_date, fav_nums, url_object_id)
 VALUES (%s, %s, %s, %s, %s)
 """
 # 可以只使用execute,而不需要再使用commit函数
 cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"], item["url_object_id"]))

在这里我们只是演示一下,我们只向数据库中插入5个字段的数据,分别是title,url,create_date,fav_nums,url_object_id。

当然你也可以再加入其它的字段。

首先我们看看from_settings这个函数,它可以从settings.py文件中取出我们想想要的数据,这里我们把数据库的host,dbname,username和password都放在settings.py中。实际的插入语句还是在process_item中进行,我们自己定义了一个函数do_insert,然后把它传给dbpool中用于插入真正的数据。

最后我们来看看settings.py中的代码,这里就很简单了。

MYSQL_HOST = "localhost"
MYSQL_DBNAME = "article_wilson"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"

其实这里是和pipeline中的代码是想对应的,别忘了把在settings.py中把pipeline打开。

ITEM_PIPELINES = {
 # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
 # 'ArticleSpider.pipelines.JsonWithEncodingPipeline': 1

 # # 'scrapy.pipelines.images.ImagePipeline': 1,
 # 'ArticleSpider.pipelines.JsonExporterPipleline': 1
 # 'ArticleSpider.pipelines.ArticleImagePipeline': 2
 # 'ArticleSpider.pipelines.MysqlPipeline': 1
 'ArticleSpider.pipelines.MysqlTwistedPipeline': 1
}

好了,现在我们可以跑一程序吧。

scrapy crawl jobbole

下面是运行结果的截图

好了,以上就是今天的全部内容了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • python中aioysql(异步操作MySQL)的方法

    python异步IO初探 探索异步IO执之前,先说说IO的种类 1.阻塞IO最简单,即读写数据时,需要等待操作完成,才能继续执行.进阶的做法就是用多线程来处理需要IO的部分,缺点是开销会有些大. 2.非阻塞IO,即读写数据时,如果暂时不可读写,则立刻返回,而不等待.因为不知道什么时候是可读写的,所以轮询时可能会浪费CPU时间. 3.IO复用,即在读写数据前,先检查哪些描述符是可读写的,再去读写.select 和 poll 就是这样做的,它们会遍历所有被监视的描述符,查看是否满足,这个检查的过程是

  • python如何实现异步调用函数执行

    在实现异步调用之前我们先进行什么是同步调用和异步调用 同步:是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行 异步:是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态.通知.回调来通知调用者处理结果 分析一下,下面的例子: 定义了一个装饰器 async 和 A .B 两个function 函数 A 里面sleep 10s , 然后打印 a function 字符串 B 里面直接打

  • python并发和异步编程实例

    关于并发.并行.同步阻塞.异步非阻塞.线程.进程.协程等这些概念,单纯通过文字恐怕很难有比较深刻的理解,本文就通过代码一步步实现这些并发和异步编程,并进行比较.解释器方面本文选择python3,毕竟python3才是python的未来,并且python3用原生的库实现协程已经非常方便了. 1.准备阶段 下面为所有测试代码所需要的包 #! python3 # coding:utf-8 import socket from concurrent import futures from selecto

  • Python异步操作MySQL示例【使用aiomysql】

    本文实例讲述了Python异步操作MySQL.分享给大家供大家参考,具体如下: 安装aiomysql 依赖 Python3.4+ asyncio PyMySQL 安装 pip install aiomysql 应用 基本的异步连接connection import asyncio from aiomysql import create_pool loop = asyncio.get_event_loop() async def go(): async with create_pool(host=

  • Python 异步协程函数原理及实例详解

    这篇文章主要介绍了Python 异步协程函数原理及实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一. asyncio 1.python3.4开始引入标准库之中,内置对异步io的支持 2.asyncio本身是一个消息循环 3.步骤: (1)创建消息循环 (2)把协程导入 (3)关闭 4.举例: import threading # 引入异步io包 import asyncio # 使用协程 @ asyncio.coroutine def

  • python异步实现定时任务和周期任务的方法

    一. 如何调用 def f1(arg1, arg2): print('f1', arg1, arg2) def f2(arg1): print('f2', arg1) def f3(): print('f3') def f4(): print('周期任务', int(time.time())) timer = TaskTimer() # 把任务加入任务队列 timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30执行 timer.join_task(f

  • python使用celery实现异步任务执行的例子

    使用celery在django项目中实现异步发送短信 在项目的目录下创建celery_tasks用于保存celery异步任务. 在celery_tasks目录下创建config.py文件,用于保存celery的配置信息 ```broker_url = "redis://127.0.0.1/14"``` 在celery_tasks目录下创建main.py文件,用于作为celery的启动文件 from celery import Celery # 为celery使用django配置文件进行

  • python使用adbapi实现MySQL数据库的异步存储

    之前一直在写有关scrapy爬虫的事情,今天我们看看使用scrapy如何把爬到的数据放在MySQL数据库中保存. 有关python操作MySQL数据库的内容,网上已经有很多内容可以参考了,但都是在同步的操作MySQL数据库.在数据量不大的情况下,这种方法固然可以,但是一旦数据量增长后,MySQL就会出现崩溃的情况,因为网上爬虫的速度要远远高过往数据库中插入数据的速度.为了避免这种情况发生,我们就需要使用异步的方法来存储数据,爬虫与数据存储互不影响. 为了显示方便,我们把程序设计的简单一点,只是爬

  • python使用MySQLdb访问mysql数据库的方法

    本文实例讲述了python使用MySQLdb访问mysql数据库的方法.分享给大家供大家参考.具体如下: #!/usr/bin/python import MySQLdb def doInsert(cursor,db): #insert # Prepare SQL query to INSERT a record into the database. sql = "UPDATE EMPLOYEE SET AGE = AGE+1 WHERE SEX = '%c'" %('M') try:

  • Python实现定时备份mysql数据库并把备份数据库邮件发送

    一.先来看备份mysql数据库的命令 mysqldump -u root --password=root --database abcDataBase > c:/abc_backup.sql 二.写Python程序 BackupsDB.py #!/usr/bin/python # -*- coding: UTF-8 -*- ''''' zhouzhongqing 备份数据库 ''' import os import time import sched import smtplib from em

  • Python实现的查询mysql数据库并通过邮件发送信息功能

    本文实例讲述了Python实现的查询mysql数据库并通过邮件发送信息功能.分享给大家供大家参考,具体如下: 这里使用Python查询mysql数据库,并通过邮件发送宕机信息. Python代码如下: #-*- coding: UTF-8 -*- #!/usr/bin/env python ''''' author:qlzhong Created on 2015-6-29 征途宕机日志统计汇总 ''' import MySQLdb import time import datetime impo

  • python远程连接服务器MySQL数据库

    本文实例为大家分享了python远程连接服务器MySQL数据库的具体代码,供大家参考,具体内容如下 这里默认大家都已经配置安装好 MySQL 和 Python 的MySQL 模块,且默认大家的DB内表和访问账号权限均已设置无误,下面直接代码演示: # -*- coding: utf-8 -*- """ Created on Fri Dec 30 10:43:35 2016 @author: zhengyongzhe """ import MySQ

  • Python使用pymysql从MySQL数据库中读出数据的方法

    python3.x已经不支持mysqldb了,支持的是pymysql 使用pandas读取MySQL数据时,使用sqlalchemy,出现No module named 'MySQLdb'错误. 安装:打开Windows PowerShell,输入pip3 install PyMySQL即可 import pymysql.cursors import pymysql import pandas as pd #连接配置信息 config = { 'host':'127.0.0.1', 'port'

  • python 基于PYMYSQL使用MYSQL数据库

    在做测试的时候都会用到数据库,今天写一篇通过python连接MYSQL数据库 什么是MYSQL数据库 MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,目前属于 Oracle 旗下产品.MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS (Relational Database Management System,关系数据库管理系统) 应用软件之一. 什么是PYMYSQL PyMySQL 是在 Python3.x 版本中用于

  • python基于Pandas读写MySQL数据库

    要实现 pandas 对 mysql 的读写需要三个库 pandas sqlalchemy pymysql 可能有的同学会问,单独用 pymysql 或 sqlalchemy 来读写数据库不香么,为什么要同时用三个库?主要是使用场景不同,个人觉得就大数据处理而言,用 pandas 读写数据库更加便捷. 1.read_sql_query 读取 mysql read_sql_query 或 read_sql 方法传入参数均为 sql 语句,读取数据库后,返回内容是 dateframe 对象.普及一下

  • Python基础之操作MySQL数据库

    一.数据库操作 1.1 安装PyMySQL pip install PyMySQL 1.2 连接数据库 python连接test数据库 import pymysql host = 'localhost' # 主机地址 username = 'root' # 数据库用户名 password = '' # 数据库密码 db_name = 'test' # 数据库名称 # 创建connect对象 connect = pymysql.connect(host=host, user=username, p

  • 运用Python快速的对MySQL数据库进行重命名

    对数据库的表进行重命名可以使用以下原生sql: RENAME TABLE old_table TO new_table; 窘境:但是MySQL并没有直接支持对数据库进行重命名 那么如何运用Python快速的对现有的数据库进行重命名呢? 比如项目初期,对数据库的命名(db_ridingroad)没有规划好, 然后在下面创建了大量的表和写入了大量的数据,现在需要对数据库的名字进行重命名为(db_news_website) 常规思路 下面的方法步骤较为繁琐 -- 数据库备份 mysqldump –u

随机推荐