C#请求唯一性校验支持高并发的实现方法

使用场景描述:

  网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述:

  这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现:

  对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现:

  公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库

/*
     * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
     * 它是被设计用来修饰被不同线程访问和修改的变量。
     * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
     */
    private static readonly object lockHelper = new object();

    private volatile static UniqueCheck _instance;    

    /// <summary>
    /// 获取单一实例
    /// </summary>
    /// <returns></returns>
    public static UniqueCheck GetInstance()
    {
      if (_instance == null)
      {
        lock (lockHelper)
        {
          if (_instance == null)
            _instance = new UniqueCheck();
        }
      }
      return _instance;
    }

  这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

  自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace PackgeUniqueCheck
{
  /// <summary>
  /// 非加锁并发队列,处理100个并发数以内
  /// </summary>
  /// <typeparam name="T"></typeparam>
  public class ConcurrentLinkedQueue<T>
  {
    private class Node<K>
    {
      internal K Item;
      internal Node<K> Next;

      public Node(K item, Node<K> next)
      {
        this.Item = item;
        this.Next = next;
      }
    }

    private Node<T> _head;
    private Node<T> _tail;

    public ConcurrentLinkedQueue()
    {
      _head = new Node<T>(default(T), null);
      _tail = _head;
    }

    public bool IsEmpty
    {
      get { return (_head.Next == null); }
    }
    /// <summary>
    /// 进入队列
    /// </summary>
    /// <param name="item"></param>
    public void Enqueue(T item)
    {
      Node<T> newNode = new Node<T>(item, null);
      while (true)
      {
        Node<T> curTail = _tail;
        Node<T> residue = curTail.Next;

        //判断_tail是否被其他process改变
        if (curTail == _tail)
        {
          //A 有其他process执行C成功,_tail应该指向新的节点
          if (residue == null)
          {
            //C 其他process改变了tail节点,需要重新取tail节点
            if (Interlocked.CompareExchange<Node<T>>(
             ref curTail.Next, newNode, residue) == residue)
            {
              //D 尝试修改tail
              Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
              return;
            }
          }
          else
          {
            //B 帮助其他线程完成D操作
            Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
          }
        }
      }
    }
    /// <summary>
    /// 队列取数据
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TryDequeue(out T result)
    {
      Node<T> curHead;
      Node<T> curTail;
      Node<T> next;
      while (true)
      {
        curHead = _head;
        curTail = _tail;
        next = curHead.Next;
        if (curHead == _head)
        {
          if (next == null) //Queue为空
          {
            result = default(T);
            return false;
          }
          if (curHead == curTail) //Queue处于Enqueue第一个node的过程中
          {
            //尝试帮助其他Process完成操作
            Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
          }
          else
          {
            //取next.Item必须放到CAS之前
            result = next.Item;
            //如果_head没有发生改变,则将_head指向next并退出
            if (Interlocked.CompareExchange<Node<T>>(ref _head,
             next, curHead) == curHead)
              break;
          }
        }
      }
      return true;
    }
    /// <summary>
    /// 尝试获取最后一个对象
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TryGetTail(out T result)
    {
      result = default(T);
      if (_tail == null)
      {
        return false;
      }
      result = _tail.Item;
      return true;
    }
  }
}

虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Collections;

namespace PackgeUniqueCheck
{
  public class UniqueCheck
  {
    /*
     * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
     * 它是被设计用来修饰被不同线程访问和修改的变量。
     * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
     */
    private static readonly object lockHelper = new object();

    private volatile static UniqueCheck _instance;    

    /// <summary>
    /// 获取单一实例
    /// </summary>
    /// <returns></returns>
    public static UniqueCheck GetInstance()
    {
      if (_instance == null)
      {
        lock (lockHelper)
        {
          if (_instance == null)
            _instance = new UniqueCheck();
        }
      }
      return _instance;
    }

    private UniqueCheck()
    {
      //创建一个线程安全的哈希表,作为字典缓存
      _DataKey = Hashtable.Synchronized(new Hashtable());
      Queue myqueue = new Queue();
      _DataQueue = Queue.Synchronized(myqueue);
      _Myqueue = new ConcurrentLinkedQueue<string>();
      _Timer = new Thread(DoTicket);
      _Timer.Start();
    }

    #region 公共属性设置
    /// <summary>
    /// 设定定时线程的休眠时间长度:默认为1分钟
    /// 时间范围:1-7200000,值为1毫秒到2小时
    /// </summary>
    /// <param name="value"></param>
    public void SetTimeSpan(int value)
    {
      if (value > 0&& value <=7200000)
      {
        _TimeSpan = value;
      }
    }
    /// <summary>
    /// 设定缓存Cache中的最大记录条数
    /// 值范围:1-5000000,1到500万
    /// </summary>
    /// <param name="value"></param>
    public void SetCacheMaxNum(int value)
    {
      if (value > 0 && value <= 5000000)
      {
        _CacheMaxNum = value;
      }
    }
    /// <summary>
    /// 设置是否在控制台中显示日志
    /// </summary>
    /// <param name="value"></param>
    public void SetIsShowMsg(bool value)
    {
      Helper.IsShowMsg = value;
    }
    /// <summary>
    /// 线程请求阻塞增量
    /// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20%
    /// </summary>
    /// <param name="value"></param>
    public void SetBlockNumExt(int value)
    {
      if (value > 0 && value <= _CacheMaxNum)
      {
        _BlockNumExt = value;
      }
    }
    /// <summary>
    /// 请求阻塞时间
    /// 值范围:1-max,根据阻塞增量设置请求阻塞时间
    /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差
    /// </summary>
    /// <param name="value"></param>
    public void SetBlockSpanTime(int value)
    {
      if (value > 0)
      {
        _BlockSpanTime = value;
      }
    }
    #endregion

    #region 私有变量
    /// <summary>
    /// 内部运行线程
    /// </summary>
    private Thread _runner = null;
    /// <summary>
    /// 可处理高并发的队列
    /// </summary>
    private ConcurrentLinkedQueue<string> _Myqueue = null;
    /// <summary>
    /// 唯一内容的时间健值对
    /// </summary>
    private Hashtable _DataKey = null;
    /// <summary>
    /// 内容时间队列
    /// </summary>
    private Queue _DataQueue = null;
    /// <summary>
    /// 定时线程的休眠时间长度:默认为1分钟
    /// </summary>
    private int _TimeSpan = 3000;
    /// <summary>
    /// 定时计时器线程
    /// </summary>
    private Thread _Timer = null;
    /// <summary>
    /// 缓存Cache中的最大记录条数
    /// </summary>
    private int _CacheMaxNum = 500000;
    /// <summary>
    /// 线程请求阻塞增量
    /// </summary>
    private int _BlockNumExt = 10000;
    /// <summary>
    /// 请求阻塞时间
    /// </summary>
    private int _BlockSpanTime = 100;
    #endregion

    #region 私有方法
    private void StartRun()
    {
      _runner = new Thread(DoAction);
      _runner.Start();
      Helper.ShowMsg("内部线程启动成功!");
    }

    private string GetItem()
    {
      string tp = string.Empty;
      bool result = _Myqueue.TryDequeue(out tp);
      return tp;
    }
    /// <summary>
    /// 执行循环操作
    /// </summary>
    private void DoAction()
    {
      while (true)
      {
        while (!_Myqueue.IsEmpty)
        {
          string item = GetItem();
          _DataQueue.Enqueue(item);
          if (!_DataKey.ContainsKey(item))
          {
            _DataKey.Add(item, DateTime.Now);
          }
        }
        //Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");
        Thread.Sleep(2);
      }
    }
    /// <summary>
    /// 执行定时器的动作
    /// </summary>
    private void DoTicket()
    {
      while (true)
      {
        Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());
        if (_DataQueue.Count > _CacheMaxNum)
        {
          while (true)
          {
            Helper.ShowMsg(string.Format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
            string item = _DataQueue.Dequeue().ToString();
            if (!string.IsNullOrEmpty(item))
            {
              if (_DataKey.ContainsKey(item))
              {
                _DataKey.Remove(item);
              }
              if (_DataQueue.Count <= _CacheMaxNum)
              {
                Helper.ShowMsg("清理完成,开始休眠清理线程...");
                break;
              }
            }
          }
        }
        Thread.Sleep(_TimeSpan);
      }
    }

    /// <summary>
    /// 线程进行睡眠等待
    /// 如果当前负载压力大大超出了线程的处理能力
    /// 那么需要进行延时调用
    /// </summary>
    private void BlockThread()
    {
      if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
      {
        Thread.Sleep(_BlockSpanTime);
      }
    }
    #endregion

    #region 公共方法
    /// <summary>
    /// 开启服务线程
    /// </summary>
    public void Start()
    {
      if (_runner == null)
      {
        StartRun();
      }
      else
      {
        if (_runner.IsAlive == false)
        {
          StartRun();
        }
      }

    }
    /// <summary>
    /// 关闭服务线程
    /// </summary>
    public void Stop()
    {
      if (_runner != null)
      {
        _runner.Abort();
        _runner = null;
      }
    }

    /// <summary>
    /// 添加内容信息
    /// </summary>
    /// <param name="item">内容信息</param>
    /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>
    public bool AddItem(string item)
    {
      BlockThread();
      item = Helper.MakeMd5(item);
      if (_DataKey.ContainsKey(item))
      {
        return false;
      }
      else
      {
        _Myqueue.Enqueue(item);
        return true;
      }
    }
    /// <summary>
    /// 判断内容信息是否已经存在
    /// </summary>
    /// <param name="item">内容信息</param>
    /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>
    public bool CheckItem(string item)
    {
      item = Helper.MakeMd5(item);
      return _DataKey.ContainsKey(item);
    }
    #endregion  

  }
}

模拟测试代码:

private static string _example = Guid.NewGuid().ToString();

    private static UniqueCheck _uck = null;

    static void Main(string[] args)
    {
      _uck = UniqueCheck.GetInstance();
      _uck.Start();
      _uck.SetIsShowMsg(false);
      _uck.SetCacheMaxNum(20000000);
      _uck.SetBlockNumExt(1000000);
      _uck.SetTimeSpan(6000);

      _uck.AddItem(_example);
      Thread[] threads = new Thread[20];

      for (int i = 0; i < 20; i++)
      {
        threads[i] = new Thread(AddInfo);
        threads[i].Start();
      }

      Thread checkthread = new Thread(CheckInfo);
      checkthread.Start();

      string value = Console.ReadLine();

      checkthread.Abort();
      for (int i = 0; i < 50; i++)
      {
        threads[i].Abort();
      }
      _uck.Stop();
    }

    static void AddInfo()
    {
      while (true)
      {
        _uck.AddItem(Guid.NewGuid().ToString());
      }
    }

    static void CheckInfo()
    {
      while (true)
      {
        Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
        Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));
        Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
          //调整进程休眠时间,可以测试高并发的情况
        //Thread.Sleep(1000);
      }

    }

测试截图:

总结

以上就是我在处理客户端真实IP的方法,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。

(0)

相关推荐

  • C#编程高并发的几种处理方法详解

    并发(英文Concurrency),其实是一个很泛的概念,字面意思就是"同时做多件事",不过方式有所不同.在.NET的世界里面,处理高并发大致有以下几种方法: 1.异步编程 异步编程就是使用future模式(又称promise)或者回调机制来实现(Non-blocking on waiting).如果使用回调或事件来实现(容易callback hell),不仅编写这样的代码不直观,很快就容易把代码搞得一团糟. 不过在.NET 4.5 及以上框架中引入的async/await关键字(在.

  • c#编写的高并发数据库控制访问代码

    代码的作用在于保证在上端缓存服务失效(一般来说概率比较低)时,形成倒瓶颈,从而能够保护数据库,数据库宕了,才是大问题(比如影响其他应用). 假设(非完全正确数据,仅做示例): 每秒支持10,000,000次查询(千万); 一次读库需要耗时:1ms; 修改内存变量需要耗时:0.001ms; 那么: 每秒最终访问的数据库的请求数量 < 1000 其他的9,900,000个请求会返回到其他页面.这就是为啥很多抢单网站有人可以访问,而有人得到繁忙中页面的原因. 微观到1ms来看,在currentVali

  • C#请求唯一性校验支持高并发的实现方法

    使用场景描述: 网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求.当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些.特别是交易类的数据,这种操作更是需要避免重复发送请求.另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定. 其他需求描述: 这类

  • go高并发时append方法偶现错误解决分析

    目录 背景 排查问题 解决问题 背景 在实现图片转码的需求时,需要支持最大 500 个图片下载后转换格式: 如果是一个一个下载后转码,耗时太长,需要使用 goroutine 实现 500 个图片并发下载后,并发转码: 但自测过程中发现,会偶现下载后只转换了 499 个图片或更少的情况(全部下载.转码成功的条件下): 然后就开始了打印日志找 bug 的过程. 排查问题 因为并发时使用到了 sync 等待全部协程结束,起初以为是 sync 异步等待出了问题: 打印日志发现,正常执行了 500 次下载

  • IIS Web服务器支持高并发设置方法详解

    适用的IIS版本:IIS 7.0, IIS 7.5, IIS 8.0 适用的Windows版本:Windows Server 2008, Windows Server 2008 R2, Windows Server 2012 1.应用程序池(Application Pool)的设置: General->Queue Length设置为65535(队列长度所支持的最大值)Process Model->Idle Time-out设置为0(不让应用程序池因为没有请求而回收)Recycling->

  • 用于App服务端的MySQL连接池(支持高并发)

    本文向大家介绍了简单的MySQL连接池,用于App服务端比较合适,分享给大家供大家参考,具体内容如下 /** * 连接池类 */ package com.junones.test; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import com.mysql.jdbc.jdb

  • Java 高并发编程之最实用的任务执行架构设计建议收藏

    目录 前言 1.业务架构 2.技术架构 3.物理架构 高并发任务执行架构 需求场景 业务架构设计 技术架构设计 初始设计 演化阶段一 演化阶段二 演化阶段三 代码设计 总结 前言 随着互联网与软件的发展,除了程序员,架构师也是越来越火的职业.他们伴随着项目的整个生命过程,他们更像是传统工业的设计师,将项目当做生命一般细心雕琢. 目前对于项目架构而言,基本都会需要设计的几个架构. 1.业务架构 项目或者产品的市场定位.需求范围.作用场景都是需要在项目启动初期进行系统性分析的.在设计业务架构中,架构

  • 详解node单线程实现高并发原理与node异步I/O

    一.node单线程实现高并发原理 众所周知nodejs是单线程且支持高并发的脚本语言.可为什么单线程的nodejs可以支持高并发呢?很多人都不明白其原理,下面我来谈谈我的理解: 1. node的优点:I/O密集型处理是node的强项,因为node的I/O请求都是异步的(如:sql查询请求.文件流操作操作请求.http请求...) a. 什么是异步? 异步:发出操作指令,然后就可以去做别的事情了,所有操作完成后再执行回调 异步的实现原理: // 第一步:定义变量 let a = 1; // 第二步

  • Linux下高并发socket最大连接数所受的各种限制(详解)

    1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄).可使用ulimit命令查看系统允许当前用户进程打开的文件数限制: [speng@as4 ~]$ ulimit -n 1024 这表示当前用户的每个进程最多允许同时打开1024个文件,这1024个文件中还得除去每个进

  • 浅谈Nginx10m+高并发内核优化详解

    何为高并发 默认的Linux内核参数考虑的是最通用场景,不符合用于支持高并发访问的Web服务器,所以需要修改Linux内核参数,这样可以让Nginx拥有更高的性能: 在优化内核时,可以做的事情很多,不过,我们通常会根据业务特点来进行调整,当Nginx作为静态web内容服务器.反向代理或者提供压缩服务器的服务器时,期内核参数的调整都是不同的,这里针对最通用的.使Nginx支持更多并发请求的TCP网络参数做简单的配置: 这些需要修改/etc/sysctl.conf来更改内核参数. 配置方法 配置详析

  • 高并发nginx服务器的linux内核优化配置讲解

    由于默认的linux内核参数考虑的是最通用场景,这明显不符合用于支持高并发访问的Web服务器的定义,所以需要修改Linux内核参数,是的Nginx可以拥有更高的性能: 在优化内核时,可以做的事情很多,不过,我们通常会根据业务特点来进行调整,当Nginx作为静态web内容服务器.反向代理或者提供压缩服务器的服务器时,期内核参数的调整都是不同的,这里针对最通用的.使Nginx支持更多并发请求的TCP网络参数做简单的配置: 以下linux 系统内核优化配置均经在线业务系统测试,并发10万左右服务器运行

  • Linux高并发踩过的坑及性能优化介绍

    目录 前言 Linux应用运行过程中出现Too many open files 问题分析和解决 Linux高并发下 time_wait 过多的问题分析及解决 Linux更多性能优化 小结 前言 Linux操作系统是现在服务器的首选操作系统,在Linux的默认系统参数下,Linux针对高并发的支持性并不是很好.小编从事Linux下应用程序开发多年,关于Linux系统下的高并发,小编自己踩过的坑,及如何解决踩过的坑下面列上几条,供大家参考,避免再次掉坑. Linux应用运行过程中出现Too many

随机推荐