Control.MailboxProcessor<'Msg>-Klasse (F#)

Ein Agent für die Meldungsverarbeitung, der eine asynchrone Berechnung ausführt.

Namespace/Modulpfad: Microsoft.FSharp.Control

Assembly: FSharp.Core (in FSharp.Core.dll)

[<Sealed>]
[<AutoSerializable(false)>]
type MailboxProcessor<'Msg> =
 class
  interface IDisposable
  new MailboxProcessor : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.Post : 'Msg -> unit
  member this.PostAndAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
  member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
  member this.PostAndTryAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
  member this.Receive : ?int -> Async<'Msg>
  member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
  member this.Start : unit -> unit
  static member Start : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.TryPostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
  member this.TryReceive : ?int -> Async<'Msg option>
  member this.TryScan : ('Msg -> Async<'T> option) * ?int -> Async<'T option>
  member this.add_Error : Handler<Exception> -> unit
  member this.CurrentQueueLength :  int
  member this.DefaultTimeout :  int with get, set
  member this.Error :  IEvent<Exception>
  member this.remove_Error : Handler<Exception> -> unit
 end

Hinweise

Im Agent ist eine Meldungswarteschlange gekapselt, die mehrere Writer und einen einzigen Reader-Agent unterstützt.Writer senden mithilfe der Post-Methode und deren Varianten Nachrichten an den Agent.Der Agent wartet möglicherweise auf Meldungen, die die Receive-Methode oder die TryReceive-Methode verwenden, oder er überprüft möglicherweise alle verfügbaren Meldungen mithilfe der Scan-Methode oder der TryScan-Methode.

Der Name dieses Typs in der .NET-Assembly lautet FSharpMailboxProcessor.Verwenden Sie diesen Namen, wenn Sie in einer anderen .NET-Sprache als F# oder durch Reflektion auf den Typ zugreifen.

Konstruktoren

Member

Description

new

Erstellt einen Agent.Die body-Funktion wird die asynchrone Berechnung generiert, die der Agent ausführt.Diese Funktion wird erst bei einem Aufruf von Start ausgeführt.

Instanzmember

Member

Description

add_Error

Tritt auf, wenn die Ausführung der Agent-Ergebnisse eine Ausnahme ergibt.

CurrentQueueLength

Gibt die Anzahl der nicht verarbeiteten Meldungen in der Meldungswarteschlange des Agents zurück.

DefaultTimeout

Löst eine Timeoutausnahme aus, wenn eine Meldung nicht innerhalb dieser Zeitspanne empfangen wird.Standardmäßig wird kein Timeout verwendet.

Fehler

Tritt auf, wenn die Ausführung der Agent-Ergebnisse eine Ausnahme ergibt.

Post

Sendet asynchron eine Meldung an die Meldungswarteschlange des MailboxProcessor.

PostAndAsyncReply

Sendet asynchron eine Meldung an einen Agent und wartet auf eine Antwort über den Kanal.

PostAndReply

Stellt eine Nachricht an einen Agent bereit und erwartet gleichzeitig eine Antwort auf dem Kanal.

PostAndTryAsyncReply

Wie AsyncPostAndReply, es wird jedoch None zurückgegeben, wenn innerhalb des Timeoutzeitraums keine Antwort empfangen wird.

Receive

Wartet auf eine Meldung.Die erste Meldung in der Reihenfolge des Eintreffens wird verarbeitet.

remove_Error

Tritt auf, wenn die Ausführung der Agent-Ergebnisse eine Ausnahme ergibt.

Scan

Sucht nach einer Meldung, indem die Meldungen in der Reihenfolge ihres Eintreffens geprüft werden, bis der scanner einen Some-Wert zurückgibt.Andere Meldungen verbleiben in der Warteschlange.

Starten

Startet den Agent.

TryPostAndReply

Wie PostAndReply, es wird jedoch None zurückgegeben, wenn innerhalb des Timeoutzeitraums keine Antwort empfangen wird.

TryReceive

Wartet auf eine Meldung.Die erste Meldung in der Reihenfolge des Eintreffens wird verarbeitet.

TryScan

Sucht nach einer Meldung, indem die Meldungen in der Reihenfolge ihres Eintreffens geprüft werden, bis der scanner einen Some-Wert zurückgibt.Andere Meldungen verbleiben in der Warteschlange.

Statische Member

Member

Description

Starten

Erstellt und startet einen Agent.Die body-Funktion wird die asynchrone Berechnung generiert, die der Agent ausführt.

Beispiel

Im folgenden Beispiel wird die grundlegende Verwendung der MailboxProcessor-Klasse veranschaulicht.

open System
open Microsoft.FSharp.Control

type Message(id, contents) =
    static let mutable count = 0
    member this.ID = id
    member this.Contents = contents
    static member CreateMessage(contents) =
        count <- count + 1
        Message(count, contents)

let mailbox = new MailboxProcessor<Message>(fun inbox ->
    let rec loop count =
        async { printfn "Message count = %d. Waiting for next message." count
                let! msg = inbox.Receive()
                printfn "Message received. ID: %d Contents: %s" msg.ID msg.Contents
                return! loop( count + 1) }
    loop 0)

mailbox.Start()

mailbox.Post(Message.CreateMessage("ABC"))
mailbox.Post(Message.CreateMessage("XYZ"))


Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Beispielausgabe

  
  
  
  
  
  
  
  
  

Im folgenden Beispiel wird veranschaulicht, wie mit MailboxProcessor ein einfacher Agent erstellt wird, der verschiedene Arten von Nachrichten akzeptiert und entsprechende Antworten zurückgibt.Dieser Server-Agent stellt einen Wertpapierhändler dar, der Einkäufer und Verkäufer an einer Börse ist, die Geld-/Briefkurse für Güter festlegt.Clients können Preise abfragen oder Anteile kaufen und verkaufen.

open System

type AssetCode = string

type Asset(code, bid, ask, initialQuantity) =
    let mutable quantity = initialQuantity
    member this.AssetCode = code
    member this.Bid = bid
    member this.Ask = ask
    member this.Quantity with get() = quantity and set(value) = quantity <- value


type OrderType =
    | Buy of AssetCode * int
    | Sell of AssetCode * int

type Message =
    | Query of AssetCode * AsyncReplyChannel<Reply>
    | Order of OrderType * AsyncReplyChannel<Reply>
and Reply =
    | Failure of string
    | Info of Asset
    | Notify of OrderType

let assets = [| new Asset("AAA", 10.0, 10.05, 1000000);
                new Asset("BBB", 20.0, 20.10, 1000000);
                new Asset("CCC", 30.0, 30.15, 1000000) |]

let codeAssetMap = assets
                   |> Array.map (fun asset -> (asset.AssetCode, asset))
                   |> Map.ofArray

let mutable totalCash = 00.00
let minCash = -1000000000.0
let maxTransaction = 1000000.0

let marketMaker = new MailboxProcessor<Message>(fun inbox ->
    let rec Loop() =
        async {
            let! message = inbox.Receive()
            match message with
            | Query(assetCode, replyChannel) ->
                match (Map.tryFind assetCode codeAssetMap) with
                | Some asset ->
                    printfn "Replying with Info for %s" (asset.AssetCode)
                    replyChannel.Reply(Info(asset))
                | None -> replyChannel.Reply(Failure("Asset code not found."))
            | Order(order, replyChannel) ->
                match order with
                | Buy(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (quantity < asset.Quantity) then
                            asset.Quantity <- asset.Quantity - quantity
                            totalCash <- totalCash + float quantity * asset.Ask
                            printfn "Replying with Notification:\nBought %d units of %s at price $%f. Total purchase $%f."
                                    quantity asset.AssetCode asset.Ask (asset.Ask * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Buy(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient shares to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient shares to fulfill order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
                | Sell(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (float quantity * asset.Bid <= maxTransaction && totalCash - float quantity * asset.Bid > minCash) then
                            asset.Quantity <- asset.Quantity + quantity
                            totalCash <- totalCash - float quantity * asset.Bid
                            printfn "Replying with Notification:\nSold %d units of %s at price $%f. Total sale $%f."
                                    quantity asset.AssetCode asset.Bid (asset.Bid * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Sell(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient cash to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient cash to cover order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
            do! Loop()
        }
    Loop())

marketMaker.Start()

// Query price.
let reply1 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for AAA"
    Query("AAA", replyChannel))

// Test Buy Order.
let reply2 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for BBB"
    Order(Buy("BBB", 100), replyChannel))

// Test Sell Order.
let reply3 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for CCC"
    Order(Sell("CCC", 100), replyChannel))

// Test incorrect code.
let reply4 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for WrongCode"
    Order(Buy("WrongCode", 100), replyChannel))

// Test too large a number of shares.

let reply5 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with large number of shares of AAA."
    Order(Buy("AAA", 1000000000), replyChannel))

// Too large an amount of money for one transaction.

let reply6 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with too large of a monetary amount."
    Order(Sell("AAA", 100000000), replyChannel))

let random = new Random()
let nextTransaction() =
    let buyOrSell = random.Next(2)
    let asset = assets.[random.Next(3)]
    let quantity = Array.init 3 (fun _ -> random.Next(1000)) |> Array.sum
    match buyOrSell with
    | n when n % 2 = 0 -> Buy(asset.AssetCode, quantity)
    | _ -> Sell(asset.AssetCode, quantity)

let simulateOne() =
   async {
       let! reply = marketMaker.PostAndAsyncReply(fun replyChannel ->
           let transaction = nextTransaction()
           match transaction with
           | Buy(assetCode, quantity) -> printfn "Posting BUY %s %d." assetCode quantity
           | Sell(assetCode, quantity) -> printfn "Posting SELL %s %d." assetCode quantity
           Order(transaction, replyChannel))
       printfn "%s" (reply.ToString())
    }

let simulate =
    async {
        while (true) do
            do! simulateOne()
            // Insert a delay so that you can see the results more easily.
            do! Async.Sleep(1000)
    }

Async.Start(simulate)

Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Beispielausgabe

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  

Plattformen

Windows 8, Windows 7, Windows Server 2012, Windows Server 2008 R2

Versionsinformationen

F#-Kern-Bibliotheks-Versionen

Unterstützt in: 2,0, 4,0, portablen

Siehe auch

Referenz

Microsoft.FSharp.Control-Namespace (F#)