MSDN Magazin > Home > Ausgaben > 2008 > Juni >  Concurrent Affairs: Vereinfachtes APM mit dem A...
Concurrent Affairs
Vereinfachtes APM mit dem AsyncEnumerator
Jeffrey Richter

In meinem letzten Artikel (msdn2.microsoft.com/magazine/cc163323) ging es darum, wie neue C#-Sprachfeatures (anonyme Methoden, Lambda-Ausdrücke und Iteratoren) zur Vereinfachung der asynchronen Programmierung genutzt werden können. Am Ende der Rubrik stand eine Demonstration dazu, wie eine asynchrone Programmierung mit einem synchronen Programmiermodell über einen C#-Iterator realisiert werden könnte. Der Beispieliterator ist in Abbildung 1 gezeigt. Es ist zu beachten, dass zum Ausführen des Iteratorcodes keine normale C#-Foreach-Anweisung verwendet werden kann, weil der Code immer vom Thread ausgeführt wird, der „foreach“ aufruft, und die Ausführung kontinuierlich laufen würde, ohne darauf zu warten, dass die asynchronen E/A-Vorgänge abgeschlossen werden.
In diesem Artikel stelle ich meine AsyncEnumerator-Klasse vor, die einen Iterator intelligent steuert, damit verschiedene Threadpoolthreads den Code zu verschiedenen Zeiten ausführen können, und die sicherstellt, dass der Iterator nur nach Abschluss der asynchronen E/A-Vorgänge fortgesetzt wird. Außerdem wird die Architektur und Funktionsweise meiner AsyncEnumerator-Klasse erläutert.

Verwenden meiner AsyncEnumerator-Klasse
Hier ist die Definition meiner AsyncEnumerator-Klasse:
public class AsyncEnumerator {
   // Methods called by code outside of the iterator
   public AsyncEnumerator();
   public void Execute(IEnumerator<Int32> enumerator);

   // Methods called by code inside the iterator
   public AsyncCallback End();
   public IAsyncResult DequeueAsyncResult();
}
Die Verwendung dieser AsyncEnumerator-Klasse ist ganz einfach. Zunächst wird erklärt, wie Sie Ihr Iteratormitglied implementieren, anschließend wird erläutert, wie Sie es aufrufen.
Definieren Sie Ihr Iteratormitglied so, dass es die gewünschten Parameter akzeptiert, und fügen Sie einen zusätzlichen Parameter hinzu, der auf ein Async­Enumeratorobjekt verweist. Ihr Iteratormitglied muss so definiert werden, dass es eine Sammlung von Int32s zurückgibt. Anders ausgedrückt: es muss den Rückgabetyp „IEnumerator<Int32>“ haben.
Starten Sie dann jeden asynchronen Vorgang innerhalb des Iteratorsmitglieds durch Aufrufen der entsprechenden BeginXxx-Methode. Wie Sie wissen, ist es beim Aufrufen einer BeginXxx-Methode wichtig, dass Sie ihr den Namen der Methode übergeben, die beim Abschluss des asynchronen Vorgangs aufgerufen werden soll.
Rufen Sie anstelle einer Definition Ihrer eigenen Methode die Endmethode des AsyncEnumerator-Objekts auf. Die Endmethode gibt einen AsyncCallback-Delegaten zurück, der eine private Methode identifiziert, die innerhalb meiner Async­Enumeratorklasse definiert ist. Sobald also ein asynchroner Vorgang abgeschlossen wird, wird der Code in einem AsyncEnumerator-Objekt hierüber benachrichtigt. Dieser Code setzt dann das IAsyncResult-Objekt für den abgeschlossenen Vorgang in ein List<IAsyncResult>-Objekt, das ich als Inbox bezeichne.
Setzen Sie in Ihren Code nach dem Aufrufen einer BeginXxx-Methode eine Yield Return-Anweisung ein, die die Anzahl der asynchronen Vorgänge zurückgibt, die Sie in die Warteschlange gesetzt haben. In Abbildung 1 ist der Yield Return 1 (da nur eine BeginXxx-Methode [BeginRead] aufgerufen wurde). Die Yield Return-Anweisung unterbricht die Iteratormethode und verhindert, dass weiterer Code in der Iteratormethode ausgeführt wird.
private static IEnumerator<Int32> ApmPatternWithIterator(
   AsyncEnumerator ae, String pathname) {
   using (FileStream fs = new FileStream(pathname, FileMode.Open,
      FileAccess.Read, FileShare.Read, 8192, FileOptions.Asynchronous)) {

      Byte[] data = new Byte[fs.Length];
      fs.BeginRead(data, 0, data.Length, ae.End(), null);
      yield return 1;

      Int32 bytesRead = fs.EndRead(ae.DequeueAsyncResult());
      ProcessData(data);
   }
}
Weiter unten in diesem Artikel werden Beispiele aufgeführt, in denen andere Yield Return-Werte als 1 erzielt wurden, aber für viele Anwendungen ist der Yield Return-Wert 1 angemessen. Die Zahl, die Sie in einer Yield Return-Anweisung angeben, teilt dem Async­Enumerator-Objekt mit, wie viele asynchrone Vorgänge abgeschlossen werden müssen, bevor Ihr Iterator fortgesetzt wird. Wenn der Iterator unterbrochen wurde, werden also alle von Ihnen ausgelösten asynchronen Vorgänge abgeschlossen. Sobald ein Vorgang abgeschlossen ist, wird in der Inbox des Async­Enumerators ein Eintrag hinzugefügt. Wenn die Zahl der Elemente in der Inbox gleich der Zahl ist, die Sie in der Yield Return-Anweisung angegeben haben, setzt das Async­Enumerator-Objekt den Iterator fort, und der Code kann weiter ausgeführt werden.
Einblicke: AsyncEnumeration im Kontext
Die Verwendung von C#-Iteratoren zur Durchführung einer asynchronen Verarbeitung ist nur einer von vielen Aspekten im Zusammenhang mit der gleichzeitigen Programmierung. Bei Microsoft laufen momentan mehrere Entwicklungsinitiativen, die sich mit diesen Punkten befassen.
Eine dieser Initiativen ist die Concurrency and Coordination Runtime (CCR), ein generischeres Framework, das zur Lösung verschiedener Probleme in den Bereichen asynchrone und gleichzeitige Programmierung herangezogen werden kann.
Die CCR enthält eine Implementierung zum Durchführen einer asynchronen Enumeration über C#-Iteratoren. Das folgende Beispiel demonstriert, wie Sie eine Datei asynchron über die Iteratoren in der CCR verarbeiten könnten:
IEnumerator<ITask> CcrIterator(string pathname) {
  var resultPort = new Port<IAsyncResult>();
  using (FileStream fs = new FileStream(pathname, 
    FileMode.Open, FileAccess.Read, FileShare.Read, 
    8192, FileOptions.Asynchronous)) {
      Byte[] data = new Byte[fs.Length];
      fs.BeginRead(data, 0, data.Length, resultPort.Post, null);    
      yield return resultPort;
      IAsyncResult ar = resultPort;
      Int32 bytesRead = fs.EndRead(ar); }
    ProcessData(data);
  }
}
Neben der Unterstützung von Iteratoren bietet die CCR auch erweiterte Unterstützung für Ausnahmebehandlung und Koordinationsprimitiven, z. B. Arbiter. Weitere Informationen zur CCR finden Sie im CCR-Benutzerhandbuch (msdn2.microsoft.com/library/bb905447), in der Rubrik „Concurrent Affairs“ vom September 2006 (msdn2.microsoft.com/magazine/cc163556) und unter CCR auf Kanal 9 (channel9.msdn.com/showpost.aspx?postid=219308).
Howard Dierking, Chefredakteur
Nach der Yield Return-Anweisung rufen Sie die dazugehörige EndXxx-Methode auf. Wenn Sie eine EndXxx-Methode aufrufen, müssen Sie ein IAsyncResult-Objekt übergeben. Glücklicherweise hält das AsyncEnumerator-Objekt diese Objekte in seiner Inbox bereit, und Sie müssen sie daher nur daraus abrufen, indem Sie die DequeueAsyncResult-Methode des AsyncEnumerator-Objekts aufrufen. Die Elemente werden über einen FIFO-Algorithmus (First-in-First-out) aus der Inbox abgerufen.
Wenn die Ergebnisse des asynchronen Vorgangs von der EndXxx-Methode zurückgegeben wurden, können Sie sie direkt innerhalb des Iterators verarbeiten. Das ist äußerst praktisch, da jetzt ein synchrones Programmiermodell vorliegt, obwohl asynchrone Vorgänge durchgeführt werden.
In vielen Szenarios ist es nützlich, jetzt einen weiteren asynchronen Vorgang in die Warteschlange zu setzen, indem eine weitere BeginXxx-Methode aufgerufen wird. Im Iterator können Sie eine andere BeginXxx-Methode aufrufen und dann eine andere Yield Return 1-Anweisung erzeugen, nach der Sie dann die entsprechende EndXxx-Methode aufrufen. Hierdurch können Sie problemlos nacheinander eine Sequenz asynchroner Vorgänge ausführen, ohne Threads zu blockieren. Ein weiteres Plus ist, dass Sie den gesamten Code mit einem synchronen Programmiermodell schreiben können.
Der in Abbildung 2 gezeigte HtmlToFile-Iterator liest beispielsweise HTML-Daten von jeweils 1 Kilobyte von einem Webserver und speichert diese Daten in einer lokalen Datei. Das Aufrufen des Iterators ist äußerst einfach: Sie erstellen ein AsyncEnumerator-Objekt und rufen dann seine Execute-Methode auf, in der Sie den Namen Ihres Iteratormitglieds übergeben. Sie können Argumente an das Iteratormitglied übergeben und müssen auch einen Verweis zum AsyncEnumerator-Objekt an den Iterator übergeben, damit dieser die End- und die DequeueAsyncResult-Methoden aufrufen kann. Hier ist der Code zum Aufrufen des HtmlToFile-Iterators:
AsyncEnumerator ae = new AsyncEnumerator();
ae.Execute(HtmlToFile(
  ae, 
  "http://www.Wintellect.com/", 
  "LocalFile.html")); 
private static IEnumerator<Int32> HtmlToFile(AsyncEnumerator ae, 
   String url, String file) {

   // Issue asynchronous web request operation
   WebRequest webRequest = WebRequest.Create(url);
   webRequest.BeginGetResponse(ae.End(), null);
   yield return 1;

   WebResponse webResponse;
   try {
      webResponse = webRequest.EndGetResponse(ae.DequeueAsyncResult());
   }
   catch (WebException e) {
      Console.WriteLine("Failed to contact server: {0}", e.Message);
      yield break;
   }

   using (webResponse) {
      Stream webResponseStream = webResponse.GetResponseStream();

      // Read the stream data and write it to a file in 1KB chunks
      Byte[] data = new Byte[1024];
      using (FileStream fs = new FileStream(file, FileMode.Create, 
         FileAccess.Write, FileShare.None, data.Length, 
         FileOptions.Asynchronous)) {

         // See support.microsoft.com/kb/156932 
         fs.SetLength(webResponse.ContentLength);

         while (true) {
            // Issue asynchronous web response stream read operation
            webResponseStream.BeginRead(data, 0, data.Length, 
               ae.End(), null);
            yield return 1;

            // Get result of web response stream read operation
            Int32 bytesRead = webResponseStream.EndRead(
               ae.DequeueAsyncResult());
            if (bytesRead == 0) break; // Stream end: close file & exit

            // Issue asynchronous file write operation
            fs.BeginWrite(data, 0, bytesRead, ae.End(), null);
            yield return 1;

            // Get result of file write operation
            fs.EndWrite(ae.DequeueAsyncResult());
         }
      }
   }
}
Meine AsyncEnumerator-Execute-Methode gibt erst dann Werte zurück, wenn das Iteratormitglied entweder beendet wird oder eine Yield Break-Anweisung ausführt. Solange der Hauptthread blockiert ist, führen Threadpoolthreads den Iterator weiter aus, während asynchrone Vorgänge abgeschlossen werden. All das bedeutet, dass jeder Teil Ihres Iteratorcodes theoretisch von einem anderem Thread ausgeführt werden könnte und dass Ihr Iteratorcode deshalb nicht von einem threadspezifischen Zustand wie lokaler Speicher eines Threads, Kultur, UI-Kultur, Prinzipal oder Priorität abhängen sollte.
Eine kleine Bemerkung am Rande: Ich bin nicht dafür, den Thread, der den Execute-Block ausführt, so lange laufen zu lassen, bis der Iterator abgeschlossen ist, aber hierdurch lässt sich die Funktionsweise meiner AsyncEnumerator-Klasse leichter erklären, und das Experimentieren und Debugging wird hierdurch vereinfacht. Es gibt allerdings eine Möglichkeit, meinen AsyncEnumerator dazu zu bringen, den Thread während der Ausführung des Iterators nicht zu blockieren. Dies ist im Hinblick auf Skalierbarkeit und Reaktionsfähigkeit wichtig ist, wenn Sie einen Iterator von einem GUI-Thread in einer Windows® Form- oder einer Windows Presentation Foundation (WPF)-Anwendung aufrufen möchten. Im nächsten Artikel unter dieser Rubrik wird aufgezeigt, wie Sie Ihr Iteratormitglied asynchron ausführen können.

Architektur von AsyncEnumerator
Durch meine AsyncEnumerator-Klasse werden die in Abbildung 3 gezeigten privaten Felder definiert. Das Feld „m_enumerator“ verweist auf Ihr Iteratormitglied. Der Konstruktor initialisiert dieses Feld auf Null und Execute setzt das Feld auf den übergebenen Wert. Das Feld „m_inbox“ verweist auf eine Sammlung von IAsyncResult-Objekten. Das Feld „m_waitAndInboxCounts“ ist eine Struktur, die zwei 16-Bit-Ganzzahlwerte enthält. Weiter unten in diesem Artikel wird erläutert, wie diese Werte geändert werden.
public partial class AsyncEnumerator {
   // Refers to the iterator member's code
   private IEnumerator<Int32> m_enumerator;

   // Collection of completed async operations (the inbox)
   private List<IAsyncResult> m_inbox = 
      new List<IAsyncResult>();

   // Structure with Wait & Inbox counters
   private WaitAndInboxCounts m_waitAndInboxCounts;
}
Wie bereits im letzten Artikel erwähnt, stellt ein Iterator eine wirklich einfache Möglichkeit dar, den Compiler zum Erstellen einer Klasse zu bringen, die die IEnumerator<T>-Schnittstelle implementiert. Normalerweise bewegen Sie sich mithilfe der foreach-Anweisung durch ein IEnumerator<T>-Objekt. Es hindert Sie aber nichts daran, die IEnumerator<T> MoveNext-Methode und die Current-Eigenschaft explizit im Code aufzurufen, und genau das erfolgt in meiner AsyncEnumerator-Klasse.
Die Execute-Methode des AsyncEnumerators ruft intern eine private Methode mit dem Namen „ResumeIterator“ (in Abbildung 4 gezeigt) auf. Diese Methode ist für das Starten des Iterators und auch für seinen erneuten Start aus einem angehaltenen Zustand verantwortlich.
private void ResumeIterator() {
   Boolean continueIterating;

   // While there are more operations to perform...
   while (continueIterating = m_enumerator.MoveNext()) {

      // Get the value returned from the enumerator
      UInt16 numberOpsToWaitFor = checked((UInt16) m_enumerator.Current);

      // If inbox has fewer items than requested, keep iterator suspended
      if (!m_waitAndInboxCounts.AtomicSetWait(numberOpsToWaitFor)) break;

      // Inbox has enough items, loop to resume the iterator
   }

   // The iterator is suspended, just return
   if (continueIterating) return;

   // The iterator has exited, execute the iterator's finally code
   m_enumerator.Dispose();
}
Wenn ein Thread ResumeIterator aufruft, wird dabei die IEnumerator<T> MoveNext-Methode aufgerufen, die den Iterator aktiviert und ausführt. Wenn der Iterator beendet oder eine Yield Break-Anweisung ausgeführt wird, gibt die IEnumerator<T> MoveNext-Methode „false“ zurück, was bedeutet, dass keine weiteren Aufgaben zu erledigen sind. Wenn die continueIterating-Variable meiner ResumeIterator-Methode auf „false“ festgelegt ist, beendet sie die Schleife und ruft dann die Dispose-Methode des IEnumerator<T>-Objekts auf, um eine Bereinigung durchzuführen. Da der Iterator alle Aufgaben abgeschlossen hat, kehrt die ResumeIterator-Methode zurück.
Wenn allerdings die MoveNext-Methode des Enumeratorobjekts „true“ zurückgibt, wurden einige asynchrone Vorgänge ausgelöst und die Ausführung der Methode durch Ausführen einer Yield Return-Anweisung unterbrochen. Die ResumeIterator-Methode muss jetzt wissen, welche Zahl für die Yield Return-Anweisung des IEnumerator<T>-Objekts angegeben wurde. Diese Zahl gibt die Anzahl der asynchronen Vorgänge an, die vor dem erneuten Start des Iterators abgeschlossen sein sollten. Zum Abrufen der Zahl fragt ResumeIterator die Current-Eigenschaft des Enumeratorobjekts ab. Diese Eigenschaft gibt den letzten Wert zurück, der über eine Yield Return-Anweisung angegeben wurde.
ResumeIterator ruft dann die WaitAndInboxCounts AtomicSetWait-Methode auf, die die Anzahl der Elemente festlegt, auf die der Iterator warten soll. Wenn AtomicSetWait feststellt, dass die Zahl der Elemente in der Inbox kleiner als diese Wait-Zahl ist, gibt AtomicSetWait „false“ zurück, wodurch ResumeIterator zurückkehrt, da dieser Thread keine weiteren Aufgaben zu erledigen hat. Wenn AtomicSetWait feststellt, dass die Anzahl der Elemente in der Inbox größer als oder gleich der Wait-Zahl ist, gibt AtomicSetWait „true“ zurück und führt eine Schleife aus, indem es die MoveNext-Methode des Enumeratorobjekts erneut aufruft und es dem Iterator ermöglicht, die Ausführung wieder aufzunehmen, damit die abgeschlossenen Vorgänge verarbeitet werden können.
Innerhalb des Iterators übergeben alle BeginXxx-Methoden die gleiche Methode für die AsyncCallback-Methode. Diese private Methode mit dem Namen „Enqueue­AsyncResult“ wird über die Endmethode des AsyncEnumerator-Objekts aufgerufen und wie folgt implementiert:
private void EnqueueAsyncResult(IAsyncResult result) {
   // Add this item to the inbox
   lock (m_inbox) { m_inbox.Add(result); }

   // Add 1 to inbox count. If inbox has enough items 
   // in it; this thread calls ResumeIterator
   if (m_waitAndInboxCounts.AtomicIncrementInbox()) 
      ResumeIterator();
}
Diese Methode fügt der Inbox das IAsyncResult-Objekt des abgeschlossenen Vorgangs threadsicher hinzu und ruft dann die WaitAndInboxCounts AtomicIncrementInbox-Methode auf, die die Anzahl der Elemente in der Inbox um 1 erhöht. Wenn AtomicIncrement­Inbox feststellt, dass die Anzahl der Elemente in der Inbox kleiner als die Wait-Zahl ist, gibt AtomicIncrementInbox „false“ zurück, wodurch der Threadpoolthread zum Pool zurückkehrt, damit er für andere Aufgaben verwendet werden kann.
Wenn AtomicIncrementInbox feststellt, dass die Anzahl der Elemente in der Inbox gleich der Wait-Zahl ist, gibt AtomicIncrementInbox „true“ zurück und ruft die ResumeIterator-Methode meines AsyncEnumerators auf, die wiederum die MoveNext-Methode des Enumeratorobjekts aufruft, wodurch der Iterator die Ausführung fortsetzt, damit die abgeschlossenen Vorgänge verarbeitet werden können.

Ändern der Wait- und Inbox-Zähler
Jedes AsyncEnumerator-Objekt verfügt über eine 16-Bit-Wait-Zahl und eine 16-Bit-Inbox-Zahl, und beide Zahlen haben einen Bereich von 0 bis 65535. Die Wait-Zahl gibt die Anzahl der asynchronen Vorgänge an, die abgeschlossen werden müssen, bevor das Enumeratorobjekt fortgesetzt wird. Ein Iterator legt diesen Wert über die Yield Return-Anweisung fest. Die Inbox-Zahl gibt die Anzahl der asynchronen Vorgänge an, die abgeschlossen wurden. Die Wait- und die Inbox-Zahl müssen geändert und mit threadsicheren Verfahren geprüft werden, da mehrere Threads gleichzeitig Code für asynchrone Vorgänge ausführen könnten, während der Iterator die Yield Return-Anweisung ausführt.
Die schnellste Methode zur threadsicheren Aktualisierung dieser Zahlen besteht darin, die beiden Zahlen in einen einzelnen Int32 zu setzen und dann Interlocked-Methoden zu verwenden, um Int32 zu aktualisieren. Um den Code lesbarer und leichter verwaltbar zu machen, habe ich mich dazu entschieden, zum Verwalten dieser beiden Zahlen eine WaitAndInboxCounts-Struktur zu definieren (siehe Abbildung 5). Die Struktur definiert ein privates Int32-Feld und bietet interne Mitglieder an, um Int32 threadsicher zu ändern. Der AsyncEnumerator-Code ruft diese Mitglieder auf.
private struct WaitAndInboxCounts {
   private const UInt16 c_MaxWait = 0xFFFF;
   // Wait=High 16 bits, Inbox=low-16 bits
   private Int32 m_waitAndInboxCounts;    

   private UInt16 Wait {
      get { return (UInt16) (m_waitAndInboxCounts >> 16); }
      set { m_waitAndInboxCounts = (Int32) ((value << 16) | Inbox); }
   }
   private UInt16 Inbox {
      get { return (UInt16) m_waitAndInboxCounts; }
      set { m_waitAndInboxCounts = 
         (Int32)((m_waitAndInboxCounts & 
            0xFFFF0000)|value); }
   }

   private WaitAndInboxCounts(Int32 waic) { m_waitAndInboxCounts = waic; }
   private Int32 ToInt32() { return m_waitAndInboxCounts; }

   internal void Initialize() { Wait = c_MaxWait; }

   internal Boolean AtomicSetWait(UInt16 numberOpsToWaitFor) {
      return InterlockedEx.Morph<Boolean, UInt16>(
         ref m_waitAndInboxCounts, 
         numberOpsToWaitFor, SetWait);
   }

   private static Int32 SetWait(Int32 i, UInt16 numberOpsToWaitFor, 
      out Boolean shouldMoveNext) {
      WaitAndInboxCounts waic = new WaitAndInboxCounts(i);
      // Set the number of items to wait for
      waic.Wait = numberOpsToWaitFor;  
      shouldMoveNext = (waic.Inbox >= waic.Wait);

      // Does the inbox contain enough items to MoveNext?
      if (shouldMoveNext) {         
         // Subtract the number of items from the inbox 
         waic.Inbox -= waic.Wait;   
         // The next wait is indefinite 
         waic.Wait = c_MaxWait;     
      }
      return waic.ToInt32();
   }

   internal Boolean AtomicIncrementInbox() {
      return InterlockedEx.Morph<Boolean, Object>(
         ref m_waitAndInboxCounts, 
         null, IncrementInbox);
   }

   private static Int32 IncrementInbox(Int32 i, Object argument, 
      out Boolean shouldMoveNext) {
      WaitAndInboxCounts waic = new WaitAndInboxCounts(i);
      // Add 1 to the inbox count
      waic.Inbox++;                 
      shouldMoveNext = (waic.Inbox == waic.Wait);

      // Does the inbox contain enough items to MoveNext?
      if (shouldMoveNext) {         
         // Subtract the number of items from the inbox 
         waic.Inbox -= waic.Wait;   
         // The next wait is indefinite 
         waic.Wait = c_MaxWait;     
      }
      return waic.ToInt32();
   }
}
Die beiden intern verfügbaren Methoden sind AtomicSetWait und AtomicIncrement­Inbox. Weiter oben in diesem Artikel wurde erklärt, was mit diesen Methoden erreicht wird. Jetzt soll die Funktionsweise erläutert werden, da diese Technik potenziell in vielen Szenarios zum Einsatz kommt, in denen mehrere Werte auf atomare Weise geändert werden müssen.
Wie Sie sicher bereits wissen, bietet die Interlocked-Klasse der Microsoft® .NET Framework-Bibliothek mehrere Methoden zum threadsicheren Ändern eines Int32-Werts. Es gibt Methoden zum Inkrementieren eines Int32 um 1, zum Dekrementieren eines Int32 um 1, zum Hinzufügen eines positiven oder negativen Werts zu einem Int32, zum Ändern eines Int32 in einen neuen Wert, zum Überprüfen, ob ein Int32 ein bestimmter Wert ist sowie zum Ändern dieses Werts in einen anderen Wert, wenn diese Bedingung erfüllt wird. Kurzum: Die Interlocked-Methoden sind großartig, aber ein Int32 kann auch über eine Vielzahl anderer Wege geändert werden.

Threadsiches Morphing
In meinem ersten Concurrent Affairs-Artikel (msdn.microsoft.com/msdnmag/issues/05/10/ConcurrentAffairs) wurde demonstriert, wie die CompareExchange-Methode der Interlocked-Klasse zum Ändern eines Int32-Werts verwendet werden kann. Genauer gesagt wurde aufgezeigt, wie auf atomare Weise ein bitweiser AND-, OR- oder XOR-Vorgang auf dem Int32 durchgeführt wird. Dieses Muster lässt sich leicht erweitern, um atomare Multiplikations-, Divisions-, Min-, Max- und viele andere Vorgänge durchzuführen.
Letztlich habe ich mich für eine Erweiterung des Musters entschieden, indem es in eine generische Methode mit dem Namen „Morph<TResult, TArgument>“ umgewandelt wurde, die wie folgt aussieht:
public static TResult Morph<TResult, TArgument>(ref Int32 target,
   TArgument argument, Morpher<TResult, TArgument> morpher) {

   TResult morphResult;
   Int32 i, j = target;
   do {
      i = j;
      j = Interlocked.CompareExchange(ref target, 
         morpher(i, argument, out morphResult), i);
   } while (i != j);
   return morphResult;
}
Wenn Sie diese Methode aufrufen, übergeben Sie ihr (durch einen Verweis) den Int32-Wert, den Sie atomar ändern möchten. Sie können auch ein zusätzliches Argument übergeben, das dann von Ihrem Morpher verwendet wird. Zum Schluss übergeben Sie eine Methode, deren Signatur dem Morpher-Delegaten entspricht und die wie folgt definiert ist:
delegate Int32 Morpher<TResult, TArgument>(Int32 startValue,
   TArgument argument, out TResult morphResult); 
An die Morpher-Methode werden der Anfangswert im Ziel-Int32 und das zusätzliche an Morph übergebene Argument übergeben. Die Morpher-Methode wendet einen Algorithmus auf den Int32-Wert an und gibt den gewünschten neuen Wert zurück. Wenn der ursprüngliche Int32-Wert nicht von einem anderen Thread geändert wurde, während der Algorithmuscode ausgeführt wurde, wird der Wert im Ziel dem von der Morpher-Methode zurückgegebenen Wert zugewiesen. Wenn der ursprüngliche Int32-Wert von einem anderen Thread geändert wurde, während der Algorithmuscode ausgeführt wurde, ruft die Morph-Methode die Morpher-Methode erneut auf und übergibt den neuen Int32-Wert (der vom anderen Thread geändert wurde).
Die Morpher-Methode muss auch einen Out-Parameter (morphResult) festlegen. Wenn Morph das Ziel erfolgreich ändert, ist dieser Out-Parameter der Wert, den die Morph-Methode an den Aufrufer zurückgibt. Das Gute hieran ist, dass der Code innerhalb der Morpher-Methode nicht threadsicher sein muss. Er kann Tausende von Zeilen lang sein und mehrere Manipulationen durchführen, wobei die Morph-Methode dafür verantwortlich ist, dass der ursprüngliche Int32-Wert threadsicher geändert wird.
Da nun die Morph-Methode und der Morper-Delegat definiert sind, kann diese Infrastruktur zum Implementieren der WaitAndInboxCounts AtomicSetWait- und der AtomicIncrementInbox-Methode verwendet werden.
AtomicSetWait ruft Morph intern auf und übergibt eine Morpher-Methode an Morph, durch die die neue Wait-Zahl auf den Wert festgelegt wird, der von der Yield Return-Anweisung des Iterators zurückgegeben wurde. Wenn die Inbox-Zahl kleiner als die neue Wait-Zahl ist, wird die neue Wait-Zahl atomar festgelegt. Außerdem wird an den Aufrufer von Morph „false“ zurückgegeben, wodurch dem Aufrufer mitgeteilt wird, dass die MoveNext-Methode des Enumeratorobjekts nicht aufgerufen werden sollte, da der Iterator anzeigt, dass er auf mehr Elemente in der Inbox warten will.
Wenn die Morph-Methode feststellt, dass die Inbox-Zahl größer oder gleich der neuen Wait-Zahl ist, subtrahiert die Morph-Methode die Wait-Zahl von der Inbox-Zahl (da sie diese Elemente logischerweise aus der Inbox entfernt), und die Wait-Zahl wird auf ihren Höchstwert (65535) festgelegt, damit der Inbox (über andere Threads) hinzugefügte Elemente nicht dazu führen, dass der Iterator erneut mit der Ausführung beginnt. Nachdem alle Vorgänge erfolgreich ausgeführt wurden, gibt die Morph-Methode „true“ an den Aufrufer zurück, und dieser Thread ruft die MoveNext-Methode des Enumeratorobjekts auf, wodurch der Iterator erneut gestartet wird, damit die abgeschlossenen Elemente, auf die gewartet wurde, verarbeitet werden können.
AtomicIncrementInbox ruft intern Morph auf und übergibt eine Morpher-Methode an Morph, durch die die Inbox um den Wert 1 inkrementiert wird. Wenn die Inbox-Zahl nicht gleich der aktuellen Wait-Zahl ist, wird die Wait-Zahl atomar festgelegt und an den Aufrufer von Morph „false“ zurückgegeben, wodurch dem Aufrufer mitgeteilt wird, dass die MoveNext-Methode des Enumeratorobjekts nicht aufgerufen werden sollte, da der Iterator anzeigt, dass er auf mehr Elemente in der Inbox warten will.
Wenn die Morpher-Methode (ähnlich wie bei der Morph-Methode) feststellt, dass die Inbox-Zahl gleich der aktuellen Wait-Zahl ist, subtrahiert die Morpher-Methode die Wait-Zahl von der Inbox-Zahl (da sie diese Elemente logischerweise aus der Inbox entfernt), und die Wait-Zahl wird auf ihren Höchstwert festgelegt, damit der Inbox (über andere Threads) hinzugefügte Elemente nicht dazu führen, dass der Iterator erneut mit der Ausführung beginnt. Nachdem alle Vorgänge erfolgreich ausgeführt wurden, gibt die Morph-Methode „true“ an den Aufrufer zurück, und dieser Thread ruft die MoveNext-Methode des Enumeratorobjekts auf, wodurch der Iterator erneut gestartet wird, damit die abgeschlossenen Elemente, auf die gewartet wurde, verarbeitet werden können.
Die WaitAndInboxCounts-Struktur definiert einige zusätzliche private Mitglieder, die intern verwendet werden, um den Int32-Wert nicht threadsicher zu ändern. Der private Konstruktor wandelt hierbei ein Int32 in eine WaitAndInboxCounts-Struktur um. Die private ToInt32-Methode gibt den Int32-Wert innerhalb einer WaitAndInboxCounts-Struktur zurück. Es gibt auch private Wait- und Inbox-Eigenschaften, durch die die niedrigen und hohen 16-Bit-Werte des Int32-Felds abgerufen und festgelegt werden.

Die Zukunft
Viele Entwickler wissen, wie wertvoll eine asynchrone Programmierung bei der Implementierung leistungsstarker, skalierbarer Anwendungen und Komponenten ist. In der Praxis empfinden viele Programmierer das asynchrone Programmiermodell allerdings als zu schwierig, vermeiden es daher und nehmen dafür eine schlechte Leistung bei ihren Anwendungen in Kauf. Es bleibt zu hoffen, dass meine Async­Enumerator-Klasse zusammen mit C#-Iteratoren die asynchrone Programmierung so vereinfacht, dass Programmierer sie in Zukunft nutzen werden und dadurch die Leistung und Skalierbarkeit ihrer Anwendungen durch eine geringere Anzahl Threads und Kontextwechsel verbessern.
Im nächsten Artikel werden weitere Features meiner Async­Enumerator-Klasse sowie verschiedene von ihr unterstützte Verwendungsmuster vorgestellt. Bis zum nächsten Mal.

Senden Sie Ihre Fragen und Kommentare für Jeffrey Richter in englischer Sprache an mmsync@microsoft.com.

Jeffrey Richter ist Mitbegründer von Wintellect (www.Wintellect.com), einem Unternehmen zur Prüfung von Softwarearchitekturen sowie für Beratung und Schulung. Er ist Autor verschiedener Bücher, darunter CLR via C#. Jeffrey Richter schreibt außerdem redaktionelle Beiträge für das MSDN Magazin und ist seit 1990 als Berater für Microsoft tätig.

Communityinhalt   Was ist Community Content?
Neuen Inhalt hinzufügen RSS  Anmerkungen
Processing
Page view tracker