python实现多线程抓取知乎用户

需要用到的包:

beautifulsoup4
html5lib
image
requests
redis
PyMySQL

pip安装所有依赖包:

pip install \
Image \
requests \
beautifulsoup4 \
html5lib \
redis \
PyMySQL

运行环境需要支持中文

测试运行环境python3.5,不保证其他运行环境能完美运行

需要安装mysql和redis

配置 config.ini 文件,设置好mysql和redis,并且填写你的知乎帐号

向数据库导入 init.sql

Run

开始抓取数据: python get_user.py
查看抓取数量: python check_redis.py

效果

总体思路

1.首先是模拟登陆知乎,利用保存登陆的cookie信息
2.抓取知乎页面的html代码,留待下一步继续进行分析提取信息
3.分析提取页面中用户的个性化url,放入redis(这里特别说明一下redis的思路用法,将提取到的用户的个性化url放入redis的一个名为already_get_user的hash table,表示已抓取的用户,对于已抓取过的用户判断是否存在于already_get_user以去除重复抓取,同时将个性化url放入user_queue的队列中,需要抓取新用户时pop队列获取新的用户)
4.获取用户的关注列表和粉丝列表,继续插入到redis
5.从redis的user_queue队列中获取新用户继续重复步骤3

模拟登陆知乎

首先是登陆,登陆功能作为一个包封装了在login里面,方便整合调用

header部分,这里Connection最好设为close,不然可能会碰到max retireve exceed的错误
原因在于普通的连接是keep-alive的但是却又没有关闭

# http请求的header
headers = {
  "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36",
  "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
  "Host": "www.zhihu.com",
  "Referer": "https://www.zhihu.com/",
  "Origin": "https://www.zhihu.com/",
  "Upgrade-Insecure-Requests": "1",
  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
  "Pragma": "no-cache",
  "Accept-Encoding": "gzip, deflate, br",
  'Connection': 'close'
}

# 验证是否登陆
def check_login(self):
  check_url = 'https://www.zhihu.com/settings/profile'
  try:
    login_check = self.__session.get(check_url, headers=self.headers, timeout=35)
  except Exception as err:
    print(traceback.print_exc())
    print(err)
    print("验证登陆失败,请检查网络")
    sys.exit()
  print("验证登陆的http status code为:" + str(login_check.status_code))
  if int(login_check.status_code) == 200:
    return True
  else:
    return False

进入首页查看http状态码来验证是否登陆,200为已经登陆,一般304就是被重定向所以就是没有登陆

# 获取验证码
def get_captcha(self):
  t = str(time.time() * 1000)
  captcha_url = 'http://www.zhihu.com/captcha.gif?r=' + t + "&type=login"
  r = self.__session.get(captcha_url, headers=self.headers, timeout=35)
  with open('captcha.jpg', 'wb') as f:
    f.write(r.content)
    f.close()
    # 用pillow 的 Image 显示验证码
    # 如果没有安装 pillow 到源代码所在的目录去找到验证码然后手动输入
  '''try:
    im = Image.open('captcha.jpg')
    im.show()
    im.close()
  except:'''
  print(u'请到 %s 目录找到captcha.jpg 手动输入' % os.path.abspath('captcha.jpg'))
  captcha = input("请输入验证码\n>")
  return captcha

获取验证码的方法。当登录次数太多有可能会要求输入验证码,这里实现这个功能

# 获取xsrf
def get_xsrf(self):
  index_url = 'http://www.zhihu.com'
  # 获取登录时需要用到的_xsrf
  try:
    index_page = self.__session.get(index_url, headers=self.headers, timeout=35)
  except:
    print('获取知乎页面失败,请检查网络连接')
    sys.exit()
  html = index_page.text
  # 这里的_xsrf 返回的是一个list
  BS = BeautifulSoup(html, 'html.parser')
  xsrf_input = BS.find(attrs={'name': '_xsrf'})
  pattern = r'value=\"(.*?)\"'
  print(xsrf_input)
  self.__xsrf = re.findall(pattern, str(xsrf_input))
  return self.__xsrf[0]

获取xsrf,为什么要获取xsrf呢,因为xsrf是一种防止跨站攻击的手段,具体介绍可以看这里csrf
在获取到xsrf之后把xsrf存入cookie当中,并且在调用api的时候带上xsrf作为头部,不然的话知乎会返回403

# 进行模拟登陆
def do_login(self):
  try:
    # 模拟登陆
    if self.check_login():
      print('您已经登录')
      return
    else:
      if self.config.get("zhihu_account", "username") and self.config.get("zhihu_account", "password"):
        self.username = self.config.get("zhihu_account", "username")
        self.password = self.config.get("zhihu_account", "password")
      else:
        self.username = input('请输入你的用户名\n> ')
        self.password = input("请输入你的密码\n> ")
  except Exception as err:
    print(traceback.print_exc())
    print(err)
    sys.exit()
  if re.match(r"^1\d{10}$", self.username):
    print("手机登陆\n")
    post_url = 'http://www.zhihu.com/login/phone_num'
    postdata = {
      '_xsrf': self.get_xsrf(),
      'password': self.password,
      'remember_me': 'true',
      'phone_num': self.username,
    }
  else:
    print("邮箱登陆\n")
    post_url = 'http://www.zhihu.com/login/email'
    postdata = {
      '_xsrf': self.get_xsrf(),
      'password': self.password,
      'remember_me': 'true',
      'email': self.username,
    }
  try:
    login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35)
    login_text = json.loads(login_page.text.encode('latin-1').decode('unicode-escape'))
    print(postdata)
    print(login_text)
    # 需要输入验证码 r = 0为登陆成功代码
    if login_text['r'] == 1:
      sys.exit()
  except:
    postdata['captcha'] = self.get_captcha()
    login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35)
    print(json.loads(login_page.text.encode('latin-1').decode('unicode-escape')))
  # 保存登陆cookie
  self.__session.cookies.save()

这个就是核心的登陆功能啦,非常关键的就是用到了requests库,非常方便的保存到session
我们这里全局都是用单例模式,统一使用同一个requests.session对象进行访问功能,保持登录状态的一致性

最后主要调用登陆的代码为

# 创建login对象
lo = login.login.Login(self.session)
# 模拟登陆
if lo.check_login():
  print('您已经登录')
else:
  if self.config.get("zhihu_account", "username") and self.config.get("zhihu_account", "username"):
    username = self.config.get("zhihu_account", "username")
    password = self.config.get("zhihu_account", "password")
  else:
    username = input('请输入你的用户名\n> ')
    password = input("请输入你的密码\n> ")
  lo.do_login(username, password)

知乎模拟登陆到此就完成啦

知乎用户抓取

def __init__(self, threadID=1, name=''):
  # 多线程
  print("线程" + str(threadID) + "初始化")
  threading.Thread.__init__(self)
  self.threadID = threadID
  self.name = name
  try:
    print("线程" + str(threadID) + "初始化成功")
  except Exception as err:
    print(err)
    print("线程" + str(threadID) + "开启失败")
  self.threadLock = threading.Lock()
  # 获取配置
  self.config = configparser.ConfigParser()
  self.config.read("config.ini")
  # 初始化session
  requests.adapters.DEFAULT_RETRIES = 5
  self.session = requests.Session()
  self.session.cookies = cookielib.LWPCookieJar(filename='cookie')
  self.session.keep_alive = False
  try:
    self.session.cookies.load(ignore_discard=True)
  except:
    print('Cookie 未能加载')
  finally:
    pass
  # 创建login对象
  lo = Login(self.session)
  lo.do_login()
  # 初始化redis连接
  try:
    redis_host = self.config.get("redis", "host")
    redis_port = self.config.get("redis", "port")
    self.redis_con = redis.Redis(host=redis_host, port=redis_port, db=0)
    # 刷新redis库
    # self.redis_con.flushdb()
  except:
    print("请安装redis或检查redis连接配置")
    sys.exit()
  # 初始化数据库连接
  try:
    db_host = self.config.get("db", "host")
    db_port = int(self.config.get("db", "port"))
    db_user = self.config.get("db", "user")
    db_pass = self.config.get("db", "password")
    db_db = self.config.get("db", "db")
    db_charset = self.config.get("db", "charset")
    self.db = pymysql.connect(host=db_host, port=db_port, user=db_user, passwd=db_pass, db=db_db,
                 charset=db_charset)
    self.db_cursor = self.db.cursor()
  except:
    print("请检查数据库配置")
    sys.exit()
  # 初始化系统设置
  self.max_queue_len = int(self.config.get("sys", "max_queue_len"))

这个是get_user.py的构造函数,主要功能就是初始化mysql连接、redis连接、验证登陆、生成全局的session对象、导入系统配置、开启多线程。

# 获取首页html
def get_index_page(self):
  index_url = 'https://www.zhihu.com/'
  try:
    index_html = self.session.get(index_url, headers=self.headers, timeout=35)
  except Exception as err:
    # 出现异常重试
    print("获取页面失败,正在重试......")
    print(err)
    traceback.print_exc()
    return None
  finally:
    pass
  return index_html.text
# 获取单个用户详情页面
def get_user_page(self, name_url):
  user_page_url = 'https://www.zhihu.com' + str(name_url) + '/about'
  try:
    index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
  except Exception as err:
    # 出现异常重试
    print("失败name_url:" + str(name_url) + "获取页面失败,放弃该用户")
    print(err)
    traceback.print_exc()
    return None
  finally:
    pass
  return index_html.text
# 获取粉丝页面
def get_follower_page(self, name_url):
  user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followers'
  try:
    index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
  except Exception as err:
    # 出现异常重试
    print("失败name_url:" + str(name_url) + "获取页面失败,放弃该用户")
    print(err)
    traceback.print_exc()
    return None
  finally:
    pass
  return index_html.text
def get_following_page(self, name_url):
  user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followers'
  try:
    index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
  except Exception as err:
    # 出现异常重试
    print("失败name_url:" + str(name_url) + "获取页面失败,放弃该用户")
    print(err)
    traceback.print_exc()
    return None
  finally:
    pass
  return index_html.text
# 获取首页上的用户列表,存入redis
def get_index_page_user(self):
  index_html = self.get_index_page()
  if not index_html:
    return
  BS = BeautifulSoup(index_html, "html.parser")
  self.get_xsrf(index_html)
  user_a = BS.find_all("a", class_="author-link") # 获取用户的a标签
  for a in user_a:
    if a:
      self.add_wait_user(a.get('href'))
    else:
      continue

这一部分的代码就是用于抓取各个页面的html代码

# 加入带抓取用户队列,先用redis判断是否已被抓取过
def add_wait_user(self, name_url):
  # 判断是否已抓取
  self.threadLock.acquire()
  if not self.redis_con.hexists('already_get_user', name_url):
    self.counter += 1
    print(name_url + " 加入队列")
    self.redis_con.hset('already_get_user', name_url, 1)
    self.redis_con.lpush('user_queue', name_url)
    print("添加用户 " + name_url + "到队列")
  self.threadLock.release()
# 获取页面出错移出redis
def del_already_user(self, name_url):
  self.threadLock.acquire()
  if not self.redis_con.hexists('already_get_user', name_url):
    self.counter -= 1
    self.redis_con.hdel('already_get_user', name_url)
  self.threadLock.release()

用户加入redis的操作,在数据库插入出错时我们调用del_already_user删除插入出错的用户

# 分析粉丝页面获取用户的所有粉丝用户
# @param follower_page get_follower_page()中获取到的页面,这里获取用户hash_id请求粉丝接口获取粉丝信息
def get_all_follower(self, name_url):
  follower_page = self.get_follower_page(name_url)
  # 判断是否获取到页面
  if not follower_page:
    return
  BS = BeautifulSoup(follower_page, 'html.parser')
  # 获取关注者数量
  follower_num = int(BS.find('span', text='关注者').find_parent().find('strong').get_text())
  # 获取用户的hash_id
  hash_id = \
    json.loads(BS.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[
      'params'][
      'hash_id']
  # 获取关注者列表
  self.get_xsrf(follower_page) # 获取xsrf
  post_url = 'https://www.zhihu.com/node/ProfileFollowersListV2'
  # 开始获取所有的关注者 math.ceil(follower_num/20)*20
  for i in range(0, math.ceil(follower_num / 20) * 20, 20):
    post_data = {
      'method': 'next',
      'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id})
    }
    try:
      j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode(
        'latin-1').decode(
        'unicode-escape')
      pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL)
      j = pattern.findall(j)
      for user in j:
        user = user.replace('\\', '')
        self.add_wait_user(user) # 保存到redis
    except Exception as err:
      print("获取正在关注失败")
      print(err)
      traceback.print_exc()
      pass
# 获取正在关注列表
def get_all_following(self, name_url):
  following_page = self.get_following_page(name_url)
  # 判断是否获取到页面
  if not following_page:
    return
  BS = BeautifulSoup(following_page, 'html.parser')
  # 获取关注者数量
  following_num = int(BS.find('span', text='关注了').find_parent().find('strong').get_text())
  # 获取用户的hash_id
  hash_id = \
    json.loads(BS.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[
      'params'][
      'hash_id']
  # 获取关注者列表
  self.get_xsrf(following_page) # 获取xsrf
  post_url = 'https://www.zhihu.com/node/ProfileFolloweesListV2'
  # 开始获取所有的关注者 math.ceil(follower_num/20)*20
  for i in range(0, math.ceil(following_num / 20) * 20, 20):
    post_data = {
      'method': 'next',
      'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id})
    }
    try:
      j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode(
        'latin-1').decode(
        'unicode-escape')
      pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL)
      j = pattern.findall(j)
      for user in j:
        user = user.replace('\\', '')
        self.add_wait_user(user) # 保存到redis
    except Exception as err:
      print("获取正在关注失败")
      print(err)
      traceback.print_exc()
      pass

调用知乎的API,获取所有的关注用户列表和粉丝用户列表,递归获取用户
这里需要注意的是头部要记得带上xsrf不然会抛出403

# 分析about页面,获取用户详细资料
def get_user_info(self, name_url):
  about_page = self.get_user_page(name_url)
  # 判断是否获取到页面
  if not about_page:
    print("获取用户详情页面失败,跳过,name_url:" + name_url)
    return
  self.get_xsrf(about_page)
  BS = BeautifulSoup(about_page, 'html.parser')
  # 获取页面的具体数据
  try:
    nickname = BS.find("a", class_="name").get_text() if BS.find("a", class_="name") else ''
    user_type = name_url[1:name_url.index('/', 1)]
    self_domain = name_url[name_url.index('/', 1) + 1:]
    gender = 2 if BS.find("i", class_="icon icon-profile-female") else (1 if BS.find("i", class_="icon icon-profile-male") else 3)
    follower_num = int(BS.find('span', text='关注者').find_parent().find('strong').get_text())
    following_num = int(BS.find('span', text='关注了').find_parent().find('strong').get_text())
    agree_num = int(re.findall(r'<strong>(.*)</strong>.*赞同', about_page)[0])
    appreciate_num = int(re.findall(r'<strong>(.*)</strong>.*感谢', about_page)[0])
    star_num = int(re.findall(r'<strong>(.*)</strong>.*收藏', about_page)[0])
    share_num = int(re.findall(r'<strong>(.*)</strong>.*分享', about_page)[0])
    browse_num = int(BS.find_all("span", class_="zg-gray-normal")[2].find("strong").get_text())
    trade = BS.find("span", class_="business item").get('title') if BS.find("span",
                                       class_="business item") else ''
    company = BS.find("span", class_="employment item").get('title') if BS.find("span",
                                         class_="employment item") else ''
    school = BS.find("span", class_="education item").get('title') if BS.find("span",
                                        class_="education item") else ''
    major = BS.find("span", class_="education-extra item").get('title') if BS.find("span",
                                           class_="education-extra item") else ''
    job = BS.find("span", class_="position item").get_text() if BS.find("span",
                                      class_="position item") else ''
    location = BS.find("span", class_="location item").get('title') if BS.find("span",
                                         class_="location item") else ''
    description = BS.find("div", class_="bio ellipsis").get('title') if BS.find("div",
                                          class_="bio ellipsis") else ''
    ask_num = int(BS.find_all("a", class_='item')[1].find("span").get_text()) if \
      BS.find_all("a", class_='item')[
        1] else int(0)
    answer_num = int(BS.find_all("a", class_='item')[2].find("span").get_text()) if \
      BS.find_all("a", class_='item')[
        2] else int(0)
    article_num = int(BS.find_all("a", class_='item')[3].find("span").get_text()) if \
      BS.find_all("a", class_='item')[3] else int(0)
    collect_num = int(BS.find_all("a", class_='item')[4].find("span").get_text()) if \
      BS.find_all("a", class_='item')[4] else int(0)
    public_edit_num = int(BS.find_all("a", class_='item')[5].find("span").get_text()) if \
      BS.find_all("a", class_='item')[5] else int(0)
    replace_data = \
      (pymysql.escape_string(name_url), nickname, self_domain, user_type,
       gender, follower_num, following_num, agree_num, appreciate_num, star_num, share_num, browse_num,
       trade, company, school, major, job, location, pymysql.escape_string(description),
       ask_num, answer_num, article_num, collect_num, public_edit_num)
    replace_sql = '''REPLACE INTO
           user(url,nickname,self_domain,user_type,
           gender, follower,following,agree_num,appreciate_num,star_num,share_num,browse_num,
           trade,company,school,major,job,location,description,
           ask_num,answer_num,article_num,collect_num,public_edit_num)
           VALUES(%s,%s,%s,%s,
           %s,%s,%s,%s,%s,%s,%s,%s,
           %s,%s,%s,%s,%s,%s,%s,
           %s,%s,%s,%s,%s)'''
    try:
      print("获取到数据:")
      print(replace_data)
      self.db_cursor.execute(replace_sql, replace_data)
      self.db.commit()
    except Exception as err:
      print("插入数据库出错")
      print("获取到数据:")
      print(replace_data)
      print("插入语句:" + self.db_cursor._last_executed)
      self.db.rollback()
      print(err)
      traceback.print_exc()
  except Exception as err:
    print("获取数据出错,跳过用户")
    self.redis_con.hdel("already_get_user", name_url)
    self.del_already_user(name_url)
    print(err)
    traceback.print_exc()
    pass

最后,到用户的about页面,分析页面元素,利用正则或者beatifulsoup分析抓取页面的数据
这里我们SQL语句用REPLACE INTO而不用INSERT INTO,这样可以很好的防止数据重复问题

# 开始抓取用户,程序总入口
def entrance(self):
  while 1:
    if int(self.redis_con.llen("user_queue")) < 1:
      self.get_index_page_user()
    else:
      # 出队列获取用户name_url redis取出的是byte,要decode成utf-8
      name_url = str(self.redis_con.rpop("user_queue").decode('utf-8'))
      print("正在处理name_url:" + name_url)
      self.get_user_info(name_url)
      if int(self.redis_con.llen("user_queue")) <= int(self.max_queue_len):
        self.get_all_follower(name_url)
        self.get_all_following(name_url)
    self.session.cookies.save()
def run(self):
  print(self.name + " is running")
  self.entrance()

最后,入口

if __name__ == '__main__':
  login = GetUser(999, "登陆线程")
  threads = []
  for i in range(0, 4):
    m = GetUser(i, "thread" + str(i))
    threads.append(m)
  for i in range(0, 4):
    threads[i].start()
  for i in range(0, 4):
    threads[i].join()

这里就是多线程的开启,需要开启多少个线程就把4换成多少就可以了

Docker

嫌麻烦的可以参考一下我用docker简单的搭建一个基础环境:

mysql和redis都是官方镜像

docker run --name mysql -itd mysql:latest
docker run --name redis -itd mysql:latest

再利用docker-compose运行python镜像,我的python的docker-compose.yml:

python:
 container_name: python
 build: .
 ports:
  - "84:80"
 external_links:
  - memcache:memcache
  - mysql:mysql
  - redis:redis
 volumes:
  - /docker_containers/python/www:/var/www/html
 tty: true
 stdin_open: true
 extra_hosts:
  - "python:192.168.102.140"
 environment:
  PYTHONIOENCODING: utf-8

最后附上源代码: GITHUB https://github.com/kong36088/ZhihuSpider

本站下载地址: http://xiazai.jb51.net/201612/yuanma/ZhihuSpider(jb51.net).zip

(0)

相关推荐

  • Python 多线程Threading初学教程

    1.1 什么是多线程 Threading 多线程可简单理解为同时执行多个任务. 多进程和多线程都可以执行多个任务,线程是进程的一部分.线程的特点是线程之间可以共享内存和变量,资源消耗少(不过在Unix环境中,多进程和多线程资源调度消耗差距不明显,Unix调度较快),缺点是线程之间的同步和加锁比较麻烦. 1.2 添加线程 Thread 导入模块 import threading 获取已激活的线程数 threading.active_count() 查看所有线程信息 threading.enumer

  • 深入理解 Python 中的多线程 新手必看

    示例1 我们将要请求五个不同的url: 单线程 import time import urllib2 defget_responses(): urls=[ 'http://www.baidu.com', 'http://www.amazon.com', 'http://www.ebay.com', 'http://www.alibaba.com', 'http://www.jb51.net' ] start=time.time() forurlinurls: printurl resp=urll

  • 详解Python多线程Selenium跨浏览器测试

    前言 在web测试中,不可避免的一个测试就是浏览器兼容性测试,在没有自动化测试前,我们总是苦逼的在一台或多台机器上安装N种浏览器,然后手工在不同的浏览器上验证主业务流程和关键功能模块功能,以检测不同浏览器或不同版本浏览器上,我们的web应用是否可以正常工作. 下面我们看看怎么利用python selenium进行自动化的跨浏览器测试. 什么是跨浏览器测试 跨浏览器测试是功能测试的一个分支,用以验证web应用能在不同的浏览器上正常工作. 为什么需要跨浏览器测试 通常情况下,我们都期望web类应用

  • Python控制多进程与多线程并发数总结

    一.前言 本来写了脚本用于暴力破解密码,可是1秒钟尝试一个密码2220000个密码我的天,想用多线程可是只会一个for全开,难道开2220000个线程吗?只好学习控制线程数了,官方文档不好看,觉得结构不够清晰,网上找很多文章也都不很清晰,只有for全开线程,没有控制线程数的具体说明,最终终于根据多篇文章和官方文档算是搞明白基础的多线程怎么实现法了,怕长时间不用又忘记,找着麻烦就贴这了,跟我一样新手也可以参照参照. 先说进程和线程的区别: 地址空间:进程内的一个执行单元;进程至少有一个线程;它们共

  • Python实现多线程抓取网页功能实例详解

    本文实例讲述了Python实现多线程抓取网页功能.分享给大家供大家参考,具体如下: 最近,一直在做网络爬虫相关的东西. 看了一下开源C++写的larbin爬虫,仔细阅读了里面的设计思想和一些关键技术的实现. 1.larbin的URL去重用的很高效的bloom filter算法: 2.DNS处理,使用的adns异步的开源组件: 3.对于url队列的处理,则是用部分缓存到内存,部分写入文件的策略. 4.larbin对文件的相关操作做了很多工作 5.在larbin里有连接池,通过创建套接字,向目标站点

  • Python 多线程的实例详解

     Python 多线程的实例详解 一)线程基础 1.创建线程: thread模块提供了start_new_thread函数,用以创建线程.start_new_thread函数成功创建后还可以对其进行操作. 其函数原型: start_new_thread(function,atgs[,kwargs]) 其参数含义如下: function: 在线程中执行的函数名     args:元组形式的参数列表.     kwargs: 可选参数,以字典的形式指定参数 方法一:通过使用thread模块中的函数创

  • Python实现多线程HTTP下载器示例

    本文将介绍使用Python编写多线程HTTP下载器,并生成.exe可执行文件. 环境:windows/Linux + Python2.7.x 单线程 在介绍多线程之前首先介绍单线程.编写单线程的思路为: 1.解析url: 2.连接web服务器: 3.构造http请求包: 4.下载文件. 接下来通过代码进行说明. 解析url 通过用户输入url进行解析.如果解析的路径为空,则赋值为'/':如果端口号为空,则赋值为"80":下载文件的文件名可根据用户的意愿进行更改(输入'y'表示更改,输入

  • python实现多线程抓取知乎用户

    需要用到的包: beautifulsoup4 html5lib image requests redis PyMySQL pip安装所有依赖包: pip install \ Image \ requests \ beautifulsoup4 \ html5lib \ redis \ PyMySQL 运行环境需要支持中文 测试运行环境python3.5,不保证其他运行环境能完美运行 需要安装mysql和redis 配置 config.ini 文件,设置好mysql和redis,并且填写你的知乎帐号

  • Python实现多线程抓取妹子图

    心血来潮写了个多线程抓妹子图,虽然代码还是有一些瑕疵,但是还是记录下来,分享给大家. Pic_downloader.py # -*- coding: utf-8 -*- """ Created on Fri Aug 07 17:30:58 2015 @author: Dreace """ import urllib2 import sys import time import os import random from multiprocessi

  • java实现爬取知乎用户基本信息

    本文实例为大家分享了一个基于JAVA的知乎爬虫,抓取知乎用户基本信息,基于HttpClient 4.5,供大家参考,具体内容如下 详细内容: 抓取90W+用户信息(基本上活跃的用户都在里面) 大致思路: 1.首先模拟登录知乎,登录成功后将Cookie序列化到磁盘,不用以后每次都登录(如果不模拟登录,可以直接从浏览器塞入Cookie也是可以的). 2.创建两个线程池和一个Storage.一个抓取网页线程池,负责执行request请求,并返回网页内容,存到Storage中.另一个是解析网页线程池,负

  • python爬虫开发之使用Python爬虫库requests多线程抓取猫眼电影TOP100实例

    使用Python爬虫库requests多线程抓取猫眼电影TOP100思路: 查看网页源代码 抓取单页内容 正则表达式提取信息 猫眼TOP100所有信息写入文件 多线程抓取 运行平台:windows Python版本:Python 3.7. IDE:Sublime Text 浏览器:Chrome浏览器 1.查看猫眼电影TOP100网页原代码 按F12查看网页源代码发现每一个电影的信息都在"<dd></dd>"标签之中. 点开之后,信息如下: 2.抓取单页内容 在浏

  • python 抓取知乎指定回答下视频的方法

    前言 现在知乎允许上传视频,奈何不能下载视频,好气哦,无奈之下研究一下了,然后撸了代码,方便下载视频保存. 接下来以 猫为什么一点也不怕蛇? 回答为例,分享一下整个下载过程. 调试一下 打开 F12, 找到光标,如下图: 然后将光标移动到视频上.如下图: 咦这是什么?视野中出现了一条神秘的链接: https://www.zhihu.com/video/xxxxx,让我们将这条链接复制到浏览器上,然后打开: 似乎这就是我们要找的视频,不要着急,让我们看一看,网页的请求,然后你会发现一个很有意思的请

  • python多线程抓取天涯帖子内容示例

    使用re, urllib, threading 多线程抓取天涯帖子内容,设置url为需抓取的天涯帖子的第一页,设置file_name为下载后的文件名 复制代码 代码如下: #coding:utf-8 import urllibimport reimport threadingimport os, time class Down_Tianya(threading.Thread):    """多线程下载"""    def __init__(sel

  • 基于Python实现微博抓取GUI程序

    目录 前言 微博功能布局 创建微博 Widget 创建微博查询 词云制作 结果展示 前言 在前面的分享中,我们制作了一个天眼查 GUI 程序,今天我们在这个的基础上,继续开发新的功能,微博抓取工具,先来看下最终的效果 整体的界面还是继承自上次的天眼查界面,我们直接来看相关功能 微博功能布局 我们整体的界面布局就是左侧可以选择不同功能,然后右侧的界面会对应改变 创建微博 Widget 对于右侧界面的切换,我们可以为不同的功能创建不同的 Widget,当点击左侧不同功能按钮后,对应切换 Widget

  • 零基础写Java知乎爬虫之抓取知乎答案

    前期我们抓取标题是在该链接下: http://www.zhihu.com/explore/recommendations 但是显然这个页面是无法获取答案的. 一个完整问题的页面应该是这样的链接: http://www.zhihu.com/question/22355264 仔细一看,啊哈我们的封装类还需要进一步包装下,至少需要个questionDescription来存储问题描述: import java.util.ArrayList;public class Zhihu { public St

  • python+mongodb数据抓取详细介绍

    分享点干货!!! Python数据抓取分析 编程模块:requests,lxml,pymongo,time,BeautifulSoup 首先获取所有产品的分类网址: def step(): try: headers = { ..... } r = requests.get(url,headers,timeout=30) html = r.content soup = BeautifulSoup(html,"lxml") url = soup.find_all(正则表达式) for i

随机推荐