Python实现SQL注入检测插件实例代码

扫描器需要实现的功能思维导图

爬虫编写思路

首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用 Python 的set()就可以解决,大概流程是:

  • 输入 URL
  • 下载解析出 URL
  • URL 去重,判断是否为本站
  • 加入到待爬列表
  • 重复循环

SQL 判断思路

  • 通过在 URL 后面加上AND %d=%d或者OR NOT (%d>%d)
  • %d后面的数字是随机可变的
  • 然后搜索网页中特殊关键词,比如:

MySQL 中是 SQL syntax.*MySQL
Microsoft SQL Server 是 Warning.*mssql_
Microsoft Access 是 Microsoft Access Driver
Oracle 是 Oracle error
IBM DB2 是 DB2 SQL error
SQLite 是 SQLite.Exception
...

通过这些关键词就可以判断出所用的数据库

  • 还需要判断一下 waf 之类的东西,有这种东西就直接停止。简单的方法就是用特定的 URL 访问,如果出现了像IP banned,fierwall之类的关键词,可以判断出是waf。具体的正则表达式是(?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)
  • 开发准备展开目录

请安装这些库

pip install requests
pip install beautifulsoup4

实验环境是 Linux,创建一个Code目录,在其中创建一个work文件夹,将其作为工作目录

目录结构

/w8ay.py  // 项目启动主文件
/lib/core // 核心文件存放目录
/lib/core/config.py // 配置文件
/script   // 插件存放
/exp      // exp和poc存放

步骤

SQL 检测脚本编写

DBMS_ERRORS = {
  'MySQL': (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
  "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
  "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
  "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
  "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
  "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
  "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
  "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
}

通过正则表达式就可以判断出是哪个数据库了

for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
  if (re.search(regex,_content)):
    return True

下面是我们测试语句的payload

BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")

用报错语句返回正确的内容和错误的内容进行对比

for test_payload in BOOLEAN_TESTS:
  # Right Page
  RANDINT = random.randint(1, 255)
  _url = url + test_payload % (RANDINT, RANDINT)
  content["true"] = Downloader.get(_url)
  _url = url + test_payload % (RANDINT, RANDINT + 1)
  content["false"] = Downloader.get(_url)
  if content["origin"] == content["true"] != content["false"]:
    return "sql found: %" % url

这句

content["origin"] == content["true"] != content["false"]

意思就是当原始网页等于正确的网页不等于错误的网页内容时,就可以判定这个地址存在注入漏洞

完整代码:

import re, random
from lib.core import Download
def sqlcheck(url):
  if (not url.find("?")): # Pseudo-static page
    return false;
  Downloader = Download.Downloader()
  BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
  DBMS_ERRORS = {
    # regular expressions used for DBMS recognition based on error message response
    "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
    "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
    "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
    "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
    "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
    "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
    "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
    "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
  }
  _url = url + "%29%28%22%27"
  _content = Downloader.get(_url)
  for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
    if (re.search(regex,_content)):
      return True
  content = {}
  content['origin'] = Downloader.get(_url)
  for test_payload in BOOLEAN_TESTS:
    # Right Page
    RANDINT = random.randint(1, 255)
    _url = url + test_payload % (RANDINT, RANDINT)
    content["true"] = Downloader.get(_url)
    _url = url + test_payload % (RANDINT, RANDINT + 1)
    content["false"] = Downloader.get(_url)
    if content["origin"] == content["true"] != content["false"]:
      return "sql found: %" % url

将这个文件命名为sqlcheck.py,放在/script目录中。代码的第 4 行作用是查找 URL 是否包含?,如果不包含,比方说伪静态页面,可能不太好注入,因此需要过滤掉

爬虫的编写

爬虫的思路上面讲过了,先完成 URL 的管理,我们单独将它作为一个类,文件保存在/lib/core/UrlManager.py

#-*- coding:utf-8 -*-

class UrlManager(object):
  def __init__(self):
    self.new_urls = set()
    self.old_urls = set()

  def add_new_url(self, url):
    if url is None:
      return
    if url not in self.new_urls and url not in self.old_urls:
      self.new_urls.add(url)

  def add_new_urls(self, urls):
    if urls is None or len(urls) == 0:
      return
    for url in urls:
      self.add_new_url(url)

  def has_new_url(self):
    return len(self.new_urls) != 0

  def get_new_url(self):
    new_url = self.new_urls.pop()
    self.old_urls.add(new_url)
    return new_url

为了方便,我们也将下载功能单独作为一个类使用,文件保存在lib/core/Downloader.py

#-*- coding:utf-8 -*-
import requests

class Downloader(object):
  def get(self, url):
    r = requests.get(url, timeout = 10)
    if r.status_code != 200:
      return None
    _str = r.text
    return _str

  def post(self, url, data):
    r = requests.post(url, data)
    _str = r.text
    return _str

  def download(self, url, htmls):
    if url is None:
      return None
    _str = {}
    _str["url"] = url
    try:
      r = requests.get(url, timeout = 10)
      if r.status_code != 200:
        return None
      _str["html"] = r.text
    except Exception as e:
      return None
    htmls.append(_str)

特别说明,因为我们要写的爬虫是多线程的,所以类中有个download方法是专门为多线程下载专用的

在lib/core/Spider.py中编写爬虫

#-*- coding:utf-8 -*-

from lib.core import Downloader, UrlManager
import threading
from urllib import parse
from urllib.parse import urljoin
from bs4 import BeautifulSoup

class SpiderMain(object):
  def __init__(self, root, threadNum):
    self.urls = UrlManager.UrlManager()
    self.download = Downloader.Downloader()
    self.root = root
    self.threadNum = threadNum

  def _judge(self, domain, url):
    if (url.find(domain) != -1):
      return True
    return False

  def _parse(self, page_url, content):
    if content is None:
      return
    soup = BeautifulSoup(content, 'html.parser')
    _news = self._get_new_urls(page_url, soup)
    return _news

  def _get_new_urls(self, page_url, soup):
    new_urls = set()
    links = soup.find_all('a')
    for link in links:
      new_url = link.get('href')
      new_full_url = urljoin(page_url, new_url)
      if (self._judge(self.root, new_full_url)):
        new_urls.add(new_full_url)
    return new_urls

  def craw(self):
    self.urls.add_new_url(self.root)
    while self.urls.has_new_url():
      _content = []
      th = []
      for i in list(range(self.threadNum)):
        if self.urls.has_new_url() is False:
          break
        new_url = self.urls.get_new_url()

        ## sql check
        try:
          if (sqlcheck.sqlcheck(new_url)):
            print("url:%s sqlcheck is valueable" % new_url)
        except:
          pass

        print("craw:" + new_url)
        t = threading.Thread(target = self.download.download, args = (new_url, _content))
        t.start()
        th.append(t)
      for t in th:
        t.join()
      for _str in _content:
        if _str is None:
          continue
        new_urls = self._parse(new_url, _str["html"])
        self.urls.add_new_urls(new_urls)

爬虫通过调用craw()方法传入一个网址进行爬行,然后采用多线程的方法下载待爬行的网站,下载之后的源码用_parse方法调用BeautifulSoup进行解析,之后将解析出的 URL 列表丢入 URL 管理器,这样循环,最后只要爬完了网页,爬虫就会停止

threading库可以自定义需要开启的线程数,线程开启后,每个线程会得到一个 url 进行下载,然后线程会阻塞,阻塞完毕后线程放行

爬虫和 SQL 检查的结合

在lib/core/Spider.py文件引用一下from script import sqlcheck,在craw()方法中,取出新的 URL 地方调用一下

##sql check
try:
  if(sqlcheck.sqlcheck(new_url)):
    print("url:%s sqlcheck is valueable"%new_url)
except:
  pass

用try检测可能出现的异常,绕过它,在文件w8ay.py中进行测试

#-*- coding:utf-8 -*-
'''
Name: w8ayScan
Author: mathor
Copyright (c) 2019
'''
import sys
from lib.core.Spider import SpiderMain
def main():
  root = "https://wmathor.com"
  threadNum = 50
  w8 = SpiderMain(root, threadNum)
  w8.craw()

if __name__ == "__main__":
  main()

很重要的一点!为了使得lib和script文件夹中的.py文件可以可以被认作是模块,请在lib、lib/core和script文件夹中创建__init__.py文件,文件中什么都不需要写

总结

SQL 注入检测通过一些payload使页面出错,判断原始网页,正确网页,错误网页即可检测出是否存在 SQL 注入漏洞
通过匹配出 sql 报错出来的信息,可以正则判断所用的数据库

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • python实现一个简单的udp通信的示例代码

    什么是 Socket? Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求,使主机间或者一台计算机上的进程间可以通讯. python创建套接字 socket.socket([family[, type[, proto]]]) 参数解析: family: AF_UNIX(适用跨机通信) 或 AF_INET(IPv4)(适用本机通信) type: 套接字类型,可以根据是面向连接的还是非连接分为 SOCK_STREAM(TCP) 或 SO

  • Python读取指定日期邮件的实例

    背景:9月份有部分数据缺失,这部分数据在邮箱里,需要重新拉取,但是每天几百封邮件,总共有6.7万封邮件,使用stat()和retr(which)结合遍历很 耗时 基于如上背景,初始大致思路为: 使用二分法获取到需要恢复数据时间范围内的中间一个值,也就是第几封邮件,然后分两头遍历, 读取到所有需要的邮件 算法函数: 1.获取到邮件的时间,由于可能收到格林威治时间邮件,所以这里做了相应的处理 def getTimeStamp(cn): messages=mailServer.retr(cn)[1]

  • Python的条件表达式和lambda表达式实例

    条件表达式 条件表达式也称为三元表达式,表达式的形式:x if C else y.流程是:如果C为真,那么执行x,否则执行y. 经过测试x,y,C可以是函数,表达式,常量等等: def put(): print('this is put()') def get(): print('this is get()') def post(): return 0 method = put if post() else get method() lambda表达式 lambda [arguments] :

  • Python 利用切片从列表中取出一部分使用的方法

    我想从列表中取出一部分拿来使用,可以创建切片,指定需要使用的第一个元素和最后一个元素的索引 使用例子,说明切片的使用 #创建一个数字列表,代表我有100页文章,然后进行分页显示 mage=list(range(1,101)) max_count=len(mage) #使用len()获取列表的长度,上节学的 n=0 while n<max_count: #这里用到了一个while循环,穿越过来的 print(mage[n:n+10]) #这里就用到了切片,这一节的重点 n=n+10 输出: [1,

  • Python使用ctypes调用C/C++的方法

    python使用ctypes调用C/C++ 1. ctpes介绍 ctypes is a foreign function library for Python. It provides C compatible data types, and allows calling functions in DLLs or shared libraries. It can be used to wrap these libraries in pure Python. 官方文档地址: https://do

  • 使用Python向C语言的链接库传递数组、结构体、指针类型的数据

    使用python向C语言的链接库传递数组.结构体.指针类型的数据 由于最近的项目频繁使用python调用同事的C语言代码,在调用过程中踩了很多坑,一点一点写出来供大家参考,我们仍然是使用ctypes来调用C语言的代码库. 至于如何调用基础数据类型的数据,请大家参考我的另外一篇文章:Python使用ctypes调用C/C++的方法 1. 使用python给C语言函数传递数组类型的参数 想必很多时候,C语言会使用数组作为参数,在之前我们使用过ctypes的一些数据类型作为C语言参数类型,包括byte

  • Python告诉你木马程序的键盘记录原理

    前言 Python keylogger键盘记录的功能的实现主要利用了pythoncom及pythonhook,然后就是对windows API的各种调用.Python之所以用起来方便快捷,主要归功于这些庞大的支持库,正所谓"人生苦短,快用Python". # -*- coding: utf-8 -*- from ctypes inport import pythoncom import pyHook import win32clipboard user32 = winddll.user

  • 对python中list的拷贝与numpy的array的拷贝详解

    1.python中列表list的拷贝,会有什么需要注意的呢? python变量名相当于标签名. list2=list1 ,直接赋值,实质上指向的是同一个内存值.任意一个变量list1(或list2)发生改变,都会影响另一个list2(或list1). eg: >>> list1=[1,2,3,4,5,6] >>> list2=list1 >>> list1[2]=88 >>> list1 [1, 2, 88, 4, 5, 6] >

  • python 实现提取某个索引中某个时间段的数据方法

    如下所示: from elasticsearch import Elasticsearch import datetime import time import dateutil.parser class App(object): def __init__(self): pass def _es_conn(self): es = Elasticsearch() return es def get_data(self, day,start,end): index_ = "gather-apk-20

  • Python实现DDos攻击实例详解

    SYN 泛洪攻击 SYN泛洪攻击是一种比较常用的Dos方式之一.通过发送大量伪造的 TCP 连接请求,使被攻击主机资源耗尽(通常是 CPU 满负荷或内存不足)的攻击方式 我们都知道建立 TCP 连接需要三次握手.正常情况下客户端首先向服务器端发送SYN报文,随后服务端返回以SYN+ACK报文,最后客户端向服务端发送ACK报文完成三次握手 而SYN泛洪攻击则是客户端向服务器发送SYN报文之后就不再响应服务器回应的报文.由于服务器在处理 TCP 请求时,会在协议栈留一块缓冲区来存储握手的过程,当然如

随机推荐