共用方式為


如何:使用內容類別實作合作式信號

本主題顯示如何使用 concurrency::Context 類別實作合作式信號類別。

Context 類別可讓您封鎖或產生目前的執行內容。 當目前的內容因資源無法使用而無法繼續執行時,封鎖或產生目前的內容會非常有用。 有種情況是目前的執行內容必須等候資源變可用,而「信號」(Semaphore) 即屬於這種情況。 就像關鍵區段物件一樣,信號是一個同步處理物件,可讓某個內容中的程式碼對資源進行獨佔存取。 但是,與關鍵區段物件不同的是,信號可讓多個內容同時存取資源。 如果持有信號鎖定的內容數達到上限,則每個額外的內容都必須等候其他內容釋放鎖定。

若要實作信號類別

  1. 宣告名為 semaphore 的類別。 將 public 和 private 區段加入至這個類別。

    // A semaphore type that uses cooperative blocking semantics. 
    class semaphore
    {
    public:
    private:
    };
    
  2. semaphore 類別的 private 區段中,宣告 std::atomic 的變數 (這個變數會保留信號計數),以及 concurrency::concurrent_queue 物件 (這個物件會保留必須等候取得信號的內容)。

    // The semaphore count.
    atomic<long long> _semaphore_count;
    
    // A concurrency-safe queue of contexts that must wait to  
    // acquire the semaphore.
    concurrent_queue<Context*> _waiting_contexts;
    
  3. semaphore 類別的 public 區段中,實作建構函式。 這個建構函式要接受 long long 值,該值指定可以持有鎖定的並行內容數上限。

    explicit semaphore(long long capacity)
       : _semaphore_count(capacity)
    {
    }
    
  4. semaphore 類別的 public 區段中,實作 acquire 方法。 這個方法會將信號計數遞減,做為不可部分完成的作業。 如果信號計數變成負數,請將目前的內容加入至等候佇列的結尾,並呼叫 concurrency::Context::Block 方法來封鎖目前的內容。

    // Acquires access to the semaphore. 
    void acquire()
    {
       // The capacity of the semaphore is exceeded when the semaphore count  
       // falls below zero. When this happens, add the current context to the  
       // back of the wait queue and block the current context. 
       if (--_semaphore_count < 0)
       {
          _waiting_contexts.push(Context::CurrentContext());
          Context::Block();
       }
    }
    
  5. semaphore 類別的 public 區段中,實作 release 方法。 這個方法會將信號計數遞增,做為不可部分完成的作業。 如果在遞增作業執行之前信號計數是負數,則至少有一個內容在等候取得鎖定。 在此情況下,請解除封鎖擋在等候佇列前面的內容。

    // Releases access to the semaphore. 
    void release()
    {
       // If the semaphore count is negative, unblock the first waiting context. 
       if (++_semaphore_count <= 0)
       {
          // A call to acquire might have decremented the counter, but has not 
          // yet finished adding the context to the queue.  
          // Create a spin loop that waits for the context to become available.
          Context* waiting = NULL;
          while (!_waiting_contexts.try_pop(waiting))
          {
             Context::Yield();
          }
    
          // Unblock the context.
          waiting->Unblock();
       }
    }
    

範例

這個範例中的 semaphore 類別會展現合作行為,因為 Context::BlockContext::Yield 方法會產生讓執行階段可以執行其他工作的執行作業。

acquire 方法會將計數器遞減,但是它可能無法趕在其他內容呼叫 release 方法之前,完成將內容加入至等候佇列。 為了處理這種情況,release 方法會使用空轉迴圈,這個迴圈會呼叫 concurrency::Context::Yield 方法,以等候 acquire 方法完成加入內容的動作。

acquire 方法呼叫 Context::Block 方法之前,release 方法可以呼叫 Context::Unblock 方法。 您無須擔心這種呼叫順序,因為執行階段允許以任何順序呼叫這些方法。 如果在相同的內容中,release 方法在 acquire 方法呼叫 Context::Block 之前呼叫了 Context::Unblock,則該內容會維持解除封鎖狀態。 執行階段只需要每個 Context::Block 呼叫都有一個對應的 Context::Unblock 呼叫。

下列範例顯示完整的 semaphore 類別。 wmain 函式會示範這個類別的基本用法。 wmain 函式會使用 concurrency::parallel_for 演算法來建立數個需要存取信號的工作。 因為不論何時都可以有三個執行緒持有鎖定,因此有些工作必須等候其他工作執行完成並釋放鎖定。

// cooperative-semaphore.cpp 
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics. 
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore. 
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count  
      // falls below zero. When this happens, add the current context to the  
      // back of the wait queue and block the current context. 
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore. 
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context. 
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not 
         // yet finished adding the context to the queue.  
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to  
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

int wmain()
{
   // Create a semaphore that allows at most three threads to  
   // hold the lock.
   semaphore s(3);

   parallel_for(0, 10, [&](int i) {
      // Acquire the lock.
      s.acquire();

      // Print a message to the console.
      wstringstream ss;
      ss << L"In loop iteration " << i << L"..." << endl;
      wcout << ss.str();

      // Simulate work by waiting for two seconds.
      wait(2000);

      // Release the lock.
      s.release();
   });
}

這個範例 (Example) 產生下列範例 (Sample) 輸出。

  

如需 concurrent_queue 類別的詳細資訊,請參閱 平行容器和物件。 如需 parallel_for 演算法的詳細資訊,請參閱平行演算法

編譯程式碼

請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 cooperative-semaphore.cpp 的檔案中,然後在 Visual Studio 的 [命令提示字元] 視窗中執行下列命令。

cl.exe /EHsc cooperative-semaphore.cpp

穩固程式設計

您可以使用「資源擷取為初始設定」(Resource Acquisition Is Initialization,RAII) 模式,將 semaphore 物件存取限制在指定的範圍。 在 RAII 模式下,資料結構會配置於堆疊上。 該資料結構會在建立時初始化或擷取資源,並在資料結構終結時終結或釋放該資源。 RAII 模式可保證在封閉範圍結束之前呼叫解構函式。 因此,當有例外狀況擲回或是函式包含多個 return 陳述式時,都會正確地管理資源。

下列範例會定義名為 scoped_lock 的類別,這個類別定義於 semaphore 類別的 public 區段中。 scoped_lock 類別類似於 concurrency::critical_section::scoped_lockconcurrency::reader_writer_lock::scoped_lock 類別。 semaphore::scoped_lock 類別的建構函式會取得對所指定 semaphore 物件的存取,而解構函式則會釋放對該物件的存取。

// An exception-safe RAII wrapper for the semaphore class. 
class scoped_lock
{
public:
   // Acquires access to the semaphore.
   scoped_lock(semaphore& s)
      : _s(s)
   {
      _s.acquire();
   }
   // Releases access to the semaphore.
   ~scoped_lock()
   {
      _s.release();
   }

private:
   semaphore& _s;
};

下列範例會修改傳遞給 parallel_for 演算法的工作函式主體,讓它使用 RAII 以確保在函式傳回之前已釋放信號。 這項技術可確保工作函式不怕例外狀況發生。

parallel_for(0, 10, [&](int i) {
   // Create an exception-safe scoped_lock object that holds the lock  
   // for the duration of the current scope.
   semaphore::scoped_lock auto_lock(s);

   // Print a message to the console.
   wstringstream ss;
   ss << L"In loop iteration " << i << L"..." << endl;
   wcout << ss.str();

   // Simulate work by waiting for two seconds.
   wait(2000);
});

請參閱

參考

Context 類別

概念

內容

平行容器和物件