Built For Speed

Develop Turbocharged Apps For Windows Compute Cluster Server

Rich Ciapala

This article is based on a prerelease version of Windows Compute Cluster Server 2003. All information herein is subject to change.

This article discusses:

  • Windows-based high-performance compute clusters
  • Scheduling and executing serial tasks
  • Using MPI to implement parallel execution
  • Debugging MPI tasks
This article uses the following technologies:
Windows Server 2003, Visual Studio 2005

Code download available at: ClusterComputing.exe (128 KB)

Contents

High-Performance Compute Clusters
Windows Cluster Solution
Serial Tasks
Scheduling Jobs
Executing Tasks
Multiple Serial Tasks
MPI and Parallel Tasks
Debugging MPI Tasks
Compute Cluster Availability
Conclusion

High Performance Computing (HPC) involves the execution of long-running, computation-intensive workloads. Examples of such workloads include equity market analysis, weather prediction, stress analysis, and air flow simulations. Historically these workloads have been handled by expensive supercomputers. But with the increase of desktop computer processing power and the introduction of fast and affordable server interconnects, the clustering of large numbers of PCs has been realized as a viable, cost-effective alternative.

Though HPC applications are currently available for the Windows® platform, Microsoft is releasing a version of Windows, called Windows Compute Cluster Server 2003 (CCS), that provides a common execution and management environment for HPC applications. Visual Studio® 2005 also provides tools to accelerate the development of HPC applications. This article explores the services provided by CCS and the tools provided by Visual Studio 2005 for use in developing HPC applications.

High-Performance Compute Clusters

Like any cluster, a compute cluster aggregates multiple physical nodes and exposes them to clients as a single resource pool. Compute clusters are unique in the workloads they can handle. They execute requests (called jobs) that may complete in minutes, hours, or even days. As a result, jobs are developed to leverage resources across multiple machines in order to decrease their execution times. This requires the cluster to monitor the resource utilization of all of its nodes and to match incoming jobs with available resources across those nodes.

If a job executes across multiple nodes, its individual processes may need to share data frequently. As a result, network bandwidth may often be a job's performance bottleneck. To compensate, compute clusters often connect their nodes via fast, high-bandwidth interconnects such as Gigabit Ethernet (GigE), InfiniBand, or Myrinet. There are a number of different usage scenarios for compute clusters, including multiuser compute clusters, personal compute clusters, and cycle scavenging clusters.

Multiuser compute clusters are often owned and managed by an organization's IT department and are shared across multiple users. Users submit jobs to the cluster for execution. The IT department's support personnel control job priority, node requirements and execution context through policy services provided by the cluster.

Personal compute clusters are assigned to a single user, giving that user complete control and isolation.

Cycle scavenging clusters consist of nodes that execute some other primary workload but participate in the compute cluster when they would otherwise be idle. Desktop scavenging is one type of cycle scavenging, where the resources from client workstations are used after hours for compute cluster workloads. The SETI@home project is one example.

Windows Cluster Solution

CCS focuses on the submission of jobs to multiuser and personal compute clusters (future versions are expected to support other scenarios such as cycle scavenging). Figure 1 illustrates the physical topology of a typical CCS cluster. CCS provides a command-line interface as well as a client user interface for submitting jobs to the cluster. CCS also provides a DCOM-based programming interface for managing jobs.

Figure 1 Compute Cluster Topology

Clients submit work to a compute cluster as a job. A job contains a description of the resources necessary to execute it and a collection of tasks that implement the job. A task can be implemented as a serial task or a parallel task. A serial task executes as a single process, while a parallel task executes as multiple tightly coupled processes or threads.

The head node is the entry point into the cluster and receives all jobs submitted to the compute cluster. The head node hosts the cluster's scheduler service, whose role is to match the resources requested by the submitted jobs to resources available across the cluster (such as processors). Once a job receives its resources, its tasks are dispatched for execution to the compute nodes allocated to the job. The head node can also serve as a compute node.

The compute node joins a compute cluster by finding its head node through a local registry setting or the head node's service connection point registration in Active Directory®. When the compute node contacts the head node, the head node checks whether the compute node is running a CCS-compatible operating system. If it is, the head node and the compute node perform mutual authentication. If the authentication succeeds, the node's addition to the cluster then awaits approval by the cluster administrator.

The administrator can approve the addition of nodes via the Compute Cluster Administrator console. The compute nodes' resource utilization statistics and overall health can also be monitored there. The compute nodes send their statistics to the head node every minute. If the head node misses three updates from a compute node, it considers the compute node down. Both the update period and number of missed updates allowed are configurable. CCS also enables rapid deployment of compute nodes through Remote Installation Services (RIS).

The data server, which is often as simple as a file server, is used to share data across the nodes as jobs execute.

The management client hosts the Compute Cluster Administrator and Compute Cluster Job Submission and Monitoring consoles. Some functionality provided by these consoles includes:

  • Identifying a cluster's head node and its compute nodes
  • Approving, enabling, and disabling compute nodes
  • Creating and deploying server images for compute nodes
  • Creating, importing, and exporting compute cluster job definitions
  • Submitting new jobs as well as querying, altering, and cancelling executing jobs
  • Enabling policies for accepting jobs for execution and charging back for the use of cluster resources
  • Submitting compute cluster software updates as low- or high-priority jobs

The Message Passing Interface (MPI) is a standard programming model for developing explicitly parallel applications that frequently share data across processes. MPI defines a language- and platform-independent API for efficiently sharing data across multiple processes, along with other related functionality. Implementations of MPI are provided for a variety of interconnects on a variety of operating systems. MSMPI provides a standard messaging implementation for the Windows platform based on MPICH2, an open source implementation of MPI 2.0 started by the Argonne National Laboratory. MSMPI supports language bindings for Fortran77, Fortran90, C, and C++. CCS does not provide a Microsoft® .NET Framework class library for MPI. However, jobs implemented with serial tasks can be developed in .NET Framework-compliant languages, and jobs that do rely on MPI can use many of the MPI functions through P/Invoke. Future versions of CCS are expected to provide a .NET Framework interface for MPI.

MSMPI also allows third-party hardware interconnect vendors to plug in their products via the existing WinSock Direct driver framework. This eliminates the need for each vendor to build an entire MPI stack. WinSock Direct has been available since Windows 2000 Service Pack 2. It enables faster network I/O by allowing applications to bypass the kernel networking layers and communicate directly with the interconnect driver.

CCS will only ship with support for x64 processors, which includes the AMD Opteron (and compatible) and Intel EM64T (and compatible) chipsets. This means CCS requires the use of x64 hardware and Windows Server™ 2003 SP1 x64, but permits both 32-bit and 64-bit applications. CCS will provide 32-bit and 64-bit versions of MSMPI, but the WinSock Direct drivers need to be 64-bit.

Serial Tasks

Let's jump into some code to see how to develop and submit a CCS job. Reaching back in my memory to my high school algebra classes, I vaguely recollect the notion of matrix multiplication. I will use this as the basis for the samples in this article to demonstrate three distinct ways to build a CCS job. First, a quick refresher on matrix multiplication. Figure 2 shows two matrixes, A and B, and their product, matrix C. Each cell in matrix C is calculated individually. For example, cell C1,1 is calculated as follows:

C1,1 = (A1,1 x B1,1) + (A1,2 x B2,1) + (A1,3 x B3,1)

In real terms this is calculated as:

72 = (6 x 0) + (5 x 0) + (8 x 9)

Figure 2 Matrix Example

The MatrixMultiplier sample provided with this article is a Win32® console application that implements matrix multiplication. The sample loads two matrixes from individual files, multiplies them together, and saves the result to a third file. The following command line runs the sample:

MatrixMultiplier matrix1.mtx matrix2.mtx matrix3.mtx

The heart of the sample is the MultiplyMatrixes function, which multiplies the matrixes as follows:

for(int i = 0; i < nMatrixDim; i++)
{
    for(int j = 0; j < nMatrixDim; j++)
    {
        int* pnOutputCell = GetCellPtr(j, i);
        for(int k = 0; k < nMatrixDim; k++)
        {
            *pnOutputCell = *pnOutputCell +
                (inputMatrix1.GetCellVal(k, i) * inputMatrix2.GetCellVal(j, k));
        }
    }
}

The MatrixUtil utility, also available in the code download that accompanies this article, creates the input matrix files. These command lines create the input matrixes of random numbers with dimensions of 5,000 by 5,000:

MatrixUtil matrix1.mtx 1 5000
MatrixUtil matrix2.mtx 1 5000

Though MatrixMultiplier is implemented as a Win32 console app, it can still be transformed into a CCS job and submitted to a CCS compute cluster. A CCS job is defined by its tasks, the terms under which to schedule the job, and the terms under which to execute its tasks. CCS jobs can be defined programmatically or as strongly typed XML. Figure 3 shows the job definition for the MatrixMultiplierSerial job using XML. Information about the XML schema for job definitions is available with the beta of CCS.

Figure 3 XML Job Definition

<?xml version="1.0" encoding="utf-8" ?>
<Job xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
     xmlns:xsd="https://www.w3.org/2001/XMLSchema"
     SoftwareLicense="" MaximumNumberOfProcessors="1" MinimumNumberOfProcessors="1"
     Runtime="00:00:50" IsExclusive="false" Priority="Normal"
     Name="MatrixMultiplierSerial" Project="MSDN Samples" RunUntilCanceled="false">
  <Tasks xmlns="https://www.microsoft.com/ComputeCluster/">
    <Task MaximumNumberOfProcessors="1" MinimumNumberOfProcessors="1"
          WorkDirectory="\\DataServer\Public" Stdout="MatrixMultiplier.out"
          Name="MatrixMultiplier"
          CommandLine="MatrixMultiplier matrix1.mtx matrix2.mtx matrix3.mtx"
          IsCheckpointable="false" IsExclusive="false" IsRerunnable="false"
          Runtime="00:00:50">
      <EnvironmentVariables />
    </Task>
  </Tasks>
</Job>

Now that the job has been defined, it can be submitted to the cluster. CCS provides a script called job.vbs that provides a command-line interface for submitting jobs to the CCS head node. This command line will submit the MatrixMultiplierSerial job:

job.vbs submit /f:MatrixMultiplierSerial.xml

If job.vbs succeeds in submitting the job, it will output the job's ID assigned by the compute cluster. The job ID can then be used to query, alter, and cancel the job.

Job created, ID: 55

Scheduling Jobs

Job scheduling entails the matching of jobs with available resources on the cluster while ensuring those resources are used within the context of the submitting user. The Compute Cluster Scheduler Service runs on the cluster's head node. It receives all incoming jobs and begins scheduling them. When a client first submits a job, the user is prompted for credentials to execute the job with. After providing the credentials, the user has the option of storing the credentials locally in an encrypted form. The job is then passed over an encrypted channel to the job scheduler with the specified credentials. When the scheduler receives the job, it encrypts the credentials by using the Windows Data Protection API (DPAPI) and stores the job and the encrypted credentials in a local Microsoft SQL Server™ Desktop Engine (MSDE) database. If the user asked that the credentials also be stored locally, the encrypted credentials are included in the response to the job submission and stored on the client. Subsequent job submissions simply pass the encrypted form of the credentials back to the cluster. Optionally, a Job Submission Filter can be written to ensure that the job includes all the required terms and excludes all disallowed terms as defined by the cluster's administrator. Common examples of policies include restricting which users can submit jobs, enforcing user quotas, and implementing charge backs for using a shared compute cluster.

At this point the job is pending within a logical queue in the MSDE database. At a specified interval, the scheduler starts a scheduling session to process jobs that are currently in the job queue. Job policies are implemented by Job Activation Filters and allow administrators to define rules for job submission. If a job is not filtered out by a policy, it remains in the queue to be scheduled.

The job queue is divided into five sections for low, below normal, normal, above normal, and high priority jobs. Jobs within each section are processed in first-come, first-served order. When a job is first enqueued it is added as the last job in the section that corresponds to its priority. Administrators can change the priority of any job already in the queue, causing it to be moved to the end of another section. The scheduler starts matching resources to the oldest job in the highest-priority section of the queue.

The most common compute cluster resource to be allocated is a processor, and allocating processors is tied to allocating nodes. By default, compute nodes are allocated in order of their amount of physical memory, followed by the speed of their processors. If a job explicitly specifies its nodes through the AskedNodes term, the scheduler instead allocates the nodes listed in that term, in the order they are listed. If the IsExclusive job term is set to true, the job is exclusively allocated entire nodes, even if it uses only some of a node's processors.

CCS does not support the preemption of executing jobs in order to execute higher-priority jobs. In other words, if a higher-priority job is enqueued, it will execute before lower-priority jobs pending in the queue, but it will need to wait for any executing jobs (regardless of their priority) to complete if the executing jobs have resources it needs. Preemption of executing jobs will be implemented in a future version of the scheduler service. Jobs whose resource requirements could not be completely met remain in the queue as pending jobs. These jobs are still allocated those resources that are available and become candidates for backfill.

Backfill allows jobs—usually shorter jobs—that are later in the queue to potentially execute while larger jobs earlier in the queue are waiting for all of their necessary resources. Let's walk through a scenario to see how it works.

A hypothetical compute cluster has two nodes, A and B, both of which have two processors (see Figure 4). Three jobs are submitted to the cluster: X, Y, and Z. Job X has a run-time limit of two hours and is executing on node A. Job Y requires four processors, but only the two processors from node B are available. As a result job Y will remain pending in the queue with the two available processors on node B allocated to it. Job Z is after job Y in the queue and requires one processor for thirty minutes.

Figure 4 Compute Cluster

The scheduler identifies a backfill window made up of the two unused processors on node B currently allocated to job Y, which is waiting for the two processors on node A to become available. The scheduler checks to see whether any jobs after Y in the queue can leverage this window without affecting job Y. Job Z could execute on node B without affecting when job Y is scheduled. As a result, the scheduler schedules job Z on node B ahead of job Y.

Backfill is enabled by default and can be disabled. In order to work optimally, backfill relies on jobs to have accurate run-time limit values. A job's run-time limit is specified by the run-time job term. If a job does not specify a run-time term, the scheduler assumes that its run-time limit is infinite. Jobs whose run time is set to infinite are not candidates for backfill since it is impossible for the scheduler to know if the job's execution can fit into any available backfill window.

The scheduler also allows end users and support personnel to query the status of a queued or executing job, cancel a job, and alter some properties of a queued or executing job (such as increasing resource requirements). The following command line uses Job.vbs to query the status of a submitted job:

job.vbs view 55

The results returned would look something like this:

JOB_ID: 55
SubmittedBy: TestComputeCluster\Administrator
NAME: MatrixMultiplierSerial
STATUS: Running
CPUS: 1,1
ALLOCATED_NODES: TestNode1
SUBMIT_TIME: 11/22/2005 9:49:00 AM
NUM_TASKS: 1
Queued: 0  Running: 1  Finished: 0  Failed: 0  Cancelled: 0

Executing Tasks

After the job scheduler allocates all of a job's required resources, it dispatches the job's tasks to the allocated nodes. The tasks are dispatched in the same order as they are defined in their job, along with the end user's credentials, over an encrypted channel. If the number of tasks exceeds the number of processors requested and allocated to the job, the scheduler will dispatch n tasks at a time, where n is the number of processors allocated to the job. As tasks complete, another n tasks (or the remaining tasks) are dispatched. Backfilling does not occur at the task level.

When the compute node receives a task to execute, it is handled by the Compute Cluster Node Manager service. The Node Manager receives the task's command line, environment variables, submitting user's credentials, and a list of processors to use. The submitting user's credentials are used to create a Windows security token and then immediately erased. A Windows process is then created in suspended mode based on the task's command line and environment variables and is assigned the submitting user's security token. The new process is then associated with a Windows job object, which assigns the processors to be used and monitors that task's resource utilization. Once the process is associated with the job object, its execution resumes.
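
That launch sequence maps onto standard Win32 APIs. The following is a simplified sketch of the pattern, not the Node Manager's actual code; a real service would use CreateProcessAsUser with the submitting user's token and check every return value:

#include <windows.h>

// Simplified sketch of the dispatch pattern described above (error handling omitted;
// the real Node Manager would create the process under the submitting user's token).
void LaunchTaskSketch(LPWSTR pszCommandLine, DWORD_PTR dwProcessorAffinityMask)
{
    STARTUPINFOW si = {};
    si.cb = sizeof(si);
    PROCESS_INFORMATION pi = {};

    // Create the task's process suspended so it can be placed in a job object
    // before it executes any code.
    CreateProcessW(NULL, pszCommandLine, NULL, NULL, FALSE,
                   CREATE_SUSPENDED, NULL, NULL, &si, &pi);

    // The job object binds the process to the allocated processors and lets the
    // service account for the task's resource usage.
    HANDLE hJob = CreateJobObjectW(NULL, NULL);

    JOBOBJECT_BASIC_LIMIT_INFORMATION limits = {};
    limits.LimitFlags = JOB_OBJECT_LIMIT_AFFINITY;
    limits.Affinity = dwProcessorAffinityMask;
    SetInformationJobObject(hJob, JobObjectBasicLimitInformation, &limits, sizeof(limits));

    AssignProcessToJobObject(hJob, pi.hProcess);

    // Only now is the task allowed to start running.
    ResumeThread(pi.hThread);

    CloseHandle(pi.hThread);
    CloseHandle(pi.hProcess);
    // hJob would typically be kept open so the task can be monitored.
}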

As the process executes, the local Node Manager monitors the process and communicates its status to the scheduler. If the process terminates abnormally, the Node Manager will communicate the error back to the scheduler and, optionally, invoke a Dr. Watson crash dump. After the task completes, the scheduler is notified of the completion and updates the status of the job in the job queue to Completed. Completed jobs remain in the job queue for a specified period of time to allow users and administrators to query for their status.

Multiple Serial Tasks

The MatrixMultiplierSerial job only leveraged a single processor of the compute cluster and as a result had a fairly long run time. Luckily there are ways to parallelize matrix multiplication. One simple approach is to have each unit of execution receive both input matrixes, but only compute a designated submatrix of the output matrix, as illustrated in Figure 5. The MatrixMultiplier sample supports four more command-line arguments that specify the submatrix of the output matrix to calculate. This allows the job to execute multiple instances of MatrixMultiplier.exe on different processors, each calculating different output submatrixes. This design pattern is known as a Parametric Sweep, whereby the scheduler provides parallelism to the job by launching multiple instances of a serial task.

Figure 5 MatrixMultiplierMultiSerial Parallel Execution

The CCS job definition for MatrixMultiplierMultiSerial is shown in Figure 6. The first difference from the MatrixMultiplierSerial job is that four separate serial tasks have been defined, each calculating a different submatrix of the output matrix in parallel. This increases the processor requirement for the job to four, one for each of the serial tasks. The submatrix that each task calculates is specified as command-line arguments along with the input and output files. A fifth task is added to the job that merges the output of the four previous tasks. Since this task requires the output of those tasks, it specifies them as dependencies, which prevents it from executing until they complete. The handy MatrixUtil sample is used to merge the matrixes. The sixth and final task cleans up the individual output matrixes once the merged output matrix is created and, as a result, is dependent on the fifth task. The dependencies of these two tasks demonstrate some of the basic task flow capabilities of the scheduler service. Based on testing, the job definition estimates an overall run time of 12 minutes: the tasks that calculate the submatrixes are estimated at 10 minutes each, with 1 minute for merging the output matrixes and 1 minute for the cleanup task. The following command line submits the MatrixMultiplierMultiSerial job to the compute cluster:

job.vbs submit /f:MatrixMultiplierMultiSerial.xml

Figure 6 MatrixMultiplierMultiSerial Job Definition

<?xml version="1.0" encoding="utf-8" ?>
<Job xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
     xmlns:xsd="https://www.w3.org/2001/XMLSchema"
     SoftwareLicense="" MaximumNumberOfProcessors="4" MinimumNumberOfProcessors="4"
     Runtime="00:00:12" IsExclusive="false" Priority="Normal"
     Name="MatrixMultiplierMultiSerial" Project="MSDN Samples" RunUntilCanceled="false">
  <Tasks xmlns="https://www.microsoft.com/ComputeCluster/">
    <Task MaximumNumberOfProcessors="1" MinimumNumberOfProcessors="1"
          WorkDirectory="\\DataServer\Public" Stdout="MatrixMultiplierMultiSerial.out"
          Name="Task1"
          CommandLine="MatrixMultiplier.exe matrix1.mtx matrix2.mtx matrix_out1.mtx 0 0 2500"
          IsCheckpointable="false" IsExclusive="false" IsRerunnable="false"
          Runtime="00:00:10">
      <EnvironmentVariables />
    </Task>
    <!-- Task2, Task3 and Task4 are defined similarly -->
    <Task MaximumNumberOfProcessors="1" MinimumNumberOfProcessors="1"
          Depend="Task1,Task2,Task3,Task4"
          WorkDirectory="\\DataServer\Public" Stdout="MatrixMultiplierMultiSerial.out"
          Name="Task5"
          CommandLine="MatrixUtil.exe MatrixMerged.mtx 2 matrix_out1.mtx matrix_out2.mtx matrix_out3.mtx matrix_out4.mtx"
          IsCheckpointable="false" IsExclusive="false" IsRerunnable="false"
          Runtime="00:00:01">
      <EnvironmentVariables />
    </Task>
    <Task MaximumNumberOfProcessors="1" MinimumNumberOfProcessors="1"
          Depend="Task5"
          WorkDirectory="\\DataServer\Public" Stdout="MatrixMultiplierMultiSerial.out"
          Name="Cleanup"
          CommandLine="del \\DataServer\Public\matrix_out*.mtx"
          IsCheckpointable="false" IsExclusive="false" IsRerunnable="false"
          Runtime="00:00:01">
      <EnvironmentVariables />
    </Task>
  </Tasks>
</Job>

MPI and Parallel Tasks

The MatrixMultiplierMPI sample is another parallel implementation of matrix multiplication that divides a job's work in a slightly different manner. Like MatrixMultiplierMultiSerial, it executes multiple instances of MatrixMultiplierMPI.exe across multiple nodes, and each instance still calculates a designated submatrix of the output matrix. However, instead of having all the processes read both input matrixes in their entirety from a file share, one process loads the input matrixes and the other processes are fed just the input submatrixes they need to calculate their output submatrixes. This lowers the amount of memory that each task requires and the amount of remote file I/O.

Figure 7 MatrixMultiplierMPI Parallel Execution

When the job is first run, each process receives its initial two input submatrixes. This is illustrated with the blue and yellow lines in Figure 7. Each process then multiplies its two input submatrixes together and applies the product to its output submatrix, as illustrated by the green lines in Figure 7. Next, each process gives its input submatrixes to its neighbors above and to the left and in turn receives new input submatrixes from its neighbors below and to the right (as illustrated with the black lines in Figure 8). The process multiplies the new input submatrixes together and applies the product to its previous output. The shifting and multiplying of new input submatrixes continues until all the processes have been given the input submatrixes they need.

Figure 8 Communication Between Processes

Since this algorithm requires processes to communicate more often, MPI (instead of a file share) will be used to communicate between the processes. As mentioned at the beginning of this article, MPI is a programming interface for developing parallel applications that need to frequently share data across processes and nodes. The MPI programming model is tailored for efficient I/O and to take advantage of shared memory and fast interconnect technologies.

To use the MPI library, the sample must first initialize it. This is done by calling the MPI_Init API:

MPI_Init(NULL, NULL);
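
For context, a minimal MPI program is just a bracket of initialization and shutdown around the application's work. The following skeleton is a generic sketch rather than code from the sample; MPI_Init is commonly passed the addresses of argc and argv so that the MPI implementation can strip its own command-line arguments:

#include <mpi.h>

int main(int argc, char* argv[])
{
    // Initialize the MPI library before making any other MPI call.
    MPI_Init(&argc, &argv);

    // ... the application's work goes here ...

    // Shut down the MPI library; no MPI calls are allowed after this point.
    MPI_Finalize();
    return 0;
}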

Each time this job is run, multiple instances of MatrixMultiplierMPI are started across the compute cluster. MPI groups these processes in what is referred to as a collective and provides a data type called a communicator, which is a handle to a collective. MPI also defines a constant called MPI_COMM_WORLD, which is defined as a communicator and is set to the current process's default collective. A process can use MPI_COMM_WORLD to query for more information about its collective. For instance, passing MPI_COMM_WORLD to MPI_Comm_size returns the number of processes currently running within the caller's collective. Similarly, passing MPI_COMM_WORLD to MPI_Comm_rank returns the calling process's rank within its collective. Each process is assigned a rank starting at 0 to serve as its unique identifier and to represent its order within the collective. Each process of MatrixMultiplierMPI uses its rank to select the submatrix of the output matrix it will calculate. The following code shows how a process retrieves the number of processes in its collective and its rank:

MPI_Comm_size(MPI_COMM_WORLD, &g_nProcessCount); MPI_Comm_rank(MPI_COMM_WORLD, &g_nProcessRank);

Logically, a process's rank gives it an idea of where it is positioned linearly relative to the other processes in its collective. However, many parallel algorithms require the units of execution (the processes) to be arranged in a specific topology such as a ring or Cartesian plane. With this parallel matrix multiplication algorithm, the processes need to be logically organized into a Cartesian plane so that input submatrixes can be correctly transferred from one process to another. Implementing this algorithm with the default linear view of the collective would force the developer to do more work.

MPI provides the ability to create a topological view of the physical processes, which for purposes of this sample is a Cartesian plane. Through this view, each process can be referenced with X and Y coordinates and each process can easily reference its surrounding neighbors:

g_nClusterDim = (int) sqrt((double) g_nProcessCount);
int rgnDims[] = {g_nClusterDim, g_nClusterDim};
int rgnPeriods[] = {true, true};
MPI_Cart_create(MPI_COMM_WORLD, 2, rgnDims, rgnPeriods, false, &g_commCartesian);

MPI_Cart_create creates a logical view of a Cartesian plane across the physical processes of the collective. The first parameter takes a communicator to the collective of processes upon which to create the view. As discussed earlier, MPI_COMM_WORLD includes all of the processes in the default collective. The next parameter specifies the number of dimensions. Since this sample only calculates two-dimensional matrixes, I only need to organize the application's processes in two dimensions. The rgnDims parameter specifies the number of processes in each dimension. MatrixMultiplierMPI will create a square Cartesian plane from however many processes are assigned to the job. (Note that the sample requires the number of processes to be a perfect square for simplicity.) The rgnPeriods parameter specifies whether each dimension is continuous. This essentially means that if the application requests the next process after the last process in a dimension, the first process in the dimension is returned rather than an error. The next parameter specifies whether the ranks of the processes can be rearranged in the new collective, and the final parameter returns a communicator to the new collective.
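
Although the sample does not need to do so explicitly, a process can translate its rank into coordinates on that plane with the standard MPI_Cart_coords call. The following sketch is an assumption layered on the g_commCartesian communicator created above, not code from the sample:

int nCartRank;
int rgnCoords[2];

// Rank of this process within the new Cartesian collective.
MPI_Comm_rank(g_commCartesian, &nCartRank);

// Translate the rank into coordinates (dimension 0, dimension 1) on the plane.
MPI_Cart_coords(g_commCartesian, nCartRank, 2, rgnCoords);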

Besides calculating a submatrix of the output matrix, the root instance of MatrixMultiplierMPI is also responsible for loading the input matrixes, divvying up the initial input matrixes across all the processes in the default collective, and later consolidating the results of the other instances. The other instances, which I will refer to as leaf processes, initially wait for their submatrixes from the root process and later send their completed output submatrixes back to the root process (the process with a rank of 0).

First the root process loads the input matrixes from the input files specified on the command line. The matrixes are loaded into submatrixes designated for each process in the collective. (Note that another limitation of this sample is that the dimensions of the input matrixes must be the same, to maintain simplicity.) Before sending each process its initial input submatrixes, the root process broadcasts the dimensions of the submatrixes to each process in the collective. MPI_Bcast is used to broadcast data to all the processes in a specified collective in one operation:

MPI_Bcast(&nSubMatrixDim, 1, MPI_INT, g_nProcessRank, MPI_COMM_WORLD);

MPI consistently describes data involved in transmissions with three parameters: a pointer to the transmission buffer, the number of data elements transmitted, and the type of the data elements. In the sample, the first three parameters of MPI_Bcast describe the dimensions of the submatrixes being broadcast as a single integer; since the height and width of a submatrix are always the same, only one dimension actually needs to be sent. MPI also consistently describes a specific process with two parameters: the rank of the process within its collective and a communicator to its collective. The final two parameters of MPI_Bcast use this pattern to specify the process from which the broadcast is sent.

A quick digression: MPI defines a set of constants that represent primitive data types and the bindings between these data types and their corresponding data types in C, C++, and Fortran. However, MPI does not define a standard wire format for these types. Therefore the behavior of an MPI process developed in C receiving a message from an MPI process developed in Fortran is undefined by the MPI specifications. With standards such as Web Services Architecture (WSA) in developers' minds, this may seem like a significant issue, but keep in mind that the main use for MPI is to segment a tightly coupled task across nodes to gain use of more resources. In most cases the componentization of tasks will occur vertically, not horizontally, and as a result interoperability between tasks developed on heterogeneous platforms is less important. On the other hand, exposing an interoperable interface to submit, cancel, and query jobs is a common scenario, and CCS exposes a WSA-compliant interface for doing just that.

After sending the submatrixes' dimensions, the root process sends the initial input submatrixes to the leaf processes. MPI_Scatter allows the root to send a different submatrix to each process in a single operation:

MPI_Scatter(pnSubMatrixes, nSubMatrixCells, MPI_INT,
            inputSubMatrix1.GetBuffer(), nSubMatrixCells, MPI_INT,
            g_nSubMatrixTag, MPI_COMM_WORLD);

The first parameter is a pointer to a buffer containing all the submatrixes to send to the processes of the collective. The submatrixes are laid out contiguously in the buffer, one right after the other. The second and third parameters specify the number of cells in each submatrix and the type of the cells (which are integers), respectively. The same MPI_Scatter call also delivers the root's own portion of the scattered data, since the root is a member of the collective. Because the root process is also responsible for calculating a submatrix of the output matrix, it receives back from MPI_Scatter the submatrix designated to it; the fourth, fifth, and sixth parameters are used to receive that submatrix. The seventh parameter identifies the rank of the root process performing the scatter, and the eighth and final parameter specifies the communicator to the collective across which the submatrixes are sent. If the collective has four processes (including the root), the buffer passed in the first parameter must contain four contiguous submatrixes.
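
The contiguous layout is the root's responsibility. A hypothetical packing loop might look like the following; GetSubMatrixForRank is an invented accessor used only for illustration and is not part of the sample:

// Hypothetical: lay out one submatrix per process, contiguously and in rank order,
// so that MPI_Scatter delivers block p to the process whose rank is p.
// (memcpy requires <cstring>.)
int* pnSubMatrixes = new int[g_nProcessCount * nSubMatrixCells];
for(int p = 0; p < g_nProcessCount; p++)
{
    memcpy(pnSubMatrixes + p * nSubMatrixCells,
           GetSubMatrixForRank(p),          // invented accessor for illustration
           nSubMatrixCells * sizeof(int));
}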

Let's take a look at how the leaf processes receive the data sent via MPI_Bcast and MPI_Scatter. First the leaf process needs to receive the dimensions of the submatrix. MPI_Bcast was used to broadcast the dimensions of the submatrixes and is also used to receive them. The function knows that it is being used to receive broadcast data when its fourth parameter takes the rank of a process other than the caller's own. As a result, the buffer passed in the first parameter is used to receive the data instead of providing the data to be sent.

Once the leaf processes know the dimensions of the submatrixes they are about to receive, the buffer size of each submatrix can be calculated and the submatrixes themselves can be received. MPI_Scatter is also used to receive data sent via MPI_Scatter. When used to receive, the first parameter can take a NULL value and the next two parameters are ignored. The fourth, fifth, and sixth parameters receive the submatrix in the same way the root process receives back its own submatrix. The seventh parameter again identifies the rank of the root process that performed the scatter, and the final parameter is the communicator to the collective containing that process.
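
Put together, the leaf side of this exchange might look roughly like the sketch below. Names that appear elsewhere in the article (inputSubMatrix1, nSubMatrixCells, g_nRootProcess) are reused; the rest is an assumption rather than the sample's actual code:

// Receive the broadcast submatrix dimension from the root process.
int nSubMatrixDim = 0;
MPI_Bcast(&nSubMatrixDim, 1, MPI_INT, g_nRootProcess, MPI_COMM_WORLD);

// The dimension determines how large each receive buffer must be.
int nSubMatrixCells = nSubMatrixDim * nSubMatrixDim;

// Receive this process's initial input submatrix; the send-side parameters are
// not used on a non-root process. (The submatrix object is assumed to have
// sized its buffer from nSubMatrixDim.)
MPI_Scatter(NULL, 0, MPI_INT,
            inputSubMatrix1.GetBuffer(), nSubMatrixCells, MPI_INT,
            g_nRootProcess, MPI_COMM_WORLD);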

Now that all the processes have their initial submatrixes, each begins to calculate the submatrix of the output matrix designated to it. As explained, the calculation is done by multiplying the two input submatrixes that the process currently has and saving the results in the output submatrix. Each process then shifts the input submatrixes, sending one input submatrix to the node above it and the other input submatrix to the node to its left. The process then receives the submatrixes being shifted to it from the nodes below it and to its right. After the process receives the next set of submatrixes, it multiplies them together and adds the results to the results from the previous multiplication. The processes continue shifting their current input submatrixes and multiplying the newly acquired input submatrixes until all the necessary submatrixes have been multiplied:

do
{
    outputSubMatrix.Multiply(inputSubMatrix1, inputSubMatrix2);
    nSubMatrixesReceived++;
}
while(ShiftSubMatrixes(inputSubMatrix1, inputSubMatrix2, nSubMatrixesReceived));

The ShiftSubMatrixes function implements the shifting of the submatrixes. Each process sends its input1 submatrix to the process to its left and receives its next input1 submatrix from the process to its right. Similarly, each process sends its input2 submatrix to the process above it and receives its next input2 submatrix from the process below it. Doing so requires that each process know the ranks of its surrounding processes; this is where the Cartesian plane view created earlier can be leveraged. MPI_Cart_shift returns the ranks of the processes that would be involved in the specified shift:

MPI_Cart_shift(g_commCartesianView, 0, -1, &nAboveProcessID, &nBelowProcessID);

The first parameter is the communicator that references the Cartesian view. The second parameter specifies the dimension along which the shift occurs (0 is the vertical dimension). The third parameter's magnitude specifies how many positions the shift moves, and its sign specifies the direction of the shift. The final two parameters receive the ranks of the two processes involved in the shift: the process to receive from and the process to send to.

To perform the actual shift, the MPI function MPI_Sendrecv_replace is used. In one operation the current input submatrix is sent and the incoming input submatrix is received. The process ranks returned by MPI_Cart_shift are used to specify the external processes involved in the shift. ShiftSubMatrixes returns false when the process has received all the submatrixes it needs to calculate its output submatrix.
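
As a hedged illustration of a single shift, the call might look like the following. The neighbor ranks are assumed to come from a corresponding horizontal MPI_Cart_shift; only the MPI_Sendrecv_replace signature itself (buffer, count, type, destination rank and send tag, source rank and receive tag, communicator, status) is standard:

MPI_Status status;

// Shift input1 horizontally: send the current submatrix to the process on the left
// and overwrite the same buffer with the submatrix arriving from the right.
MPI_Sendrecv_replace(inputSubMatrix1.GetBuffer(), nSubMatrixCells, MPI_INT,
                     nLeftProcessID, 0,     // destination rank and send tag
                     nRightProcessID, 0,    // source rank and receive tag
                     g_commCartesianView, &status);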

As each process completes the calculation of its output, its output submatrix is sent to the root process. MPI_Gather allows leaf processes to easily send their output submatrixes to the root process and allows the root process to gather all the submatrixes in a single operation. Like MPI_Bcast and MPI_Scatter, MPI_Gather is used on both the sending and receiving side of the operation:

MPI_Gather(outputSubMatrix.GetBuffer(), nSubMatrixCells, MPI_INT, NULL, 0, 0, g_nRootProcess, MPI_COMM_WORLD);

The first three parameters to MPI_Gather specify the data to send. The fourth, fifth, and sixth parameters are ignored on the sending side. The last two parameters specify the process that performs the gathering, which is the root process:

MPI_Gather(pnSubMatrix, nSubMatrixCells, MPI_INT, pnGatheredMatrixes, nSubMatrixCells, MPI_INT, 0, MPI_COMM_WORLD);

The root process also uses MPI_Gather to receive the submatrixes. The first three parameters specify the root process's output submatrix, which is simply copied to the correct location in the receive buffer specified next. The fourth, fifth, and sixth parameters specify the buffer that will receive the output submatrixes of each process in the collective. If there are four processes in the collective, the buffer must be large enough to accept four submatrixes.

Finally, MatrixMultiplierMPI saves the consolidated output matrix to disk and then shuts down the MPI library via MPI_Finalize. Now let's take a look at the job definition that executes MatrixMultiplierMPI on a compute cluster (see Figure 9).

Figure 9 MatrixMultiplierMPI Job Definition

<?xml version="1.0" encoding="utf-8" ?>
<Job xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
     xmlns:xsd="https://www.w3.org/2001/XMLSchema"
     SoftwareLicense="" MaximumNumberOfProcessors="4" MinimumNumberOfProcessors="4"
     Runtime="00:00:12" IsExclusive="false" Priority="Normal"
     Name="MatrixMultiplierMPI" Project="MSDN Samples" RunUntilCanceled="false">
  <Tasks xmlns="https://www.microsoft.com/ComputeCluster/">
    <Task MaximumNumberOfProcessors="4" MinimumNumberOfProcessors="4"
          WorkDirectory="\\DataServer\MatrixMultiplier" Stdout="MatrixMultiplierMPI.out"
          Name="Task1"
          CommandLine="mpiexec.exe -hosts %CCP_NODES% MatrixMultiplierMPI.exe matrix1.mtx matrix2.mtx matrix_mpi.mtx"
          IsCheckpointable="false" IsExclusive="false" IsRerunnable="false"
          Runtime="00:00:12">
      <EnvironmentVariables />
    </Task>
  </Tasks>
</Job>

The first variation in this job definition is that there is only one task even though multiple processes will be spawned. When executing parallel tasks, the job scheduler leaves it up to the MPI implementation to spawn the multiple processes on nodes and processors that the scheduler allocated to the job. The second version of the MPI standard (MPI2) recommends a standardized startup for all MPI applications so that scripts, schedulers, and other launching programs can expect to start MPI applications in a similar fashion. More specifically, MPI2 recommends that all MPI implementations provide a launcher, mpiexec.exe, that supports a minimum set of parameters. For the purposes of MatrixMultiplierMPI, its command line would look like this:

mpiexec -hosts %CCP_NODES% MatrixMultiplierMPI.exe

The -hosts argument is an MPI standard argument. It takes the nodes on which to launch the processes as well as the processors on those nodes to which the processes should be bound. The CCP_NODES environment variable includes the nodes and the processors on each node allocated to the job by the scheduler. This binding allows mpiexec.exe to launch the MatrixMultiplierMPI processes across the cluster based on the resources allocated to the job by the scheduler service. By passing the allocated nodes and processors as an environment variable (in a format defined by MPI), third-party MPI implementations can also be integrated with the scheduler service. The following command line submits the MatrixMultiplierMPI job to the compute cluster:

job.vbs submit /f:MatrixMultiplierMPI.xml

Debugging MPI Tasks

MPI applications can be challenging to debug due to chatty conversations between multiple processes executing in parallel. To simplify debugging, Visual Studio 2005 allows debugging of MPI applications in the development environment. Figure 10 shows the Debugging section of the MatrixMultiplierMPI project property sheet.

Figure 10 MatrixMultiplierMPI Debugging Properties

Notice that the debugger to launch is set to MPI Cluster Debugger. This selection presents a list of options that specify how the MPI application is launched from the Visual Studio debugger. MPIRun Command is set to the MPI utility that launches the project as an MPI application; here MatrixMultiplierMPI uses the mpiexec.exe that ships with CCS, MSMPI's implementation of the launcher. MPIRun Arguments specifies the arguments to mpiexec.exe, in this case that four instances of MatrixMultiplierMPI.exe will be launched locally by the debugger. Alternatively, the -hosts argument could be specified, allowing the processes to be launched remotely across multiple nodes and still be debugged as if they were local. Application Command specifies the path to MatrixMultiplierMPI.exe, and Application Arguments specifies the paths to the two input matrixes and the output matrix. MPIShim Location specifies where mpishim.exe can be found; if the MPI application is debugged across multiple nodes, mpishim.exe must be present at that location on each node. The MPI network security mode is set to allow any machine in the local subnet to connect to the launched processes. You can be more restrictive by specifying the IP addresses from which connections will be allowed in the MPI network filter field.

After closing the property sheet, you can start the MPI application. As with debugging most applications from Visual Studio, F5 causes the application to launch with the Visual Studio debugger attached. Depending on the debugger settings that you have specified in the previous property page, the processes that make up the MPI application are launched on the local machine, on remote nodes, or a combination of both. Either way, however, you can set breakpoints, step through code, view the values of variables and leverage all the debugging features found in the Visual C++® debugger.

Figure 11 shows MatrixMultiplierMPI stopped at a breakpoint after the process identifies its rank within its collective. In the Processes window, you can specify which process to debug and then view the call stack for that process. This is very similar to how you would select a thread in the Threads window and view its current call stack.

Figure 11 Debugging MatrixMultiplierMPI

By default, when the debugger hits a breakpoint in any process, all other processes in the MPI application also break. This behavior can be changed at a global level across all projects in the Options dialog, which is opened by selecting Options from the Tools menu. When the dialog appears, select the Debugging branch and then the General leaf in the left-hand tree control. In the list view on the right, deselecting "Break all processes when one process breaks" will cause only the process that triggered the breakpoint to break while the other processes continue to execute.

Visual Studio 2005 provides other features that are useful when building CCS applications. These include x64 C++ and .NET-based compilers, OpenMP C++ extensions, and floating-point improvements to the C++ compiler that allow developers to make trade-offs between speed and accuracy.

Compute Cluster Availability

As I mentioned earlier, the scheduler maintains all of its state in persisted form in an MSDE database on the head node. This allows the scheduler to recover a job's execution after a node failure. If the head node fails, the cluster cannot accept new jobs or schedule queued jobs until the head node is brought back online, but jobs executing on compute nodes continue to execute. If a compute node shuts down while executing a serial job, the scheduler decides whether the job's tasks running on that node can be run on other nodes already allocated to that job. If they can be, the job's resource requirements are decreased and the tasks that were supposed to run on the failed node are rescheduled on the other allocated nodes.

If one or more of the tasks that were executing on the failed node cannot be run on the other nodes allocated to the job, all of the job's executing tasks are killed, their resources are deallocated, and the entire job is placed back into the scheduler's queue in the pending state. If a compute node shuts down while executing a parallel job, the entire job is placed back in the queue.

CCS does not support multiple head nodes per compute cluster for redundancy or scalability purposes; only one head node can be configured per cluster. If the head node fails, it must be restarted or rebuilt before new jobs can be submitted. Again, jobs already executing on compute nodes will continue to execute. Also, other versions of SQL Server cannot be used for the scheduler's database in this version of CCS; only the MSDE instance on the head node is supported. Optionally, the head node can use RAID drives to improve its overall availability. Future versions of CCS will allow multiple head nodes per cluster.

Conclusion

A common HPC platform provides multiple benefits to a variety of parties. HPC application developers can develop to a common scheduler and MPI implementation while leveraging a wide variety of development environments available on Windows. Hardware vendors that develop interconnect technologies can focus on developing WinSock Direct drivers as opposed to an entire MPI stack. System administrators can manage large compute clusters that host a variety of internal and external applications in a consistent manner. Furthermore, administrators can utilize their existing investments in Windows infrastructure to manage user authentication and to set policies for user authorization, user quotas, and charge backs with services such as Group Policy.

Historically, the Windows platform has simplified the development, deployment, and maintainability of a long list of application types. Windows Compute Cluster Server now extends that list to include high-performance computing applications. Now that you're ready to dive into CCS, visit the Windows Compute Cluster Server 2003 site for more information and to download the beta software.

Rich Ciapala has worked in Microsoft Consulting Services and Microsoft product groups for the last 15 years. He is currently a member of the Windows Server Customer Advisory Team focusing on the design of mission-critical applications.