Comment : synchroniser un thread producteur et un thread consommateur (Guide de programmation C#)

Mise à jour : novembre 2007

L'exemple suivant montre comment synchroniser le thread principal et les deux threads de travail à l'aide du mot clé lock, ainsi que les classes AutoResetEvent et ManualResetEvent. Pour plus d'informations, consultez lock, instruction (Référence C#).

L'exemple crée deux threads auxiliaires, ou threads actifs. Un thread produit des éléments et les stocke dans une file d'attente générique qui n'est pas thread-safe. Pour plus d'informations, consultez Queue<T>. L'autre thread consomme des éléments provenant de cette file d'attente. De plus, le thread principal affiche régulièrement le contenu de la file d'attente. Par conséquent, trois threads accèdent à la file d'attente. Le mot clé lock permet de synchroniser l'accès à la file d'attente pour vérifier que l'état de la file d'attente n'est pas endommagé.

La synchronisation supplémentaire est fournie par deux objets événement en complément du blocage de l'accès simultané avec le mot clé lock L'un est utilisé pour signaler les threads actifs à terminer, et l'autre est utilisé par le thread producteur pour signaler au thread consommateur lorsqu'un nouvel élément a été ajouté à la file d'attente. Ces deux objets événement sont encapsulés dans une classe nommée SyncEvents. Cela permet aux événements d'être passés aux objets qui représentent facilement les threads consommateur et producteur. La classe SyncEvents est définie comme suit :

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

La classe AutoResetEvent est utilisée pour le « nouvel élément » afin que cet événement se réinitialise automatiquement à chaque fois que le thread consommateur répond à cet événement. La classe ManualResetEvent est également utilisée pour l'événement de « sortie » parce que vous souhaitez que plusieurs threads répondent quand cet événement est signalé. Si vous avez plutôt utilisé AutoResetEvent, l'événement retournerait à un état non signalé après qu'un seul thread a répondu à l'événement. L'autre thread ne répondrait pas, et dans ce cas, ne se terminerait pas.

La classe SyncEvents crée les deux événements et les stocke sous deux formes différentes : EventWaitHandle, qui est la classe de base pour AutoResetEvent et ManualResetEvent et dans un tableau basé sur WaitHandle. Comme vous le verrez dans la section concernant le thread consommateur, ce tableau est nécessaire afin que le thread consommateur puisse répondre à l'un ou l'autre événement.

Les threads consommateur et producteur sont représentés par les classes nommées Consumer et Producer. Elles définissent une méthode nommée ThreadRun. Ces méthodes servent de points d'entrée pour les threads actifs que la méthode Main crée.

La méthode ThreadRun définie par la classe Producer ressemble à ceci :

// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

Cette méthode se déplace en boucle jusqu'à ce que l'événement « sortie du thread » soit signalé. L'état de cet événement est testé avec la méthode WaitOne à l'aide de la propriété ExitThreadEvent définie par la classe SyncEvents. Dans ce cas, l'état de l'événement est vérifié sans bloquer le thread actuel car le premier argument utilisé avec WaitOne est zéro, indiquant que la méthode doit retourner immédiatement. Si WaitOne retourne true, l'événement en question est signalé actuellement. Si tel est le cas, la méthode ThreadRun retourne, ce qui a pour effet de terminer le thread actif qui l'exécute.

Jusqu'à ce que l'événement « sortie du thread » soit signalé, la méthode Producer.ThreadStart tente de conserver 20 éléments dans la file d'attente. Un élément est un entier compris entre zéro et 100. La collection doit être verrouillée avant d'ajouter de nouveaux éléments pour empêcher les threads consommateur et le thread principal d'accéder en même temps à la collection. Cela est fait en utilisant le mot clé lock. L'argument passé à lock est le champ SyncRoot exposé à travers l'interface ICollection. Ce champ est fourni spécifiquement pour synchroniser l'accès au thread. L'accès exclusif à la collection est accordé pour toutes les instructions contenues dans le bloc de code suivant lock. Pour chaque nouvel élément que le producteur ajoute à la file d'attente, un appel à la méthode Set sur l'événement « nouvel élément » est effectué. Cela indique au thread consommateur qu'il doit sortir de son état suspendu pour traiter le nouvel élément.

L'objet Consumer définit également une méthode appelée ThreadRun. Comme la version du producteur de ThreadRun, cette méthode est exécutée par un thread actif créé par la méthode Main. Toutefois, la version consommateur de ThreadStart doit répondre à deux événements. La méthode Consumer.ThreadRun se présente comme suit :

// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

Cette méthode utilise WaitAny pour bloquer le thread consommateur jusqu'au signalement des handles d'attente dans le tableau fourni. Dans ce cas, il y a deux handles dans le tableau, un pour terminer les threads actifs, et l'autre pour indiquer qu'un nouvel élément a été ajouté à la collection. WaitAny retourne l'index de l'événement qui a été signalé. L'événement « nouvel élément » étant le premier du tableau, un index de zéro indique un nouvel élément. Dans ce cas, cherchez un index de 1 qui indique l'événement « sortie du thread ». Celui-ci permet de déterminer si cette méthode continue à consommer des éléments. Si l'événement « nouvel élément » a été signalé, vous obtenez l'accès exclusif à la collection avec lock et consommez le nouvel élément. Parce que cet exemple produit et consomme des milliers d'éléments, vous n'affichez pas tous les éléments consommés. Utilisez plutôt Main pour afficher périodiquement le contenu de la file d'attente, comme illustré plus loin.

La méthode Main commence par créer la file d'attente dont le contenu sera produit et consommé, et une instance de SyncEvents que vous avez vue précédemment :

Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

Ensuite, Main configure les objets Producer et Consumer pour les utiliser avec les threads actifs. Toutefois, cette étape ne crée ni ne lance les threads de travail réels :

Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

Remarquez que la file d'attente et l'objet événement de synchronisation sont passés aux threads Consumer et Producer comme arguments de constructeur. Cela fournit les deux objets qui ont les ressources partagées nécessaires pour effectuer leurs tâches respectives. Deux nouveaux objets Thread sont ensuite créés, à l'aide de la méthode ThreadRun, comme arguments pour chaque objet. Chaque thread de travail, lors de son démarrage, utilise cet argument comme point d'entrée pour le thread.

Ensuite, Main lance les deux threads de travail avec un appel à la méthode Start, tel que :

producerThread.Start();
consumerThread.Start();

À ce stade, les deux nouveaux threads actifs sont créés et commencent l'exécution asynchrone, indépendamment du thread principal qui exécute actuellement la méthode Main. En fait, Main interrompt ensuite le thread principal avec un appel à la méthode Sleep. La méthode interrompt le thread actuellement en cours d'exécution pendant un nombre donné de millisecondes. Une fois cet intervalle écoulé, la méthode Main est réactivée et affiche le contenu de la file d'attente. Main répète ceci quatre fois de la manière suivante :

for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

Enfin, Main signale aux threads actifs qu'ils doivent se terminer en appelant la méthode Set de l'événement « sortie du thread », puis fait appel à la méthode Join sur chaque thread actif pour bloquer le thread principal jusqu'à ce que chaque thread actif réponde à l'événement et se termine.

Voici un dernier exemple de synchronisation de threads : la méthode ShowQueueContents. Cette méthode, comme les threads consommateur et producteur, utilise lock pour avoir un accès exclusif à la file d'attente. Dans ce cas, toutefois, l'accès exclusif est très important car ShowQueueContents énumère toute la collection. L'énumération d'une collection est une opération qui est particulièrement sujette à la corruption de données par des opérations asynchrones, car elle implique de parcourir le contenu de la collection entière.

Enfin, remarquez que ShowQueueContents, étant donné qu'il est appelé par Main, est exécuté par le thread principal. Cela signifie que cette méthode, lorsqu'elle obtient l'accès exclusif à la file d'attente de l'élément, empêche les threads producteur et consommateur d'accéder à la file d'attente. ShowQueueContents verrouille la file d'attente et énumère le contenu :

private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}

Voici un exemple de code complet :

Exemple

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items

Voir aussi

Tâches

Synchronisation de moniteurs, exemple de technologie

Synchronisation d'attente, exemple de technologie

Concepts

Guide de programmation C#

Référence

Synchronisation de threads (Guide de programmation C#)

Thread

lock, instruction (Référence C#)

AutoResetEvent

ManualResetEvent

Set

Join

WaitOne

WaitAll

Queue

ICollection

Start

Sleep

WaitHandle

EventWaitHandle