Chapter 23. Asynchronous Methods

在21章中,我们看到了线程如何提供并行路径。我们会理所当然的认为无论何时我们需要并行运行某些任务,我们可以为该任务赋予一个新线程或是池化的线程。尽管这通常是正确的,但是仍有一些例外。假定我们正在编写一个需要处理1000个并发客户请求的TCP Socket或是Web服务器程序。如果我们为每一个新到的请求使用一个线程,我们就会消耗G字节的内存。

异步方法通过一种模式来解决这些问题:该模型通过少量的池化线程来大量的并发活动。这使得编写高并发的程序以及线程高效的程序成为可能。

为了理解本章的内容,我们需要熟悉线程与流。

为什么存在异步方法?

如果所需要的所有线程总是处于繁忙状态,则上面所描述的问题将是不可解决的。但是并不出现这样的情况:例如获取一个web页面从开始到结束也许只需要几秒钟(由于慢速的网络连接)并且只消耗几毫秒的CPU时间。处理HTTP请求与响应并不需要大量的计算。

这意味着处理单个客户请求的服务器线程也许99%的时间都在被阻塞。异步方法模式(也称之为异步编程模式或APM)就要释放这种潜力,只需要少量完全的线程来处理上千的并发任务。

异步方法的目标就在于绝不阻塞线程,而是使用返回回调的模式。(阻塞意味着整个WaitSleepJoin状态-或是使得其他的线程等待-浪费宝贵的线程资源。)要实现这一目的,异步方法必须放弃调用阻塞方法。

APM的终极目标就在于线程经济性。这意味着阻塞也是可以的-例如当锁住读写域时。

需要一段时间来运行的方法并不违反APM,因为他会执行大量的计算。异步方法的目的并不是为方法的并行执行提供一个方便的机制;他要优化线程资源。APM的黄金原则为:

充分利用CPU或是使用回调函数退出!

这意味着一个异步方法,例如BeginRead,并不会立即返回到调用者。他会使得调用者等待所需要的时间-同时充分利用处理器或是其他的约束资源。他甚至可以同步完成整个工作-假定他不会被阻塞或是使得其他线程被阻塞。

异步方法的主要作用是处理大量运行时间较长的并发请求-通常在慢速的网络连接上。

异步方法签名

通过约定,异步方法都以Begin开头,同时有一个以End结束的对应方法,并具有类似于异步委托的签名:

IAsyncResult BeginXXX (in/ref-args, AsyncCallback callback, object state);
return-type EndXXX (out/ref-args, IAsyncResult asyncResult);

下面是Stream类中的示例:

public IAsyncResult BeginRead (byte[] buffer, int offset, int size,
                               AsyncCallback callback, object state);
public int EndRead (IAsyncResult asyncResult);

Begin方示返回一个IAsyncResult对象,该对象会有助于我们来管理异步操作。然后相同的对象会被传递给完成回调函数。其委托如下:

public delegate void AsyncCallback (IAsyncResult ar);

对于异步委托,EndXXX方法可以允许获取返回值以任意的out/ref参数。这也是异常被重新抛出的地方。

为了避免阻塞,我们几乎总是在回调方法内部调用EndXXX方法。回调函数总是运行在池化线程上。

IAsyncResult

IAsyncResult接口定义如下:

public interface IAsyncResult
{
  object AsyncState { get; }            // "state" object passed to Begin.
  WaitHandle AsyncWaitHandle { get; }   // Signaled when complete.
  bool CompletedSynchronously { get; }  // Did it complete on BeginX?
  bool IsCompleted { get; }             // Has it completed yet?
}

AsyncState可以允许我们访问传递给BeginXXX方法的state参数。

当操作完成时,wait句柄会被赋值。为了无阻塞的等待,我们可以调用ThreadPool.RegisterWaitForSingleObject:

ThreadPool.RegisterWaitForSingleObject (
  result.AsyncWaitHandle,
  (data, timedOut) => { /* Callback */ },
  null, ?1, true);

CompletedSynchronously属性表明在调用BeginXXX方法之后操作会立即完成。这适用于以下三种原因:

  • 操作将会很快完成-从而被异步执行来避免管理异步操作的负载。
  • 底层实现-或操作系统-并支持这种应用场景中的APM。
  • 操作是CPU绑定的并且会无阻塞的完成。

当这个属性返回true时,回调函数依然会被触发-但是也许会与调用BeginXXX的函数位于同一个线程上。忘记考虑这种可能性也许会导致意料不到的递归,从而导致堆栈溢出。

异步方法与异步委托

异步委托的目的是实现与其方法签名相同的模式-但是并没有线程节约的目标:

异步方法 异步委托
很少或绝不阻塞线程 也许会阻塞线程任意长的时间(尽管并不调用线程)
Begin方法也许并不会立即返回给调用者 BeginInvoke会立即返回给调用者

异步方法的目的是使得多个任务运行在少量的线程之上;异步委托的目的是使得调用者并行执行某个任务。

我们可以使用异步委托来调用异步方法-从而执行可以保证立即返回给调用者,同时依然遵循APM。或者更好的是:我们可以使用框架4.0新的Task类来封装异步方法调用来简化管理(我们将会在本间津稍后解释如何来做)。

如果我们使用异步委托来调用一个阻塞方法,我们就回到了老路上来:服务器或者会超出有限的并发或者需要上千的线程来执行任务。

使用异步方法

让我们编写一个满足下列要求的简单的TCP Socket服务器:

  1. 他等待客户请求
  2. 他读取5000字节确切长度的消息
  3. 他反转消息中的字节,然后将其返回给客户

让我们首先使用一个标准的线程阻塞模式来编写。下面是代码,但是并没有异常处理:

using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;
public class Server
{
 public void Serve (IPAddress address, int port)
 {
    ThreadPool.SetMinThreads (50, 50);    // Refer to Chapter 21
    ThreadPool.SetMaxThreads (50, 50);    // Refer to Chapter 21
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }
  void Accept (object clientObject)
  {
    using (TcpClient client = (TcpClient) clientObject)
    using (NetworkStream n = client.GetStream())
    {
      byte[] data = new byte [5000];
      int bytesRead = 0; int chunkSize = 1;
      while (bytesRead < data.Length && chunkSize > 0)
        bytesRead +=
          chunkSize = n.Read
            (data, bytesRead, data.Length - bytesRead);    // BLOCKS
      Array.Reverse (data);
      n.Write (data, 0, data.Length);                      // BLOCKS
    }
  }
}

我们线程池的使用可以阻止创建任意大量的线程数目并且减少为每个请求创建线程所浪费的时间。我们的程序很简单而快速,但是只能处理50个并发请求。

为了扩展到处理1000个并发处理请求-而不增加线程数量-我们必须使用异步方法模式。这意味着避免阻塞I/O方法而使用其对应的异步方法。下面是我们的做法:

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (ReverseEcho, c);
    }
  }
  void ReverseEcho (object client)
  {
    new ReverseEcho().Begin ((TcpClient)client);
  }
}
class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;
  internal void Begin (TcpClient c)
  {
    try
    {
      _client = c;
      _stream = c.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }
  void Read()            // Read in a nonblocking fashion.
  {
    while (true)
    {
      IAsyncResult r = _stream.BeginRead
       (_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);
      // This will nearly always return in the next line:
      if (!r.CompletedSynchronously) return;   // Handled by callback
      if (!EndRead (r)) break;
    }
    Write();
  }
  void ReadCallback (IAsyncResult r)
  {
    try
    {
      if (r.CompletedSynchronously) return;
      if (EndRead (r))
      {
        Read();       // More data to read!
        return;
      }
      Write();
    }
    catch (Exception ex) { ProcessException (ex); }
  }
  bool EndRead (IAsyncResult r)   // Returns false if there's no more data
  {
    int chunkSize = _stream.EndRead (r);
    _bytesRead += chunkSize;
    return chunkSize > 0 && _bytesRead < _data.Length;   // More to read
  }
  void Write()
  {
    Array.Reverse (_data);
    _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
  }
  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); }
    Cleanup();
  }
  void ProcessException (Exception ex)
  {
    Cleanup();
    Console.WriteLine ("Error: " + ex.Message);
  }
  void Cleanup()
  {
    if (_stream != null) _stream.Close();
    if (_client != null) _client.Close();
  }
}

这个程序可以在不多于10个池化线程上处理1000个并发请求。

每个客户请求都不会调用任何阻塞方法进行处理。

在Read方法中,我们由在流上调用BeginRead方法开始,同时指定一个完成回调函数。我们可以将整个方法简化为如下样子并获得相同的结果:

void Read()
{
  _stream.BeginRead
    (_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);
}

然而有一个小小的挑战,BeginRead将会异步完成,然后在相同的线程上调用ReadCallback。因为ReadCallback会再次调用Read,这也许会导致某些非常深的递归与堆栈溢出。为了避免这一问题,我们必须在BeginRead调用之后检测CompletedSynchronously,并且如果其返回true,使用循环来调用Read直到完成而不要依赖于ReadCallback中的递归调用。

这就导致了我们为什么在Server方法中调用AcceptTcpClient-而不是其异步版本,BeginAcceptTcpClient。出于保存线程的优点,后者需要与BeginRead相同的模式使用来避免可能的堆栈溢出。

ReverseEcho类为其生命周期封装了请求状态。我们不能再为该任务使用局部变量,因为每次我们退出时执行堆栈都会退出。这同时意味着简单的using语句不再适用于关闭我们的TcpClient与流。

另一个影响因素是我们不能使用如BinaryReader与BinaryWriter这样的类型,因为他们并没有提供其方法的异步版本。

异步方法与任务

在前面的示例中我们看到了框架4.0的Task类如何能够管理池化线程上的工作单元。我们也可以使用Task来封装异步方法调用-通过TaskFactory上的FromAsync方法。

我们通过调用FromAsync所获得的任务只是在BeginXXX与EndXXX方法上的一个轻量封装器-他并不会像普通的任务一样获得调度。使用FromAsync的原因就在于利用如连续与子任务这样的特性。FromAsync在内部是使用TaskCompletionSource来实现的。

FromAsync方法需要下列参数:

  • 指定BeginXXX方法的委托
  • 指定EndXXX方法的委托
  • 传递给这些方法的其他参数

FromAsync被重载来接受与.NET框架中几乎所有的异步方法签名相匹配的委托类型与参数。例如,假定stream是一个Stram,我们并不会使用下面的代码:

var buffer = new byte[1000];
stream.BeginRead (buffer, 0, 1000, MyCallback, null);
...
void MyCallback (IAsyncResult r)
{
  int bytesRead;
  try { bytesRead = stream.EndWrite (r); }
  catch (Exception ex) { Console.Write (ex.Message); }
  Console.Write (bytesRead + " bytes read");
}

而是会使用下面的代码:

var buffer = new byte[1000];
Task<int> readChunk = Task<int>.Factory.FromAsync (
  stream.BeginRead, stream.EndRead, buffer, 0, 1000, null);
readChunk.ContinueWith (ant => Console.Write (ant.Result + " bytes read"),
                               TaskContinuationOptions.NotOnFaulted);
readChunk.ContinueWith (ant => Console.Write (ant.Exception.Message),
                               TaskContinuationOptions.OnlyOnFaulted);

这段代码本身并没有什么节省:当我们引入父子线程扩展时就会看到真正的好处。重新考虑我们前面的示例,假定我们重构ReverseEcho的Begin方法,从而他在一个新的任务上调用Read。这会将Server.Server由创建任务自身中解脱出来,但是更重要的是,他可以依据我们可以关联到哪个扩展来为From Async创建的任务创建一个父任务。这就避免了必须为每一个子任务编写单独的异常处理块或是显示的错误处理。清理也可以很容易的作为另一个父任务的扩展来实现:

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      new ReverseEcho().BeginAsync (c);
    }
  }
}
class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;
  internal void BeginAsync (TcpClient c)
  {
    _client = c;
    _stream = c.GetStream();
    var task = Task.Factory.StartNew (Read);
    // Set up centralized error handling and cleanup:
    task.ContinueWith (ant =>
      Console.WriteLine ("Error: " + ant.Exception.Message),
      TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith (ant =>
    {
      if (_stream != null) _stream.Close();
      if (_client != null) _client.Close();
    });
  }
  void Read()    // This will create a child task.
  {
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, 0, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);
    readChunk.ContinueWith (Write, TaskContinuationOptions.NotOnFaulted);
  }
  void Write (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}

同时我们不需要担心Read进入递归并执行栈分割:扩展并不会同步发生,除非我们显示要求。

*异步方法与迭代器*

在VS2010的PFX的并行编程示例中,我们会发现在TaskFactory上有一个名为Iterate的扩展方法。使用这个方法,我们可以将我们示例中的ReverseEcho类中的逻辑移动到另一个单独的迭代器方法中。在简单的场景中这将是一个恐怖的模式,因为这意味着我们可以使用具有局部变量的简单方法来替换ReverseEcho及其域。下面是我们重构的Server类:

public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (50, 50);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      Task.Factory.Iterate (ReverseEcho(c)).ContinueWith (t =>
        Console.WriteLine ("Error: " + t.Exception.Message),
        TaskContinuationOptions.OnlyOnFaulted);
    }
  }
  IEnumerable<Task> ReverseEcho (TcpClient client)
  {
    using (client)
    using (var stream = client.GetStream())
    {
      byte[] data = new byte[Program.MessageLength];
      int bytesRead = 0;
      while (true)
      {
        // ReadASync is an extension method in the samples.
        Task<int> readChunk = stream.ReadAsync
          (data, bytesRead, data.Length - bytesRead);
        yield return readChunk;
        bytesRead += readChunk.Result;
        if (readChunk.Result <= 0 || bytesRead >= data.Length)
          break;
      }
      Array.Reverse(data);
      yield return stream.WriteAsync (data, 0, bytesRead);
    }
  }
}

然而,在更为复杂的情况下,拥有一个类是很有帮助的,因为我们可以将代码抽象为我们所希望的多个方法。迭代器的问题在于我们在迭代器中所调用的任何方法本身并不会影响到调用者的行为,而这限制了我们重构大代码块的能力。

编写异步方法

回到我们前面的示例,假定5000字节的交换量仅是更高级协议的一个小部分。将我们已编写的代码转换为类似下面的方法将是不错的选择:

public byte[] ReverseEcho (TcpClient client);

当然,问题在于该方法的签名是同步的;我们需要提供一个异步的版本-换句话说,BeginReverseEcho。更进一步,如果遇到异常,将其输出到控制台并不是太好;我们需要在某一时刻将其抛给调用者。所以,为了适应该模式,我们必须同时提供EndReverseEcho并且编写一个实现了IAsyncResult的类。

我们的ReverseEcho类是IAsyncResult的一个优秀替代者,因为他已经封装了操作的状态。我们所需要添加的是在调用EndReverseEcho时重新抛出异常的代码,以及在完成时通知的等待句柄。

下面是一个真实的示例,具有异常处理与线程安全:

public byte[] ReverseEcho (TcpClient client);
    get { return _waitHandle.WaitOne (0, false); }
  }
  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _userState = state;
    get { return _waitHandle.WaitOne (0, false); }
  }
  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _userState = state;
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null);
  }
}

在CleanUp中,我们关闭了_stream而不是_client,因为调用者在执行反向回应之后也许希望继续使用_client。

伪造异步方法

通常,以Begin开头并且返回IAsyncResult的框架方法都遵循APM。然后,基于Stream类会有一些例外:

  • BufferedStream
  • CryptoStream
  • DeflateStream
  • MemoryStream

这些类型依赖基类Stream类中的回调异步实现,但是并提供非阻塞保证。相反,他们会使用异步委托来调用阻塞方法,例如Read或Write。尽管这种方法在MemoryStream的情况下是非常正确的,但是对于BufferedStream与CryptoStream则会出现问题-如果封装MemoryStream以外的流。换句话说,如果我们在封装NetworkStream的CryptoStream上调用BeginRead或BeginWrite,某些线程将会在某些时刻阻塞,违反异步方法模式的扩展性。这是一个遗憾,因为CryptoStream的装饰者模式是很高效的。

CryptoStream的工作区首先将底层异步读取到MemoryStream中,然后使得CryptoStream封装MemoryStream。这意味着将整个流读取到内存中,尽管是在一个高并发服务器上,也并不会获得很好的扩展性。如果我们确实需要异步加密,一个解决方案就是在比CryptoStream还要低的层次上进行处理,也就是ICryptoTransform。使用Red Gate’s Reflector这样的反汇编工作,我们可以确切的了解CryptoStream是如何使用ICryptoTransform来进行处理的。

DeflateStream确实遵循异步模式-或者到至少是尝试遵循。问题在于他并不能正确的处理异常。例如,如果底层数据被破坏,BeginRead会在池化线程上抛出异常而不会将其汇集到EndRead。这将是破坏我们整个程序的不可捕获的异常。

FileStream类是另一个违反者-他伪造了异步方法(例如他依赖于Stream的默认实现)。然而,如果使用如下的方式进行构造,他确实会尝试真正的异步行为:

Stream s = new FileStream ("large.bin", FileMode.Create, FileAccess.Write,
                            FileShare.None, 0×1000, true);

最后的布尔参数指示FileStream不要使用异步委托-而是尝试真正的APM方法。问题在于异步文件I/O需要操作系统支持,也许并不会出现这样的支持。如果OS没有支持,BeginRead会将调用线程阻塞在WaitSleepJoin状态。

然而,异步文件I/O的缺乏几乎不是问题,假定我们要访问一个本地文件系统。小文件请求也许会由操作系统或是硬盘驱动缓存来提供。

异步方法的替代者

第21章描述了三个类似的技术-这三个技术都会在新线程上执行任务:

  • ThreadPool.RegisterWaitForSingleObject
  • 生产者/消费者队列
  • 线程与系统计时器

ThreadPool.RegisterWaitForSingleObject在实现异步方法模式时会很有用。自定义的生产者/消费者队列可以提供完全的替代-使用我们自己的工作者池-但是如果我们希望与.NET框架进行交互则没有太大的帮助。如果我们的工作是定时执行而不是响应请求,则线程与系统计时器会非常合适。