c# 使用Task实现非阻塞式的I/O操作

  在前面的《基于任务的异步编程模式(TAP)》文章中讲述了.net 4.5框架下的异步操作自我实现方式,实际上,在.net 4.5中部分类已实现了异步封装。如在.net 4.5中,Stream类加入了Async方法,所以基于流的通信方式都可以实现异步操作。

1、异步读取文件数据

public static void TaskFromIOStreamAsync(string fileName)
{
  int chunkSize = 4096;
  byte[] buffer = new byte[chunkSize];

  FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);

  Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
  task.ContinueWith((readTask) =>
  {
    int amountRead = readTask.Result;
    //必须在ContinueWith中释放文件流
    fileStream.Dispose();
    Console.WriteLine($"Async(Simple) Read {amountRead} bytes");
  });
}

  上述代码中,异步读取数据只读取了一次,完成读取后就将执行权交还主线程了。但在真实场景中,需要从流中读取多次才能获得全部的数据(如文件数据大于给定缓冲区大小,或处理来自网络流的数据(数据还没全部到达机器))。因此,为了完成异步读取操作,需要连续从流中读取数据,直到获取所需全部数据。

  上述问题导致需要两级Task来处理。外层的Task用于全部的读取工作,供调用程序使用。内层的Task用于每次的读取操作。

  第一次异步读取会返回一个Task。如果直接返回调用Wait或者ContinueWith的地方,会在第一次读取结束后继续向下执行。实际上是希望调用者在完成全部读取操作后才执行。因此,不能把第一个Task发布会给调用者,需要一个“伪Task”在完成全部读取操作后再返回。

  上述问题需要使用到TaskCompletionSource<T>类解决,该类可以生成一个用于返回的“伪Task”。当异步读取操作全部完成后,调用其对象的TrySetResult,让Wait或ContinueWith的调用者继续执行。

public static Task<long> AsynchronousRead(string fileName)
{
  int chunkSize = 4096;
  byte[] buffer = new byte[chunkSize];
  //创建一个返回的伪Task对象
  TaskCompletionSource<long> tcs = new TaskCompletionSource<long>();

  MemoryStream fileContents = new MemoryStream();//用于保存读取的内容
  FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);
  fileContents.Capacity += chunkSize;//指定缓冲区大小。好像Capacity会自动增长,设置与否没关系,后续写入多少数据,就增长多少

  Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
  task.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
  //在ContinueWith中循环读取,读取完成后,再返回tcs的Task
  return tcs.Task;
}

/// <summary>
/// 继续读取数据
/// </summary>
/// <param name="task">读取数据的线程</param>
/// <param name="fileStream">文件流</param>
/// <param name="fileContents">文件存放位置</param>
/// <param name="buffer">读取数据缓存</param>
/// <param name="tcs">伪Task对象</param>
private static void ContinueRead(Task<int> task, FileStream fileStream, MemoryStream fileContents, byte[] buffer, TaskCompletionSource<long> tcs)
{
  if (task.IsCompleted)
  {
    int bytesRead = task.Result;
    fileContents.Write(buffer, 0, bytesRead);//写入内存区域。似乎Capacity会自动增长
    if (bytesRead > 0)
    {
      //虽然看似是一个新的任务,但是使用了ContinueWith,所以使用的是同一个线程。
      //没有读取完,开启另一个异步继续读取
      Task<int> newTask = fileStream.ReadAsync(buffer, 0, buffer.Length);
      //此处做了一个循环
      newTask.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
    }
    else
    {
      //已经全部读取完,所以需要返回数据
      tcs.TrySetResult(fileContents.Length);
      fileStream.Dispose();
      fileContents.Dispose();//应该是在使用了数据之后才释放数据缓冲区的数据
    }
  }
}

2、适应Task的异步编程模式

  .NET Framework中的旧版异步方法都带有“Begin-”和“End-”前缀。这些方法仍然有效,为了接口的一致性,它们可以被封装到Task中。

  FromAsyn方法把流的BeginRead和EndRead方法作为参数,再加上存放数据的缓冲区。BeginRead和EndRead方法会执行,并在EndRead完成后调用Continuation Task,把控制权交回主代码。上述例子会关闭流并返回转换的数据

const int ReadSize = 256;

/// <summary>
/// 从文件中获取字符串
/// </summary>
/// <param name="path">文件路径</param>
/// <returns>字符串</returns>
public static Task<string> GetStringFromFile(string path)
{
  FileInfo file = new FileInfo(path);
  byte[] buffer = new byte[1024];//存放数据的缓冲区

  FileStream fileStream = new FileStream(
    path, FileMode.Open, FileAccess.Read, FileShare.None, buffer.Length,
    FileOptions.DeleteOnClose | FileOptions.Asynchronous);

  Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
    buffer, 0, ReadSize, null);//此参数为BeginRead需要的参数

  TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();

  task.ContinueWith(taskRead => OnReadBuffer(taskRead, fileStream, buffer, 0, tcs));

  return tcs.Task;
}

/// <summary>
/// 读取数据
/// </summary>
/// <param name="taskRead">读取任务</param>
/// <param name="fileStream">文件流</param>
/// <param name="buffer">读取数据存放位置</param>
/// <param name="offset">读取偏移量</param>
/// <param name="tcs">伪Task</param>
private static void OnReadBuffer(Task<int> taskRead, FileStream fileStream, byte[] buffer, int offset, TaskCompletionSource<string> tcs)
{
  int readLength = taskRead.Result;
  if (readLength > 0)
  {
    int newOffset = offset + readLength;
    Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
      buffer, newOffset, Math.Min(buffer.Length - newOffset, ReadSize), null);

    task.ContinueWith(callBackTask => OnReadBuffer(callBackTask, fileStream, buffer, newOffset, tcs));
  }
  else
  {
    tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer, 0, buffer.Length));
    fileStream.Dispose();
  }
}

3、使用async 和 await方式读取数据

  下面的示例中,使用了async和await关键字实现异步读取一个文件的同时进行压缩并写入另一个文件。所有位于await关键字之前的操作都运行于调用者线程,从await开始的操作都是在Continuation Task中运行。但有无法使用这两个关键字的场合:①Task的结束时机不明确时;②必须用到多级Task和TaskCompletionSource时

/// <summary>
/// 同步方法的压缩
/// </summary>
/// <param name="lstFiles">文件清单</param>
public static void SyncCompress(IEnumerable<string> lstFiles)
{
  byte[] buffer = new byte[16384];
  foreach(string file in lstFiles)
  {
    using (FileStream inputStream = File.OpenRead(file))
    {
      using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
      {
        using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
        {
          int read = 0;
          while((read=inputStream.Read(buffer,0,buffer.Length))>0)
          {
            compressStream.Write(buffer, 0,read);
          }
        }
      }
    }
  }
}

/// <summary>
/// 异步方法的文件压缩
/// </summary>
/// <param name="lstFiles">需要压缩的文件</param>
/// <returns></returns>
public static async Task AsyncCompress(IEnumerable<string> lstFiles)
{
  byte[] buffer = new byte[16384];
  foreach(string file in lstFiles)
  {
    using (FileStream inputStream = File.OpenRead(file))
    {
      using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
      {
        using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
        {
          int read = 0;
          while ((read = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
          {
            await compressStream.WriteAsync(buffer, 0, read);
          }
        }
      }
    }
  }
}

以上就是c# 使用Task实现非阻塞式的I/O操作的详细内容,更多关于c# 实现非阻塞式的I/O操作的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解C#中 Thread,Task,Async/Await,IAsyncResult的那些事儿

    说起异步,Thread,Task,async/await,IAsyncResult 这些东西肯定是绕不开的,今天就来依次聊聊他们 1.线程(Thread) 多线程的意义在于一个应用程序中,有多个执行部分可以同时执行:对于比较耗时的操作(例如io,数据库操作),或者等待响应(如WCF通信)的操作,可以单独开启后台线程来执行,这样主线程就不会阻塞,可以继续往下执行:等到后台线程执行完毕,再通知主线程,然后做出对应操作! 在C#中开启新线程比较简单 static void Main(string[]

  • c#异步task示例分享(异步操作)

    c# Task异步操作 复制代码 代码如下: using System;using System.Threading;using System.Threading.Tasks; namespace ConsoleApplication18{    class Program    {        static void Main(string[] args)        {            Func<string, string> _processTimeFunc = new Fun

  • windows下C#定时管理器框架Task.MainForm详解

    入住博客园4年多了,一直都是看别人的博客,学习别人的知识,为各个默默无私贡献自己技术总结的朋友们顶一个:这几天突然觉得是时候加入该队列中,贡献出自己微弱的力量,努力做到每个月有不同学习总结,知识学习的分享文章.以下要分享的是花了两天时间编写+测试的windows下C#定时管理器框架-Task.MainForm. 目的: 随着这五年在几个公司做不同职位的.net研发者,发现各个公司都或多或少会对接一些第三方合作的接口或者数据抓取功能,都是那种各个服务直接没有关联性功能,开发人员也可能不是一个人,使

  • 深入分析C# Task

    ​Task的MSDN的描述如下: [Task类的表示单个操作不会返回一个值,通常以异步方式执行. Task对象是一种的中心思想基于任务的异步模式首次引入.NETFramework 4 中. 因为由执行工作Task对象通常以异步方式执行线程池线程上而不是以同步方式在主应用程序线程中,可以使用Status属性,并将IsCanceled, IsCompleted,和IsFaulted属性,以确定任务的状态. 大多数情况下,lambda 表达式用于指定该任务所执行的工作量. 对于返回值的操作,您使用Ta

  • C#线程处理系列之线程池中的I/O线程

    一.I/O线程实现对文件的异步  1.1  I/O线程介绍: 对于线程所执行的任务来说,可以把线程分为两种类型:工作者线程和I/O线程. 工作者线程用来完成一些计算的任务,在任务执行的过程中,需要CPU不间断地处理,所以,在工作者线程的执行过程中,CPU和线程的资源是充分利用的. I/O线程主要用来完成输入和输出的工作的,在这种情况下, 计算机需要I/O设备完成输入和输出的任务,在处理过程中,CPU是不需要参与处理过程的,此时正在运行的线程将处于等待状态,只有等任务完成后才会有事可做, 这样就造

  • c#文件的I/O基本操作

    文件是一些永久存储及具有特定顺序的字节组成的一个有序的,具有名称的集合.与文件有关的概念是目录路径和磁盘存储等.流提供了一种向后备存储写入字节和从后备存储读取字节的方式.后备存储包裹用文件存储或用内存(数组)存储,以及网络,CD等. 基本文件的I/O 命名空间为System.I/O,程序集为mscorlib(在mscorlib.dll中)抽象基类Stream支持读取和写入字节.Stream集成了异步支持,其默认实现根据其相应的异步方法来定义同步读取和写入.所有表示流的类都是从Stream类继承的

  • C#利用Task实现任务超时多任务一起执行的方法

    前言 其实Task跟线程池ThreadPool的功能类似,不过写起来更为简单,直观.代码更简洁了,使用Task来进行操作.可以跟线程一样可以轻松的对执行的方法进行控制. 创建Task有两种方式,一种是使用构造函数创建,另一种是使用 Task.Factory.StartNew 进行创建. 如下代码所示 1.使用构造函数创建Task Task t1 = new Task(MyMethod); 2.使用Task.Factory.StartNew 进行创建Task Task t1 = Task.Fact

  • C# task应用实例详解

    Task的应用 ​Task的MSDN的描述如下: [Task类的表示单个操作不会返回一个值,通常以异步方式执行. Task对象是一种的中心思想基于任务的异步模式首次引入.NETFramework 4 中. 因为由执行工作Task对象通常以异步方式执行线程池线程上而不是以同步方式在主应用程序线程中,可以使用Status属性,并将IsCanceled, IsCompleted,和IsFaulted属性,以确定任务的状态. 大多数情况下,lambda 表达式用于指定该任务所执行的工作量. 对于返回值的

  • C#中Task.Yield的用途深入讲解

    前言 最近在阅读 .NET Threadpool starvation, and how queuing makes it worse这篇博文时发现文中代码中的一种 Task 用法之前从未见过,在网上看了一些资料后也是云里雾里不知其解,很是困扰.今天在程序员节的大好日子里终于想通了,于是写下这篇随笔分享给大家,也过过专心写博客的瘾. 这种从未见过的用法就是下面代码中的 await Task.Yield() : static async Task Process() { await Task.Yi

  • C#关于Task.Yeild()函数的讨论

    在与同事讨论async/await内部实现的时候,突然想到Task.Yeild()这个函数,为什么呢,了解一点C#async/await内部机制的都知道,在await一个异步任务(函数)的时候,它会先判断该Task是否已经完成,如果已经完成,则继续执行下去,不会返回到调用方,原因是尽量避免线程切换,因为await后面部分的代码很可能是另一个不同的线程执行,而Task.Yeild()则可以强制回到调用方,或者说主动让出执行权,给其他Task执行的机会,可以把Task理解为协程,Task.Yeild

随机推荐