python脚本使用阿里云slb对恶意攻击进行封堵的实现

环境准备:

1.安装python3.7和相关的依赖

并安装redis缓存数据库

pip install aliyun-python-sdk-core
pip install aliyun-python-sdk-slb
pip intall IPy
pip intall redis
pip intall paramiko

2.添加ram访问控制的编程接口用户

3.添加slb的访问控制策略并和需要频控的slb进行绑定

redis封堵ip的格式

脚本程序目录

Aliyun_SLB_Manager
├── helpers
│   ├── common.py
│   ├── email.py
│   ├── remote.py
│   └── slb.py
├── logs
│   └── run_20210204.log
└── run.py

# 程序核心就是使用shell命令对nginx的日志中出现的ip地址 和 访问的接口进行过滤,找出访问频繁的那些程序加入slb黑名单,同时加入redis缓存,因为slb有封堵ip个数限制,redis中存储的ip需要设置过期时间,对比后删除slb中封堵的Ip

# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200
  2454 114.248.45.15
  1576 47.115.122.23
  1569 47.107.239.148
  269 112.32.217.52

grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'

[root@alisz-edraw-api-server-web01:~]# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -3
  2454 114.248.45.15
  1576 47.115.122.23
  1569 47.107.239.148

python脚本
主入口程序

run.py

import time
from helpers.email import send_mail
from helpers.remote import get_black_ips
from helpers.common import is_white_ip,get_ban_ip_time,set_ban_ip_time,groups
from helpers.slb import slb_add_host,slb_del_host,slb_get_host

if __name__ == "__main__":
  # aliyun 访问控制针对 slb 的管理用户
  # 用户登录名称 slb-frequency-user@xxx.onaliyun.com
  accessKeyId = 'id'
  accessSecret = 'pass'

  # slb 访问控制策略id
  acl_id = 'acl-slb'
  # reginid 查询地址:https://help.aliyun.com/document_detail/40654.html?spm=a2c6h.13066369.0.0.54a17471VmN3kA
  region_id = 'cn-shenzhen'
  # 黑名单限制个数 300
  slb_limit = 200
  # 每10分钟访问限制阈值
  threshold = 50
  # 接收邮箱
  mails = ['reblue520@chinasoft.cn']

  # 远程ssh执行grep过滤出可疑ip
  res = get_black_ips(threshold)
  deny_host_list = res[0]
  hosts_with_count = res[1]
  hosts_with_count = sorted(hosts_with_count.items(), key=lambda x: x[1] , reverse=True)
  print(hosts_with_count)
  # exit()
  # 等待被ban的ip , 过滤掉ip白名单
  deny_hosts = []
  for host in deny_host_list:
    if (is_white_ip(host) == False):
      deny_hosts.append(host + '/32')

  # 获取所有已经被ban的ip
  response = slb_get_host(accessKeyId , accessSecret , acl_id , region_id)
  denied_hosts = []
  if('AclEntrys' in response.keys()):
    for item in response['AclEntrys']['AclEntry']:
      denied_hosts.append(item['AclEntryIP'])

  # 被ban超过2天,首先移除
  must_del_hosts = []
  denied_hosts_clone = denied_hosts.copy()
  for host in denied_hosts:
    if (get_ban_ip_time(host) == 0 or (get_ban_ip_time(host) < int(round(time.time())) - 2* 24 * 3600)):
      must_del_hosts.append(host)
      denied_hosts_clone.remove(host)

  # 排除相同的
  deny_hosts_new = []
  for item in deny_hosts:
    if(item not in denied_hosts_clone):
      deny_hosts_new.append(item)

  # 两者和超过300的限制
  if((len(denied_hosts_clone)+len(deny_hosts_new))>slb_limit):
    denied_hosts_detail = {}
    for host in denied_hosts_clone:
      denied_hosts_detail[host] = get_ban_ip_time(host)
    # 需要排除的数量
    num = len(denied_hosts_clone) + len(deny_hosts_new) - slb_limit
    denied_hosts_detail = sorted(denied_hosts_detail.items(), key=lambda x: x[1])
    denied_hosts_detail = denied_hosts_detail[:num]
    for item in denied_hosts_detail:
      must_del_hosts.append(item[0])

  print("denied:",denied_hosts)
  print("delete:",must_del_hosts)
  print("add:",deny_hosts_new)
  # exit()
  # 先删除一部分 must_del_hosts
  if(len(must_del_hosts)>0):
    if (len(must_del_hosts)>50):
      must_del_hosts_clone = groups(must_del_hosts,50)
      for item in must_del_hosts_clone:
        slb_del_host(item, accessKeyId, accessSecret, acl_id, region_id)
        time.sleep(1)
    else :
      slb_del_host(must_del_hosts, accessKeyId, accessSecret, acl_id, region_id)

  # 再新增 deny_hosts_new
  if(len(deny_hosts_new)>0):
    if(len(deny_hosts_new)>50):
      deny_hosts_new_clone = groups(deny_hosts_new,50)
      for item in deny_hosts_new_clone:
        slb_add_host(item, accessKeyId, accessSecret, acl_id, region_id)
        time.sleep(1)
    else:
      slb_add_host(deny_hosts_new, accessKeyId, accessSecret, acl_id, region_id)

  # 记录ip被禁时间
  for host in deny_hosts_new:
    set_ban_ip_time(host)

  if (len(deny_hosts_new) >= 1):
    mail_content = ''
    if(len(must_del_hosts) > 0):
      mail_content += "以下黑名单已被解禁("+str(len(must_del_hosts))+"):\n"+"\n".join(must_del_hosts) + "\n"
    mail_content += "\n新增以下ip黑名单("+str(len(deny_hosts_new))+"):\n"+"\n".join(deny_hosts_new)
    mail_content += "\n\n10分钟访问超过15次("+str(len(hosts_with_count))+"):\n"
    for item in hosts_with_count:
      mail_content += str(item[1]) + " " + str(item[0]) + "\n"
    mail_content += "\n\n黑名单("+str(len(denied_hosts))+"个):\n"
    for item in denied_hosts:
      mail_content += str(item) + "\n"
    send_mail(mail_content , mails)

slb操作相关的脚本
slb.py

import logging , json

from aliyunsdkcore.client import AcsClient
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.DescribeAccessControlListAttributeRequest import DescribeAccessControlListAttributeRequest

# 阿里云slb访问控制里添加ip
def slb_add_host(hosts, accessKeyId, accessSecret, acl_id, region_id):
  client = AcsClient(accessKeyId, accessSecret, region_id)
  request = AddAccessControlListEntryRequest()
  request.set_accept_format('json')
  logging.info("正在封印IP:%s" % ",".join(hosts))

  try:
    add_hosts = []
    for host in hosts:
      add_hosts.append({"entry": host, "comment": "deny"})

    request.set_AclEntrys(add_hosts)
    request.set_AclId(acl_id)
    response = client.do_action_with_exception(request)
    print(response)
  except BaseException as e:
    logging.error("添加黑名单失败,原因:%s" % e)

# slb删除ip
def slb_del_host(hosts, accessKeyId, accessSecret, acl_id , region_id = 'us-west-1'):
  logging.info("正在解封IP:%s" % ",".join(hosts))
  try:
    del_hosts = []
    for host in hosts:
      del_hosts.append({"entry": host, "comment": "deny"})

    client = AcsClient(accessKeyId, accessSecret, region_id)
    request = RemoveAccessControlListEntryRequest()
    request.set_accept_format('json')
    request.set_AclEntrys(del_hosts)
    request.set_AclId(acl_id)

    client.do_action_with_exception(request)
    logging.info("slb删除IP:%s成功" % ",".join(hosts)) # 查看调用接口结果
    logging.info("slb删除IP:%s成功" % ",".join(hosts)) # 查看调用接口结果
  except BaseException as e:
    logging.error("移出黑名单失败,原因:%s" % e)

# 阿里云slb获取IP黑名单列表
def slb_get_host(accessKeyId, accessSecret, acl_id, region_id):
  client = AcsClient(accessKeyId, accessSecret, region_id)
  request = DescribeAccessControlListAttributeRequest()
  request.set_accept_format('json')

  try:
    request.set_AclId(acl_id)
    response = client.do_action_with_exception(request)
    data_sub = json.loads((response.decode("utf-8")))
    return data_sub
  except BaseException as e:
    logging.error("获取黑名单失败,原因:%s" % e)

远程操作日志的脚本
remote.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import datetime
import re
import paramiko

def get_black_ips(threshold = 100):
  # file = '/data/www/logs/nginx_log/access/*api*_access.log'
  file = '/data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log'
  # 可以 ssh 访问服务器 nginx 日志的用户信息
  username = 'apache'
  passwd = 'pass'

  ten_min_time = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%d/%b/%Y:%H:%M")
  ten_min_time = ten_min_time[:-1]

  # 线上 需要对日志进行过滤的目标服务器,一般是内网ip,本地调试时可以直接使用外网ip方便调试
  ssh_hosts = ['1.1.1.1']
  deny_host_list = []
  for host in ssh_hosts:

    '''
    # 过滤日志文件,需要显示如下效果,次数 ip地址,需要定位具体的api接口,否则误伤率极高
    # grep 04/Feb/2021:15:2 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -5 | awk '{if ($1 >15)print $1,$2}'
    2998 116.248.89.2
    2381 114.248.45.15
    1639 47.107.239.148
    1580 47.115.122.23
    245 59.109.149.45
    '''
    shell = (
          # "grep %s %s | grep '/index.php?submod=checkout&method=index&pid' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'") % (
          # grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api/user' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'
          "grep %s %s | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >2000)print $1,$2}'") % (
          ten_min_time, file)
    print(shell)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, port=2020, username=username, password=passwd)
    stdin, stdout, stderr = ssh.exec_command(shell)
    result = stdout.read().decode(encoding="utf-8")
    deny_host_re = re.compile(r'\d{1,99} \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
    deny_host_re = deny_host_re.findall(result)
    deny_host_list = deny_host_list + deny_host_re

  uniq_host = {}
  for host_str in deny_host_list:
    tmp = host_str.split(' ')
    if tmp[1] in uniq_host:
      uniq_host[tmp[1]] += int(tmp[0])
    else:
      uniq_host[tmp[1]] = int(tmp[0])

  deny_host_list = []
  for v in uniq_host:
    if (uniq_host[v] > threshold):
      deny_host_list.append(v)

  return [deny_host_list , uniq_host]

发送邮件的脚本
email.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import smtplib
from email.mime.text import MIMEText
from email.header import Header
import logging

def send_mail(host , receivers):
  # 发送邮件的服务器,用户信息
  mail_host = "smtpdm-ap-southeast-1.aliyun.com"
  mail_user = "admin@mail.chinasoft.com"
  mail_pass = "pass"

  sender = 'admin@mail.chinasoft.com'

  message = MIMEText('chinasoft国内接口被刷,单个IP最近10分钟内访问超过阈值100次会收到此邮件告警!!!!\n%s' % (host), 'plain', 'utf-8')
  message['From'] = Header("chinasoft国内接口被刷", 'utf-8')

  subject ='[DDOS]购买链接接口异常链接!!'
  message['Subject'] = Header(subject, 'utf-8')

  try:
    smtpObj = smtplib.SMTP(mail_host, 80)
    smtpObj.login(mail_user, mail_pass)
    smtpObj.sendmail(sender, receivers, message.as_string())
    logging.info("邮件发送成功")
  except smtplib.SMTPException as e:
    logging.error("发送邮件失败,原因:%s" % e)

配置文件
common.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import IPy
from functools import reduce
import redis,time

def groups(L1,len1):
  groups=zip(*(iter(L1),)*len1)
  L2=[list(i) for i in groups]
  n=len(L1) % len1
  L2.append(L1[-n:]) if n !=0 else L2
  return L2

def ip_into_int(ip):
  return reduce(lambda x, y: (x << 8) + y, map(int, ip.split('.')))

# 过滤掉内网ip
def is_internal_ip(ip):
  ip = ip_into_int(ip)
  net_a = ip_into_int('10.255.255.255') >> 24
  net_b = ip_into_int('172.31.255.255') >> 20
  net_c = ip_into_int('192.168.255.255') >> 16
  return ip >> 24 == net_a or ip >> 20 == net_b or ip >> 16 == net_c

# 是否为白名单ip (公司内网+集群内网ip+slb和需要互访的服务器ip避免误杀)
def is_white_ip(ip):
  if (is_internal_ip(ip)):
    return True
  white_hosts = [
    # web-servers
    '1.1.1.1',
    '1.1.1.2',
  ];
  for white in white_hosts:
    if (ip in IPy.IP(white)):
      return True
  return False

def get_ban_ip_time(ip):
  pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)
  client = redis.Redis(connection_pool=pool)
  key = 'slb_ban_'+ip
  val = client.get(key)
  if val == None:
    return 0
  else :
    return int(val)

def set_ban_ip_time(ip):
  pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)
  client = redis.Redis(connection_pool=pool)
  key = 'slb_ban_'+ip
  timestamp = time.time()
  timestamp = int(round(timestamp))
  return client.set(key , timestamp , 86400)

本地可以直接运行run.py进行调试

到此这篇关于python脚本使用阿里云slb对恶意攻击进行封堵的实现的文章就介绍到这了,更多相关python脚本阿里云slb内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 利用Python+阿里云实现DDNS动态域名解析的方法

    引子 我想大家应该都很熟悉DNS了,这回在DNS前面加了一个D又变成了什么呢?这个D就是Dynamic(动态),也就是说,按照传统,一个域名所对应的IP地址应该是定死的,而使用了DDNS后,域名所对应的IP是可以动态变化的.那这个有什么用呢? 比如,在家里的路由器上连着一个raspberry pi(树莓派),上面跑着几个网站,我应该如和在外网环境下访问网站.登陆树莓派的SSH呢? 还有,家里的NAS(全称Network Attach Storage 网络附属存储,可以理解为私有的百度网盘)上存储

  • Python3编程实现获取阿里云ECS实例及监控的方法

    本文实例讲述了Python3编程实现获取阿里云ECS实例及监控的方法.分享给大家供大家参考,具体如下: #!/usr/bin/env python3.5 # -*- coding:utf8 -*- try: import httplib except ImportError: import http.client as httplib import sys,datetime import urllib import urllib.request import urllib.error impor

  • 在阿里云服务器上配置CentOS+Nginx+Python+Flask环境

    项目运行环境 阿里云(单核CPU, 1G内存, Ubuntu 14.04 x64 带宽1Mbps), 具体购买和ssh连接阿里云本文不做描述. 实用工具 首先进入阿里云后先要升级下apt-get, 并下载所需软件 sudo apt-get update sudo apt-get install vim git wget tmux 我还会使用zsh和oh-my-zsh来替换bash sudo apt-get install zsh # 终端下打以下命令 wget --no-check-certif

  • 阿里云 CentOS7.4 安装 Python3.6的方法讲解

    1.到python官网 https://www.python.org查找最新的原码,我使用的,Python-3.6.3 2.mkdir /home/soft 创建软件管理目录. 3.cd /home/soft/ 进入soft目录,因为我下一步是下载python的源码 4.wget https://www.python.org/ftp/python/3.6.3/Python-3.6.3.tgz 下载Python-3.6.3的源码到/home/soft/里,如果没有安装wget 请执行下面的命令,

  • python脚本使用阿里云slb对恶意攻击进行封堵的实现

    环境准备: 1.安装python3.7和相关的依赖 并安装redis缓存数据库 pip install aliyun-python-sdk-core pip install aliyun-python-sdk-slb pip intall IPy pip intall redis pip intall paramiko 2.添加ram访问控制的编程接口用户 3.添加slb的访问控制策略并和需要频控的slb进行绑定 redis封堵ip的格式 脚本程序目录 Aliyun_SLB_Manager ├─

  • Python脚本修改阿里云的访问控制列表的方法

    需求 对于部署在阿里云上的重要系统一般是不让其他人访问的,所以会在负载均衡(SLB)上加上访问控制列表.而使用ASDL拨号上网的宽带来说一般公网IP都不会固定的,会随时变更公网IP,所以此脚本正是解决此需求. 说明 脚本运行前需要先安装aliyun-python-sdk-core 和aliyun-python-sdk-slb 2个sdk,并且在阿里云账户里面创建access_key和access_secret. 脚本会查询到目前的公网IP,如何创建本地一个文件将IP记录到文件里,下次执行时会将查

  • 利用Python抓取阿里云盘资源

    目录 网页分析 抓取与解析 模板 完整代码 总结 前阵子阿里云盘大火,送了好多的容量空间.而且阿里云盘下载是不限速,这点比百度网盘好太多了.这两天看到一个第三方网站可以搜索阿里云盘上的资源,但是它的资源顺序不是按时间排序的.这种情况会造成排在前面时间久远的资源是一个已经失效的资源.小编这里用 python 抓取后重新排序. 网页分析 这个网站有两个搜索路线:搜索线路一和搜索线路二,本文章使用的是搜索线路二. 打开控制面板下的网络,一眼就看到一个 seach.html 的 get 请求. 上面带了

  • python实现监控阿里云账户余额功能

    背景 由于阿里云oss,cdn消耗钱的速度比较快,在不知道的情况下,服务就被停了,影响比较大.所以想做个监控.百度一下阿里云账户余额 api 还真有:于是开启了踩坑之路. 查阅资料创建accessKeyId和accessKeySecret 官方文档(感觉并不细致) https://help.aliyun.com/document_detail/87997.html?spm=a2c6h.13066369.0.0.59e4581eaxXH1O sdk https://developer.aliyun

  • 阿里云主机一键安装lamp、lnmp环境的shell脚本分享

    阿里云主机一键安装lamp,lnmp,自动安装脚本,由阿里云主机分享 一键安装包下载地址:点击下载 1.阿里云分享的一键安装lamp,lnmp,此安装包包含的软件及版本为: 复制代码 代码如下: nginx:1.0.15.1.2.5.1.4.4 apache:2.2.22.2.4.2 mysql:5.1.73.5.5.35.5.6.15 php:5.3.18.5.4.23.5.5.7 php扩展:memcache.Zend Engine/ OPcache ftp:(yum/apt-get安装)

  • 关于阿里云oss获取sts凭证 app直传 python的实例

    首先安装需要的模块 pip install aliyun-python-sdk-core pip install aliyun-python-sdk-sts 获取需要的5个参数 getsts.py from aliyunsdkcore import client from aliyunsdkcore.profile import region_provider from aliyunsdksts.request.v20150401 import AssumeRoleRequest import

  • 阿里云负载均衡SLB安装SSL证书的方法

    获取证书文件 1.登陆用户中心(如何登陆用户中心?)获取SSL证书(证书管理系统现在有2个版本并存[根据购买SSL证书品牌,类型等随机分配],证书管理系统版本不同,获取证书方式略有差异,最终得到的证书是没有区别的,请按照您现行使用的证书系统版本获取证书文件.) 1-1. 版本1-如何获取SSL证书文件:点击这里 ,找到如下图所示页面,请把SSL证书文件(包括"-----BEGIN CERTIFICATE-----"和"-----END CERTIFICATE-----&quo

  • python 阿里云oss实现直传签名与回调验证的示例方法

    签名 import base64 import json import time from datetime import datetime import hmac from hashlib import sha1 access_key_id = '' # 请填写您的AccessKeySecret. access_key_secret = '' # host的格式为 bucketname.endpoint ,请替换为您的真实信息. host = '' # callback_url为 上传回调服务

随机推荐