
Observable.Window<TSource, TWindowClosing> Method (IObservable<TSource>, Func<IObservable<TWindowClosing>>)

Projects each element of an observable sequence into consecutive non-overlapping windows.

Namespace:  System.Reactive.Linq
Assembly:  System.Reactive (in System.Reactive.dll)

public static IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(
	this IObservable<TSource> source,
	Func<IObservable<TWindowClosing>> windowClosingSelector
)

Type Parameters

TSource

The type of the elements in the source sequence.

TWindowClosing

The type of the elements in the sequences that signal window closure.

Parameters

source
Type: System.IObservable<TSource>
The source sequence to produce windows over.
windowClosingSelector
Type: System.Func<IObservable<TWindowClosing>>
A function invoked to define the boundaries of the produced windows.

Return Value

Type: System.IObservable<IObservable<TSource>>
An observable sequence of windows.

Usage Note

In Visual Basic and C#, you can call this method as an instance method on any object of type IObservable<TSource>. When you use instance method syntax to call this method, omit the first parameter. For more information, see http://msdn.microsoft.com/en-us/library/bb384936.aspx or http://msdn.microsoft.com/en-us/library/bb383977.aspx.
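
The two call styles described in this note can be sketched as follows. This is a minimal illustration, not part of the original reference; the class name CallSyntax and the variable names are hypothetical, and the closing sequence is Observable.Never only so that the snippet stays short.

```csharp
using System;
using System.Reactive.Linq;

class CallSyntax
{
    static void Main()
    {
        // Hypothetical source and closing selector, used only to show the call syntax.
        IObservable<int> source = Observable.Range(1, 3);
        Func<IObservable<long>> closing = () => Observable.Never<long>();

        // Instance (extension) method syntax: the first parameter is omitted.
        IObservable<IObservable<int>> viaInstance = source.Window(closing);

        // Static method syntax: the source is passed explicitly as the first parameter.
        IObservable<IObservable<int>> viaStatic = Observable.Window(source, closing);

        // Neither sequence is subscribed to here, so nothing is produced.
    }
}
```

Both calls bind to the same overload; the instance form is simply the extension method syntax.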

The Window operator breaks up an observable sequence into consecutive non-overlapping windows. The end of the current window and the start of the next are controlled by the observable sequence returned by the windowClosingSelector function, which is passed as an input parameter to the operator. The operator can be used to group a set of events into a window. For example, the states of a transaction could be the main sequence being observed. Those states could include Preparing, Prepared, Active, and Committed or Aborted, and the main sequence would produce them as they occur, in that order. The windowClosingSelector function could return an observable sequence that produces a value only on the Committed or Aborted state. That value would close the window representing the events of a particular transaction.
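
The transaction scenario could be sketched roughly as follows. This is an illustration under stated assumptions rather than a definitive implementation: the states are modeled as plain strings pushed through a Subject<string>, and the names TransactionWindows, states, and perTransaction are hypothetical. A hot source such as a subject is assumed so that the closing sequence observes the same notifications as the windows. Whether the terminal state is delivered to the window it closes or to the next one depends on the order in which the operator subscribes to the source and to the closing sequence, so the sketch makes no claim about that.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class TransactionWindows
{
    static void Main()
    {
        // Hypothetical hot source of transaction states (assumption: states are strings).
        var states = new Subject<string>();

        // The selector runs each time a window opens. The sequence it returns
        // signals the end of that window when a terminal state is observed.
        var perTransaction = states.Window(() =>
            states.Where(s => s == "Committed" || s == "Aborted"));

        int windowCount = 0;
        perTransaction.Subscribe(window =>
        {
            int id = ++windowCount;
            Console.WriteLine("Window {0} opened", id);
            window.Subscribe(
                s => Console.WriteLine("  [{0}] {1}", id, s),
                () => Console.WriteLine("Window {0} closed", id));
        });

        // Two simulated transactions, one committed and one aborted.
        var simulated = new[]
        {
            "Preparing", "Prepared", "Active", "Committed",
            "Preparing", "Prepared", "Active", "Aborted"
        };
        foreach (var s in simulated)
        {
            states.OnNext(s);
        }

        states.OnCompleted();
    }
}
```

With a cold source, the closing selector would start a separate subscription to the source each time it runs, so sharing the source (for example through a subject) is the safer choice for this pattern.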

The following simple example breaks up a sequence of integers into consecutive non-overlapping windows. The end of the current window and the start of the next are controlled by an observable sequence of integers produced by the Interval operator every six seconds. Since the main observable sequence produces an item every second, each window contains six items. The example code writes each window of integers to the console along with a timestamp, which shows that a new window opens about every six seconds.

using System;
using System.Reactive.Linq;

namespace Example
{
  class Program
  {
    static void Main()
    {
      //*********************************************************************************************//
      //*** The mainSequence produces a new long integer from the Interval operator every sec but ***//
      //*** this sequence is broken up by the Window operator into subsets like a windowed        ***//
      //*** view of the sequence. The time when each window stops and the next window starts is   ***//
      //*** controlled by the IObservable<TWindowClosing> named seqWindowControl. It is returned  ***//
      //*** by the lambda expression which is passed to the Window operator. In this case it      ***//
      //*** returns another IObservable<long> generated by the Interval operator. So whenever     ***//
      //*** seqWindowControl produces an item, the current window into the mainSequence stops and ***//
      //*** a new window starts.                                                                  ***//
      //*********************************************************************************************//

      var mainSequence = Observable.Interval(TimeSpan.FromSeconds(1));

      var seqWindowed = mainSequence.Window(() => 
      {
        var seqWindowControl = Observable.Interval(TimeSpan.FromSeconds(6));
        return seqWindowControl;
      });


      //*********************************************************************************************//
      //*** A subscription to seqWindowed will provide a new IObservable<long> every 6 secs.      ***//
      //***                                                                                       ***//
      //*** Create a subscription to each window into the main sequence and list the values along ***//
      //*** with the time the window was opened and the previous window was closed.               ***//
      //*********************************************************************************************//
      
      seqWindowed.Subscribe(seqWindow => 
      {
        Console.WriteLine("\nA new window into the main sequence has opened: {0}\n", DateTime.Now.ToString());
        seqWindow.Subscribe(x =>
        {
          Console.WriteLine("Integer : {0}", x);
        });
      });

      Console.ReadLine();
    }
  }
}

The following output was generated by the example code.

A new window into the main sequence has opened: 6/1/2011 8:48:43 PM

Integer : 0
Integer : 1
Integer : 2
Integer : 3
Integer : 4
Integer : 5

A new window into the main sequence has opened: 6/1/2011 8:48:49 PM

Integer : 6
Integer : 7
Integer : 8
Integer : 9
Integer : 10
Integer : 11

A new window into the main sequence has opened: 6/1/2011 8:48:55 PM

Integer : 12
Integer : 13
Integer : 14
Integer : 15
Integer : 16
Integer : 17

A new window into the main sequence has opened: 6/1/2011 8:49:02 PM

Integer : 18
Integer : 19
Integer : 20
Integer : 21
Integer : 22
Integer : 23
© 2014 Microsoft