Provides a mechanism for receiving push-based notifications.
Assembly: mscorlib (in mscorlib.dll)
Public Interface IObserver(Of In T)
public interface IObserver<in T>
generic<typename T> public interface class IObserver
type IObserver<'T> = interface end
Type Parameters
- In T: The object that provides notification information.
This type parameter is contravariant. That is, you can use either the type you specified or any type that is less derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
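The effect of contravariance can be illustrated with a minimal sketch. The ObjectLogger and ContravarianceDemo names are hypothetical and are not part of the .NET Framework; the sketch only assumes that the compiler supports generic variance (C# 4 or later).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that accepts any object and records what it receives.
public class ObjectLogger : IObserver<object>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(object value) { Log.Add("Received: " + value); }
    public void OnError(Exception error) { Log.Add("Error: " + error.Message); }
    public void OnCompleted() { Log.Add("Completed"); }
}

public static class ContravarianceDemo
{
    public static void Main()
    {
        var logger = new ObjectLogger();

        // Allowed because T is contravariant: object is less derived than string,
        // so an IObserver<object> can be used where an IObserver<string> is expected.
        IObserver<string> stringObserver = logger;

        stringObserver.OnNext("hello");
        stringObserver.OnCompleted();

        foreach (string line in logger.Log)
            Console.WriteLine(line);
    }
}
```

The reverse assignment (an IObserver<string> to an IObserver<object> variable) does not compile, because a string observer cannot handle arbitrary objects.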
The IObserver(Of T) type exposes the following members.
| Name | Description |
|---|---|
| OnCompleted | Notifies the observer that the provider has finished sending push-based notifications. |
| OnError | Notifies the observer that the provider has experienced an error condition. |
| OnNext | Provides the observer with new data. |
The IObserver(Of T) and IObservable(Of T) interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The IObservable(Of T) interface represents the class that sends notifications (the provider); the IObserver(Of T) interface represents the class that receives them (the observer). T represents the class that provides the notification information.
An IObserver(Of T) implementation arranges to receive notifications from a provider (an IObservable(Of T) implementation) by passing an instance of itself to the provider's IObservable(Of T).Subscribe method. This method returns an IDisposable object that can be used to unsubscribe the observer before the provider finishes sending notifications.
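The subscribe and unsubscribe handshake described above can be sketched as follows. NumberSource, CountingObserver, and the Publish method are hypothetical names chosen for illustration; only IObservable<T>, IObserver<T>, and IDisposable come from the framework.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider that pushes integers to its subscribers.
public class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        // The returned object is the observer's handle for unsubscribing.
        return new Subscription(observers, observer);
    }

    public void Publish(int value)
    {
        // Iterate over a copy so that an observer may unsubscribe during a callback.
        foreach (var observer in observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Subscription(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Removing an observer that is no longer in the list is harmless,
        // so Dispose can be called more than once.
        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that counts the values it receives.
public class CountingObserver : IObserver<int>
{
    public int Count;

    public void OnNext(int value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SubscriptionDemo
{
    public static void Main()
    {
        var source = new NumberSource();
        var observer = new CountingObserver();

        IDisposable subscription = source.Subscribe(observer);
        source.Publish(1);
        source.Publish(2);

        subscription.Dispose();   // unsubscribe before the provider has finished
        source.Publish(3);        // not delivered to the observer

        Console.WriteLine(observer.Count);   // 2
    }
}
```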
The IObserver(Of T) interface defines the following three methods that the observer must implement:
- The OnNext method, which is typically called by the provider to supply the observer with new data or state information.
- The OnError method, which is typically called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition.
- The OnCompleted method, which is typically called by the provider to indicate that it has finished sending notifications to observers.
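How a provider might drive these three methods can be sketched as follows. RecordingObserver and the Push helper are hypothetical names; Push stands in for a provider. Stopping after the first error is a choice made in this sketch, not something the interface itself enforces.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public class RecordingObserver : IObserver<int>
{
    public readonly List<string> Events = new List<string>();

    public void OnNext(int value) { Events.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Events.Add("OnError(" + error.GetType().Name + ")"); }
    public void OnCompleted() { Events.Add("OnCompleted"); }
}

public static class NotificationDemo
{
    // Hypothetical helper that plays the provider's role: it parses each input
    // and notifies the observer of the outcome.
    public static void Push(IEnumerable<string> inputs, IObserver<int> observer)
    {
        foreach (string input in inputs)
        {
            int value;
            if (!int.TryParse(input, out value))
            {
                // The data is unusable, so report an error condition.
                observer.OnError(new FormatException("Not a number: " + input));
                return;   // this sketch sends nothing further after an error
            }
            observer.OnNext(value);   // new data is available
        }
        observer.OnCompleted();       // no more data will be sent
    }

    public static void Main()
    {
        var good = new RecordingObserver();
        Push(new[] { "1", "2" }, good);
        Console.WriteLine(string.Join(", ", good.Events));
        // OnNext(1), OnNext(2), OnCompleted

        var bad = new RecordingObserver();
        Push(new[] { "1", "x", "3" }, bad);
        Console.WriteLine(string.Join(", ", bad.Events));
        // OnNext(1), OnError(FormatException)
    }
}
```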
The following example illustrates the observer design pattern. It defines a Location structure that contains latitude and longitude information.
Public Structure Location
   Dim lat, lon As Double

   Public Sub New(ByVal latitude As Double, ByVal longitude As Double)
      Me.lat = latitude
      Me.lon = longitude
   End Sub

   Public ReadOnly Property Latitude As Double
      Get
         Return Me.lat
      End Get
   End Property

   Public ReadOnly Property Longitude As Double
      Get
         Return Me.lon
      End Get
   End Property
End Structure

public struct Location
{
   double lat, lon;

   public Location(double latitude, double longitude)
   {
      this.lat = latitude;
      this.lon = longitude;
   }

   public double Latitude
   { get { return this.lat; } }

   public double Longitude
   { get { return this.lon; } }
}
The LocationReporter class provides the IObserver(Of T) implementation. It displays information about the current location to the console. Its constructor includes a name parameter, which allows the LocationReporter instance to identify itself in its string output. It also includes a Subscribe method, which wraps a call to the provider's Subscribe method. This enables the method to assign the returned IDisposable reference to a private variable. The LocationReporter class also includes an Unsubscribe method, which calls the IDisposable.Dispose method of the object returned by the IObservable(Of T).Subscribe method. The following code defines the LocationReporter class.
Public Class LocationReporter : Implements IObserver(Of Location)
   Dim unsubscriber As IDisposable
   Dim instName As String

   Public Sub New(ByVal name As String)
      Me.instName = name
   End Sub

   Public ReadOnly Property Name As String
      Get
         Return instName
      End Get
   End Property

   Public Overridable Sub Subscribe(ByVal provider As IObservable(Of Location))
      If provider Is Nothing Then Exit Sub
      unsubscriber = provider.Subscribe(Me)
   End Sub

   Public Overridable Sub OnCompleted() Implements System.IObserver(Of Location).OnCompleted
      Console.WriteLine("The Location Tracker has completed transmitting data to {0}.", Me.Name)
      Me.Unsubscribe()
   End Sub

   Public Overridable Sub OnError(ByVal e As System.Exception) Implements System.IObserver(Of Location).OnError
      Console.WriteLine("{0}: The location cannot be determined.", Me.Name)
   End Sub

   Public Overridable Sub OnNext(ByVal value As Location) Implements System.IObserver(Of Location).OnNext
      Console.WriteLine("{2}: The current location is {0}, {1}", value.Latitude, value.Longitude, Me.Name)
   End Sub

   Public Overridable Sub Unsubscribe()
      ' Guard against a call made before Subscribe has succeeded.
      If unsubscriber IsNot Nothing Then unsubscriber.Dispose()
   End Sub
End Class

using System;

public class LocationReporter : IObserver<Location>
{
   private IDisposable unsubscriber;
   private string instName;

   public LocationReporter(string name)
   {
      this.instName = name;
   }

   public string Name
   { get { return this.instName; } }

   public virtual void Subscribe(IObservable<Location> provider)
   {
      if (provider != null)
         unsubscriber = provider.Subscribe(this);
   }

   public virtual void OnCompleted()
   {
      Console.WriteLine("The Location Tracker has completed transmitting data to {0}.", this.Name);
      this.Unsubscribe();
   }

   public virtual void OnError(Exception e)
   {
      Console.WriteLine("{0}: The location cannot be determined.", this.Name);
   }

   public virtual void OnNext(Location value)
   {
      Console.WriteLine("{2}: The current location is {0}, {1}", value.Latitude, value.Longitude, this.Name);
   }

   public virtual void Unsubscribe()
   {
      // Guard against a call made before Subscribe has succeeded.
      if (unsubscriber != null)
         unsubscriber.Dispose();
   }
}
The LocationTracker class provides the IObservable(Of T) implementation. Its TrackLocation method is passed a nullable Location object that contains the latitude and longitude data. If the Location value is not Nothing, the TrackLocation method calls the OnNext method of each observer.
Imports System.Collections.Generic

Public Class LocationTracker : Implements IObservable(Of Location)

   Public Sub New()
      observers = New List(Of IObserver(Of Location))
   End Sub

   Private observers As List(Of IObserver(Of Location))

   Public Function Subscribe(ByVal observer As System.IObserver(Of Location)) As System.IDisposable _
                            Implements System.IObservable(Of Location).Subscribe
      If Not observers.Contains(observer) Then
         observers.Add(observer)
      End If
      Return New Unsubscriber(observers, observer)
   End Function

   Private Class Unsubscriber : Implements IDisposable
      Private _observers As List(Of IObserver(Of Location))
      Private _observer As IObserver(Of Location)

      Public Sub New(ByVal observers As List(Of IObserver(Of Location)), ByVal observer As IObserver(Of Location))
         Me._observers = observers
         Me._observer = observer
      End Sub

      Public Sub Dispose() Implements IDisposable.Dispose
         If _observer IsNot Nothing AndAlso _observers.Contains(_observer) Then
            _observers.Remove(_observer)
         End If
      End Sub
   End Class

   Public Sub TrackLocation(ByVal loc As Nullable(Of Location))
      For Each observer In observers
         If Not loc.HasValue Then
            observer.OnError(New LocationUnknownException())
         Else
            observer.OnNext(loc.Value)
         End If
      Next
   End Sub

   Public Sub EndTransmission()
      ' Iterate over a copy because OnCompleted may unsubscribe the observer.
      For Each observer In observers.ToArray()
         If observers.Contains(observer) Then observer.OnCompleted()
      Next
      observers.Clear()
   End Sub
End Class

using System;
using System.Collections.Generic;

public class LocationTracker : IObservable<Location>
{
   public LocationTracker()
   {
      observers = new List<IObserver<Location>>();
   }

   private List<IObserver<Location>> observers;

   public IDisposable Subscribe(IObserver<Location> observer)
   {
      if (! observers.Contains(observer))
         observers.Add(observer);
      return new Unsubscriber(observers, observer);
   }

   private class Unsubscriber : IDisposable
   {
      private List<IObserver<Location>> _observers;
      private IObserver<Location> _observer;

      public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
      {
         this._observers = observers;
         this._observer = observer;
      }

      public void Dispose()
      {
         if (_observer != null && _observers.Contains(_observer))
            _observers.Remove(_observer);
      }
   }

   public void TrackLocation(Nullable<Location> loc)
   {
      foreach (var observer in observers) {
         if (! loc.HasValue)
            observer.OnError(new LocationUnknownException());
         else
            observer.OnNext(loc.Value);
      }
   }

   public void EndTransmission()
   {
      // Iterate over a copy because OnCompleted may unsubscribe the observer.
      foreach (var observer in observers.ToArray())
         if (observers.Contains(observer))
            observer.OnCompleted();

      observers.Clear();
   }
}
If the Location value is Nothing, the TrackLocation method instantiates a LocationUnknownException object, which is shown in the following example. It then calls each observer's OnError method and passes it the LocationUnknownException object. Note that LocationUnknownException derives from Exception but does not add any new members.
Public Class LocationUnknownException : Inherits Exception
   Friend Sub New()
   End Sub
End Class

public class LocationUnknownException : Exception
{
   internal LocationUnknownException()
   { }
}
Observers register to receive notifications from a LocationTracker object by calling its IObservable(Of T).Subscribe method, which adds a reference to the observer object to a private generic List(Of T) object. The method returns an Unsubscriber object, which is an IDisposable implementation that enables observers to stop receiving notifications. The LocationTracker class also includes an EndTransmission method. When no further location data is available, the method calls each observer's OnCompleted method and then clears the internal list of observers.
The following code then instantiates the provider and two observers.
Module Module1
   Dim provider As LocationTracker

   Sub Main()
      ' Define a provider and two observers.
      provider = New LocationTracker()
      Dim reporter1 As New LocationReporter("FixedGPS")
      reporter1.Subscribe(provider)
      Dim reporter2 As New LocationReporter("MobileGPS")
      reporter2.Subscribe(provider)

      provider.TrackLocation(New Location(47.6456, -122.1312))
      reporter1.Unsubscribe()
      provider.TrackLocation(New Location(47.6677, -122.1199))
      provider.TrackLocation(Nothing)
      provider.EndTransmission()
   End Sub
End Module
' The example displays output similar to the following:
'       FixedGPS: The current location is 47.6456, -122.1312
'       MobileGPS: The current location is 47.6456, -122.1312
'       MobileGPS: The current location is 47.6677, -122.1199
'       MobileGPS: The location cannot be determined.
'       The Location Tracker has completed transmitting data to MobileGPS.

using System;

class Program
{
   static void Main(string[] args)
   {
      // Define a provider and two observers.
      LocationTracker provider = new LocationTracker();
      LocationReporter reporter1 = new LocationReporter("FixedGPS");
      reporter1.Subscribe(provider);
      LocationReporter reporter2 = new LocationReporter("MobileGPS");
      reporter2.Subscribe(provider);

      provider.TrackLocation(new Location(47.6456, -122.1312));
      reporter1.Unsubscribe();
      provider.TrackLocation(new Location(47.6677, -122.1199));
      provider.TrackLocation(null);
      provider.EndTransmission();
   }
}
// The example displays output similar to the following:
//      FixedGPS: The current location is 47.6456, -122.1312
//      MobileGPS: The current location is 47.6456, -122.1312
//      MobileGPS: The current location is 47.6677, -122.1199
//      MobileGPS: The location cannot be determined.
//      The Location Tracker has completed transmitting data to MobileGPS.
.NET Framework
Supported in: 4
.NET Framework Client Profile
Supported in: 4
Windows 7, Windows Vista SP1 or later, Windows XP SP3, Windows Server 2008 (Server Core not supported), Windows Server 2008 R2 (Server Core supported with SP1 or later), Windows Server 2003 SP2
The .NET Framework does not support all versions of every platform. For a list of the supported versions, see .NET Framework System Requirements.
Reference
Because there is a single interface (IObservable<T>) for observing events, notifications, stream updates, and so on, the power of LINQ can be applied.
1) Consistency in resource management - Event handlers can be a common source of memory leaks. Using Rx will not in itself solve that problem; however, it does offer a consistent dispose pattern via IDisposable. This will allow users and tools (like FxCop) to track resource allocation and disposal in a more effective manner.
2) Composition - Many observable sequences can be composed together in many different patterns (concatenation, merging, grouping, joining, etc.), which allows for a vast number of use cases. Composition can also aid in resource management, because often there is only a single resource (the subscription) to dispose of from a composite observable sequence.
3) Unitive - For example, suppose you were trying to compose a button click that started a subscription to a Comet feed, which should last only for a certain amount of time and then terminate. This would generally be implemented imperatively, possibly using three styles of event handling (event handlers, APM, Windows timers). An alternative is to expose all of these via IObservable<T>, giving the consuming developer a single pattern to work with.
4) Declarative - As with the example above, without a single interface to work with you would lose the declarative nature of LINQ. You would probably need several classes to encapsulate the plumbing and possibly a controller to orchestrate the workflow. With Rx, you use extension methods to convert each of these three example patterns (event handlers, APM, timers) into IObservable<T>, and then you define the workflow in a single declarative LINQ statement, which states the intent of the code more succinctly.
5) Transformational - As with other flavours of LINQ, it is easy to transform an input type to a different output type. You may reduce the input to a more primitive type, or you may combine many streams and enrich the single output as a specialised composite of the observable sequences.
6) Extensible - Because LINQ is built on extension methods, you can easily add your own extensions to IObservable<T>. Rx already exposes literally hundreds of extension method overloads that cover creation, transformation, filtering, scheduling (handling concurrency), aggregation, and combination/composition.
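As a rough illustration of the extensibility point, the following sketch adds a filtering operator to IObservable<T> without using Rx at all. WhereSketch, RangeSource, and CollectingObserver are hypothetical names invented here; this is not how Rx implements its own operators, only a minimal example of the extension-method technique.

```csharp
using System;
using System.Collections.Generic;

public static class ObservableSketchExtensions
{
    // Hypothetical operator: forwards only the values that satisfy the predicate.
    public static IObservable<T> WhereSketch<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return new FilteredObservable<T>(source, predicate);
    }

    private sealed class FilteredObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> source;
        private readonly Func<T, bool> predicate;

        public FilteredObservable(IObservable<T> source, Func<T, bool> predicate)
        {
            this.source = source;
            this.predicate = predicate;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Place a filtering observer between the source and the real observer.
            return source.Subscribe(new FilteringObserver<T>(observer, predicate));
        }
    }

    private sealed class FilteringObserver<T> : IObserver<T>
    {
        private readonly IObserver<T> target;
        private readonly Func<T, bool> predicate;

        public FilteringObserver(IObserver<T> target, Func<T, bool> predicate)
        {
            this.target = target;
            this.predicate = predicate;
        }

        public void OnNext(T value) { if (predicate(value)) target.OnNext(value); }
        public void OnError(Exception error) { target.OnError(error); }
        public void OnCompleted() { target.OnCompleted(); }
    }
}

// Hypothetical source that pushes a range of integers as soon as an observer subscribes.
public sealed class RangeSource : IObservable<int>
{
    private readonly int start;
    private readonly int count;

    public RangeSource(int start, int count) { this.start = start; this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < count; i++)
            observer.OnNext(start + i);
        observer.OnCompleted();
        return new NoopDisposable();   // nothing is left to unsubscribe from
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that collects the values it receives.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<int> Values = new List<int>();
    public bool Completed;

    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

public static class ExtensionDemo
{
    public static void Main()
    {
        var sink = new CollectingObserver();
        new RangeSource(1, 6).WhereSketch(n => n % 2 == 0).Subscribe(sink);
        Console.WriteLine(string.Join(", ", sink.Values));   // 2, 4, 6
    }
}
```

Because the operator returns another IObservable<T>, calls can be chained in the same fluent style that LINQ uses for IEnumerable<T>.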
In addition to the LINQ benefits that using IObservable<T> via Rx gives you, it also offers other benefits, such as:
* An effective and simple way to manage concurrency via the IScheduler interface.
* A testing framework to enable unit testing of event-based workflows. In the example from point 3 above, it would prove difficult to test all of the moving parts. With Rx, however, the workflow is a single declarative statement, so testing should be greatly simplified.
* IQbservable<T> is to IObservable<T> what IQueryable<T> is to IEnumerable<T>. This interface allows you to construct expression trees that describe the observable sequence you want, and possibly to serialise them and send them to a server that provides the specific data you want. This is similar to what LINQ-to-SQL does, but with IQbservable<T> the data is push-based rather than pull-based.
For an in-depth introduction, see the Introduction to Rx:
http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html
The Rx Forums also have a post that contains the definitive list of Rx resources:
http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2cbd3b1c-d535-46ba-a9cf-3cd576a8e7c2
Further experimental Rx documentation can be found at:
http://msdn.microsoft.com/en-us/library/hh242985(v=VS.103).aspx
I hope this helps
Lee Campbell
http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx
Rx also provides an entire infrastructure of LINQ extension methods for the reactive push-based model (IObservable<T> / IObserver<T>) and includes new extension methods for the interactive pull-based model that has been around since .NET 2.0 (IEnumerable<T> / IEnumerator<T>).
The reactive LINQ extensions that Rx provides enable composition of IObservable<T> sequences using query comprehension syntax and fluent method call syntax in the same way that we use LINQ to compose queries over IEnumerable<T> sequences.
From what I understand, one of the major advantages of this new approach over the event/event handler mechanism is that it helps prevent the memory leaks that occur when we forget to unsubscribe an event handler from an event.
Noli San Jose
