pyspark操作MongoDB的方法步骤

如何导入数据

数据可能有各种格式,虽然常见的是HDFS,但是因为在Python爬虫中数据库用的比较多的是MongoDB,所以这里会重点说说如何用spark导入MongoDB中的数据。

当然,首先你需要在自己电脑上安装spark环境,简单说下,在这里下载spark,同时需要配置好JAVA,Scala环境。

这里建议使用Jupyter notebook,会比较方便,在环境变量中这样设置

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
如果你的环境中有多个Python版本,同样可以制定你想要使用的解释器,我这里是python36,根据需求修改。

PYSPARK_PYTHON=/usr/bin/python36

pyspark对mongo数据库的基本操作 (๑• . •๑)

有几点需要注意的:

  • 不要安装最新的pyspark版本,请安装 pip3 install pyspark==2.3.2
  • spark-connector 与平常的MongoDB写法不同,格式是: mongodb://127.0.0.1:database.collection
  • 如果计算数据量比较大,你的电脑可能会比较卡,^_^
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: zhangslob
@file: spark_count.py
@time: 2019/01/03
@desc:
  不要安装最新的pyspark版本
  `pip3 install pyspark==2.3.2`
  更多pyspark操作MongoDB请看https://docs.mongodb.com/spark-connector/master/python-api/
"""

import os
from pyspark.sql import SparkSession

# set PYSPARK_PYTHON to python36
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'

# load mongodb data
# 格式是:"mongodb://127.0.0.1:database.collection"
input_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
output_uri = "mongodb://127.0.0.1:27017/spark.spark_test"

# 创建spark,默认使用本地环境,或者"spark://master:7077"
spark = SparkSession \
  .builder \
  .master("local") \
  .appName("MyApp") \
  .config("spark.mongodb.input.uri", input_uri) \
  .config("spark.mongodb.output.uri", output_uri) \
  .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
  .getOrCreate()

def except_id(collection_1, collection_2, output_collection, pipeline):
  """
  计算表1与表2中不同的数据
  :param collection_1: 导入表1
  :param collection_2: 导入表2
  :param output_collection: 保存的表
  :param pipeline: MongoDB查询语句 str
  :return:
  """
  # 可以在这里指定想要导入的数据库,将会覆盖上面配置中的input_uri。下面保存数据也一样
  # .option("collection", "mongodb://127.0.0.1:27017/spark.spark_test")
  # .option("database", "people").option("collection", "contacts")

  df_1 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_1) \
    .option("pipeline", pipeline).load()

  df_2 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_2) \
    .option("pipeline", pipeline).load()

  # df_1有但是不在 df_2,同理可以计算df_2有,df_1没有
  df = df_1.subtract(df_2)
  df.show()

  # mode 参数可选范围
  # * `append`: Append contents of this :class:`DataFrame` to existing data.
  # * `overwrite`: Overwrite existing data.
  # * `error` or `errorifexists`: Throw an exception if data already exists.
  # * `ignore`: Silently ignore this operation if data already exists.

  df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", output_collection).mode("append").save()
  spark.stop()

if __name__ == '__main__':
  # mongodb query, MongoDB查询语句,可以减少导入数据量
  pipeline = "[{'$project': {'uid': 1, '_id': 0}}]"

  collection_1 = "spark_1"
  collection_2 = "spark_2"
  output_collection = 'diff_uid'
  except_id(collection_1, collection_2, output_collection, pipeline)
  print('success')

完整代码地址: spark_count_diff_uid.py

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • c#操作mongodb插入数据效率

    mongodb的数据插入速度是其一个亮点,同样的10000条数据,插入的速度要比Mysql和sqlserver都要快,当然这也是要看使用者怎么个使用法,你代码如果10000次写入使用10000次连接,那也是比不过其他数据库使用事务一次性提交的速度的. 同样,mongo也提供的一次性插入巨量数据的方法,因为mongodb没有事务这回事,所以在在C#驱动里,具体方法是InsertManyAsync()一次性插入多个文档.与之对应的是InsertOneAsync,这个是一次插入一个文档: Insert

  • Java操作MongoDB插入数据进行模糊查询与in查询功能

    由于需要用MongoDB缓存数据,所以自己写了一套公共的存放和读取方法 具体如下: 存放mongodb: /** * 公共方法:设置Object类型缓存 * @author shijing * @param param * @param sysGuid */ public void setObjData(Map<String,Object> param, String sysGuid, String enumBpd){ DBObject dbObject = new BasicDBObject

  • Python操作mongodb数据库的方法详解

    本文实例讲述了Python操作mongodb数据库的方法.分享给大家供大家参考,具体如下: 安装pymongo 下载pymongo: https://pypi.python.org/packages/82/26/f45f95841de5164c48e2e03aff7f0702e22cef2336238d212d8f93e91ea8/pymongo-3.4.0.tar.gz#md5=aa77f88e51e281c9f328cea701bb6f3e 安装pymongo: 解压后,cmd进入pymon

  • MongoDB聚合分组取第一条记录的案例与实现方法

    前言 今天开发同学向我们提了一个紧急的需求,从集合mt_resources_access_log中,根据字段refererDomain分组,取分组中最近一笔插入的数据,然后将这些符合条件的数据导入到集合mt_resources_access_log_new中. 接到这个需求,还是有些心虚的,原因有二,一是,业务需要,时间紧:二是,实现这个功能MongoDB聚合感觉有些复杂,聚合要走好多步. 数据记录格式如下: 记录1 { "_id" : ObjectId("5c1e23eaa

  • 在pycharm上mongodb配置及可视化设置方法

    一.mongodb安装 在官网下载适应于自己平台的mongodb,在此安装环境为Windows7-64bit 下载完成后直接安装,连续点击next选项直到,此处注意!!!!! 切勿勾选左下选项,安装过程可能非常漫长,勾选选项为mongodb可视化工具,可另外下载 安装完成后配置过程结合官方工作手册进行设置(https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/),mongodb安装完成. 二.配置pycharm 首

  • Docker容器化部署尝试——多容器通信(node+mongoDB+nginx)

    原因是这样的 想要部署一个mocker平台,就在朋友的推荐下选择了 api-mocker 这个现成的项目 该项目分为服务端node.客户端vue.以及数据库mongoDB 在尝试直接部署的时候发现需要装一大堆的环境,node.mongo.nginx啊,特别的麻烦,之前简单的使用过docker,就在想能不能用docker免环境直接部署呢?于是就有了这次的尝试 多容器通信 该项目分为3个部分,于是就要建立3个容器(node.mongo.nginx) 那容器之间怎么实现通信呢? # 通过link指令建

  • windows与mac安装mongodb数据库的方法步骤与注意事项

    nosql数据库 --mongodb数据库! Mongo DB 是目前在IT行业非常流行的一种非关系型数据库(NoSql),其灵活的数据存储方式备受当前IT从业人员的青睐.Mongo DB很好的实现了面向对象的思想(OO思想),在Mongo DB中每一条记录都是一个Document对象.Mongo DB最大的优势在于所有的数据持久操作都无需开发人员手动编写SQL语句,直接调用方法就可以轻松的实现CRUD操作. windows 安装 这里我说的是命令行安装方法 准备.到官网下载相应的版本 http

  • 详解Java 连接MongoDB集群的几种方式

    先决条件 先运行mongodb肯定是必须的,然后导入以下包: import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.ServerAddress; import com.mongodb.MongoCredential; import com.mongodb.MongoClientOptions; MongoClient MongoClient()实例表示到数据库的连接池; 你将只需

  • MongoDB中的加减乘除运算详解

    前言 很多同学因为对MongoDB不熟悉,加之应用的不是很多,有时候会认为MongoDB数据库对一些功能不支持,或者认为支持不好.今天我们 演示一下 MongoDB对"加减乘除"的使用. 在MongoDB数据库中"加减乘除"运算,又称为 数学表达式(mathematical expression:或算术表达式),主要用于操作数值. 1.$add操作符(+) 1.1 语法及功能介绍 $add 操作符主要用于将一组数字相加:也可以用于在指定时间上添加一定的时间间隔.时间

  • Python使用pymongo库操作MongoDB数据库的方法实例

    python操作mongodb数据库 # !/usr/bin/env python # -*- coding:utf-8 -*- """ 使用pymongo库操作MongoDB数据库 """ import pymongo # 1.连接数据库服务器,获取客户端对象 mongo_client=pymongo.MongoClient('localhost',27017) # 2.获取数据库对象 db=mongo_client.myDB # db=mon

随机推荐