Building the MSDN Aggregation System
Have you visited msdn2.microsoft.com? It’s the new online face of the MSDN® Developer Tools and Enterprise Server documentation. The infrastructure behind it includes a system developed by my team at Microsoft for aggregating information related to our content. By the time you read this, we expect that all of msdn.microsoft.com will have migrated to our new system and the msdn2 name will have been retired.
When my team set out to build our content aggregation system, we worked toward several high-level goals. The following explains what we wanted to accomplish:
- Keep data providers and consumers loosely coupled using a publish/subscribe model and asynchronous messaging interfaces.
- Let individual usage scenarios define and manage the structure and meaning of their data. We also wanted to keep those definitions opaque to the system and to unrelated scenarios.
- Make it easy to plug new data providers and consumers into the system regardless of how complex their own internal logic may be.
These constraints were necessary because the universe of content and data we wanted to associate with MSDN content is widely varied and we couldn’t enforce structure or taxonomy upon it. For instance, we wanted to be able to recommend blog posts that refer to our documentation and allow users to tag documents to build up a "folksonomy" that describes MSDN content.
Building the System
After agreeing on our design goals we began looking for technologies to support them. It turned out that SQL Server™ Service Broker offered the asynchronous messaging support we needed and, since the message-queuing infrastructure is tightly integrated with the SQL Server database engine, our existing database backup, administration, and failover procedures could cover our messaging solution as well.
Fortunately, SQL Server Service Broker also supports significant scale out. For example, service endpoints (the senders and receivers of messages) can be remote or local and the database code does not need to know where endpoints are. This let us start out with all of our services on a single server, and then move them to remote servers as loads increased.
The common language runtime (CLR) integration in SQL Server 2005 allowed us to build a plug-in model for data providers and consumers and to implement complex business logic in the database layer. If you’ve written extended stored procedures to do things like pull data from a SOAP endpoint, you’ll appreciate how much more straightforward it is to implement this functionality using SQL Server CLR integration.
Our system’s high-level architecture is shown in Figure 1.
Figure 1 System Architecture Overview
The public interfaces to our system are message-based, and since the system is built on top of Service Broker, we were able to provide all the benefits of reliable asynchronous messaging without a lot of additional work. We expose two Service Broker services (which are the logical endpoints for Service Broker messaging) as data input points for the system.
Briefly, services are communication endpoints that use queues as their backing stores. Services specify what types of messages they can process by bundling them into contracts. The glossary on Service Broker Objects explains further. The following are the two services that we expose:
The Job Service receives messages that provide the system with the information it needs to execute a Job. For us, a Job is a unit of work that brings data related to one or more MSDN documents into the system. This is how we support the pull model of data acquisition. A job message contains the information a JobWorker needs to operate, which will differ for various JobWorkers. The data retrieved by the JobWorker is then submitted back into the system via a second Service Broker service called the DataSubmission service.
The DataSubmission Service receives messages containing the data to be associated with MSDN content. DataSubmission messages can be submitted either by JobWorkers or by external data providers using a "push" model. In the latter case, data providers need to know the structure of the message envelope required by the DataSubmission service.
Message Structure and Validation
For either service, certain pieces of information used by the system to route messages and identify data sources must be included in all messages. So the message envelope needs to have a structure verifiable by the system, and the message payload needs to be structured according to a contract agreed upon by the provider and consumer while remaining opaque to the system. An example Job Message is shown in Figure 2.
Figure 2 Sample Job Message
<?xml version="1.0" encoding="utf-16"?>
<!-- optional runtime overrides -->
<!-- optional scheduled job parameters -->
<!-- message payload -->
<!-- reams of additional data omitted -->
We use XML for our messages. SQL Server Service Broker provides internal support for XML Schema Definitions (XSDs) to validate messages, so we adopted an XSD-based open content model to provide this multi-layer message validation. Using an open content model lets different consumers of XML data validate parts of an XML document while ignoring parts that aren’t relevant to their domain. Our system can validate the envelope data it needs without knowing the structure of the message payload, while data consumers can use domain-specific XSDs to validate the payload of messages they consume. This enables providers and consumers to agree on a schema-based contract defining their communications without the system (or other consumers) needing knowledge of that contract. The contract defining the structure of Job messages is described by the XML schema in Figure 3.
Figure 3 Job.xsd
<?xml version="1.0" encoding="utf-16"?>
<xs:element name="providertype" type="xs:string"
<xs:element name="initialization" type="xs:string"
minOccurs="0" maxOccurs="1" />
<xs:element name="nextrun" type="xs:string"
<xs:element name="intervalminutes" type="xs:int"
<xs:element name="lastrun" type="xs:string"
<xs:element name="jobData" minOccurs="0" maxOccurs="1">
<xs:any namespace="##any" minOccurs="0"
maxOccurs="unbounded" processContents="skip" />
<xs:attribute name="submittor" type="xs:string" use="required" />
<xs:attribute name="submitted" type="xs:string" use="required" />
<xs:attribute name="id" type="xs:string" use="optional" />
<xs:attribute name="source" type="xs:string" use="required" />
<xs:attribute name="sourceDomain" type="xs:string" use="optional" />
In the Job.xsd schema we’ve defined an optional jobData element that can contain children from any namespace (xs:any namespace="##any"). An XML document can contain any valid XML in the jobData element and still be validated by this schema. The system uses this schema to enforce the structure of the envelope data it needs while ignoring the message payload. Individual JobWorkers, who share knowledge of the structure of their message payload with Job requesters, can use another XSD schema that the two parties share to validate the message payload. Note that when defining an open content model schema outside of SQL Server 2005, the common pattern is to define the xs:any element with a processContents="lax" attribute which indicates that the parser should make a best-effort attempt at validation, but not fail if unable to validate. SQL Server 2005 does not support "lax" schema validation, so we specify processContents="skip" instead. Once the schema has been defined we make it available to Service Broker and associate it with message types, as shown in Figure 4.
Figure 4 Defining Message Type with Schema Validation
-- define schema collection (the schema text is passed as a string literal)
CREATE XML SCHEMA COLLECTION JobRequestMessageSchema AS
N'<!-- schema definition as shown in Figure 3 -->';
-- define message type and specify validation using our schema
CREATE MESSAGE TYPE [urn:msdn/aggregator/JobRequestMessage]
VALIDATION = VALID_XML WITH SCHEMA COLLECTION JobRequestMessageSchema;
By creating the message type using WITH SCHEMA COLLECTION validation, we’re telling Service Broker to validate all messages of the specified type against our XML schema. If a message is received that does not conform to the schema, Service Broker will discard the message and return an error message to the Service that sent the message. This allows downstream code to consume messages without needing to validate them again, as Service Broker guarantees that all messages of this type on the queue conform to our schema.
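The effect of the open content model can be tried without any Service Broker objects by assigning XML to a variable typed with a schema collection. What follows is a minimal sketch under stated assumptions, not our production schema: the collection name DemoEnvelopeSchema, the root element name job, and the sample payload are all hypothetical, and the schema is a cut-down stand-in for the one in Figure 3.

```sql
-- Hypothetical, cut-down envelope schema (not the production Job.xsd).
CREATE XML SCHEMA COLLECTION DemoEnvelopeSchema AS
N'<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="job">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="providertype" type="xs:string" />
        <xs:element name="jobData" minOccurs="0" maxOccurs="1">
          <xs:complexType>
            <xs:sequence>
              <xs:any namespace="##any" minOccurs="0"
                      maxOccurs="unbounded" processContents="skip" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
      <xs:attribute name="source" type="xs:string" use="required" />
    </xs:complexType>
  </xs:element>
</xs:schema>';
GO

-- The envelope is valid. The payload element is unknown to the schema,
-- but processContents="skip" means it is accepted without validation.
DECLARE @accepted XML(DemoEnvelopeSchema);
SET @accepted = N'<job source="blogs">
  <providertype>rss</providertype>
  <jobData><feed xmlns="urn:example:feeds" uri="http://example.org/rss" /></jobData>
</job>';

-- The envelope is invalid (the required "source" attribute is missing),
-- so this assignment fails with an XML validation error.
DECLARE @rejected XML(DemoEnvelopeSchema);
SET @rejected = N'<job><providertype>rss</providertype></job>';
```

GO is the batch separator used by tools such as SQL Server Management Studio. A message type created WITH SCHEMA COLLECTION applies the same kind of check to each message it receives.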
In our job schema, root attribute values contain the identification information we carry throughout the system to uniquely identify data items. We use the combination of source, sourceDomain, and id values as our identifiers to mitigate namespace collisions across multiple providers. This protects against cases in which two different providers use the same identifier—an important safeguard since we can’t control how providers generate IDs. Envelope elements are generally used by scheduled jobs to pass and maintain information about when they should run, last-run status, and the like.
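One way a data consumer might carry the three-part identifier into its own storage is to make it the key of a table. The sketch below is illustrative only: the table dbo.DemoDataItem, the root element name job, and the sample values are hypothetical, and the approach assumes that a missing sourceDomain can be stored as an empty string.

```sql
-- Hypothetical storage table keyed on the three envelope identifiers.
CREATE TABLE dbo.DemoDataItem (
    Source       NVARCHAR(128) NOT NULL,
    SourceDomain NVARCHAR(128) NOT NULL,  -- empty string when not supplied
    ItemId       NVARCHAR(128) NOT NULL,
    Payload      XML NULL,
    CONSTRAINT PK_DemoDataItem PRIMARY KEY (Source, SourceDomain, ItemId)
);
GO

DECLARE @message XML;
SET @message = N'<job source="blogs" sourceDomain="example.org" id="42"
                      submittor="demo" submitted="2006-01-01T00:00:00" />';

-- Read the identifiers off the root element, whatever it is called.
INSERT INTO dbo.DemoDataItem (Source, SourceDomain, ItemId, Payload)
SELECT
    @message.value('(/*/@source)[1]', 'nvarchar(128)'),
    ISNULL(@message.value('(/*/@sourceDomain)[1]', 'nvarchar(128)'), N''),
    @message.value('(/*/@id)[1]', 'nvarchar(128)'),
    @message;
```

With this key, two providers that both submit an item with id 42 do not collide as long as their source or sourceDomain values differ. Because id is optional in the schema, a real consumer would also need a policy for messages that omit it.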
Job Worker Infrastructure
The MSDN Aggregation system supports a pull model of data acquisition through our JobWorker framework. A JobWorker is responsible for retrieving data related to MSDN content and adding it to the system via the data-submission messaging infrastructure. The design of the framework is intended to make it relatively easy to add new data providers to the system to support the rapid implementation of new data input scenarios. We needed to address both ease of implementation and ease of deployment and configuration in our design.
CLR integration enabled us to implement the JobWorker framework in C# using powerful object-oriented patterns while exposing its functionality to T-SQL and Service Broker code.
Before we look at how JobWorkers process messages, however, we need to get Service Broker set up to deliver those messages.
Service Broker Plumbing
We saw earlier (in Figure 4) how to define a Service Broker message type and associate an XML schema with it. Let’s take a look at the rest of the Service Broker setup needed to define our Job messaging infrastructure. We’ve already defined our JobRequest message. Now we’ll bundle this message into a Service Broker contract that specifies its directionality:
-- create Job contract
CREATE CONTRACT [urn:msdn/aggregator/JobRequests]
([urn:msdn/aggregator/JobRequestMessage] SENT BY INITIATOR);
This contract specifies that JobRequestMessages can be sent by the service delivering messages (the Initiator of the conversation).
Now we create the queues that will store these messages, as shown in Figure 5. We use unidirectional queues in our system, which means that a service receives messages from one queue and replies to another. This simplifies processing, as service activation code doesn’t need to filter out messages it doesn’t care about. Defining separate queues also enables scale-out, as it allows you to host the queues either locally or remotely without needing to rewrite your application code. Service Broker doesn’t restrict you from either sending or receiving on the same queue, however.
Figure 5 Creating the Queues
-- JobPosting queue: receives JobRequests
CREATE QUEUE [urn:msdn/aggregator/JobPostingQueue]
WITH STATUS = ON, RETENTION = OFF, ACTIVATION (
PROCEDURE_NAME = [ProcessJobRequests], -- activation sproc
MAX_QUEUE_READERS = 1,
EXECUTE AS OWNER );
-- JobResponse queue: for response messages
CREATE QUEUE [dbo].[urn:msdn/aggregator/JobResponseQueue]
WITH STATUS = ON, RETENTION = OFF, ACTIVATION (
PROCEDURE_NAME = [ProcessJobRequestResponses], -- activation sproc
MAX_QUEUE_READERS = 1,
EXECUTE AS OWNER );
Now that we have our contract and queues in place, we can define the services that use them:
-- Service for handling (receiving) JobRequest messages;
-- as the target, it is bound to the JobRequests contract
CREATE SERVICE [urn:msdn/aggregator/JobService]
ON QUEUE [urn:msdn/aggregator/JobPostingQueue]
([urn:msdn/aggregator/JobRequests]);
-- Service for posting JobRequest messages (used by clients);
-- replies arrive on the response queue
CREATE SERVICE [urn:msdn/aggregator/JobPostingService]
ON QUEUE [urn:msdn/aggregator/JobResponseQueue];
The JobPostingService isn’t bound to a contract, as it only initiates Dialog Conversations. Only the receiving end of a conversation (the target) needs to be bound to a contract when defining the service, but the contract does need to be shared by the services on both ends of a conversation. The service on the sending end of a conversation uses the contract at run time when sending messages to the target. Here’s an example:
-- Begin dialog with JobService. Sender needs to know about
-- shared contract when sending.
DECLARE @dialog_handle UNIQUEIDENTIFIER;
BEGIN DIALOG CONVERSATION @dialog_handle
FROM SERVICE [urn:msdn/aggregator/JobPostingService]
TO SERVICE 'urn:msdn/aggregator/JobService'
ON CONTRACT [urn:msdn/aggregator/JobRequests];
-- Send message on dialog conversation
-- (@jobRequest is an assumed XML variable holding the Job message)
SEND ON CONVERSATION @dialog_handle
MESSAGE TYPE [urn:msdn/aggregator/JobRequestMessage]
(@jobRequest);
So now we’ve got a pair of endpoints that can send and receive messages (the services). We’ve defined the messages we can understand and bound them to the services using a contract, and we’ve defined the backing store used to manage messages. That puts the Service Broker plumbing for our Job processing in place. Now let’s look at how we actually process the messages that come through this plumbing.
With the Service Broker messaging infrastructure set up, we defined a set of CLR interfaces (shown in Figure 6), which represent the contract JobWorkers must implement to participate.
Figure 6 JobWorker Interfaces
JobWorkers implement the IJobWorker interface, which defines a single Execute method and several properties for maintaining Job state data. Execute takes an IJobContext parameter, which is an interface defining the Submit method used to publish data back into the system. We provide a default implementation of IJobContext that implements basic submission functionality, and JobWorkers are free to provide their own implementation if the default doesn’t meet their needs. By exposing IJobContext as a parameter to IJobWorker.Execute, we’re not only enforcing a behavioral contract but also guaranteeing that submission functionality is always available to implementers.
Figure 7 shows an example of a simple implementation of IJobWorker.Execute, which uses XSLT to transform input XML data into the structure required by the DataSubmission service. In this case the message contains the data itself and the JobWorker only needs to transform it before submitting. In a more common scenario, where the JobWorker needs to acquire data from outside the system, the message parameter would contain information the JobWorker needs to retrieve the data (such as the URI of an RSS feed). In any case, the pattern is that the message parameter is the raw Job message received on the JobPosting queue. Our use of XSDs defining an Open Content Model means that the message data can be validated by JobWorkers as long as they and the submitter of the Job message have shared an XSD defining the message payload.
Figure 7 Simple Execute Implementation
// Transform input data into DataSubmission structure using embedded
// resource XSLT stylesheet
public void Execute(IJobContext host, SqlXml message)
{
    XPathDocument doc = new XPathDocument(message.CreateReader());
    string xsltOut = string.Empty;
    using (MemoryStream buf = new MemoryStream())
    {
        // load XSLT from resource
        XslCompiledTransform xslt = GetFixupTransform();
        xslt.Transform(doc, null, buf);
        buf.Position = 0L;
        xsltOut = new StreamReader(buf).ReadToEnd();
    }
    // package data in Submission envelope
    XmlDocument outMsg = Utility.EnvelopSubmissionMessage(xsltOut, host);
    // Submit data message via IJobContext callback
    // (argument type assumed; see the IJobContext definition)
    host.Submit(outMsg);
}
Once a JobWorker has been coded and compiled, it needs to be cataloged to SQL Server so that it is available to database application code. Cataloging an assembly stores the assembly’s Microsoft intermediate language (MSIL) code in a SQL Server table as varbinary data, and this is what the SQL Server CLR host uses when loading our assembly. An example of the syntax for cataloging an assembly is shown here:
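-- catalog assembly from file location
-- (the path is a placeholder; substitute the location of the compiled assembly)
CREATE ASSEMBLY [VeryUsefulJobWorker]
FROM '<path to assembly file>'
WITH PERMISSION_SET = EXTERNAL_ACCESS;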
For a more detailed description of this statement and its syntax, see msdn2.microsoft.com/library/ms189524.aspx.
When you execute this statement, the SQL Server CLR host loads the assembly specified in the FROM clause, checks that the assembly contains valid MSIL, and loads any dependent assemblies referenced by the assembly being cataloged. If the PERMISSION_SET specified is SAFE or EXTERNAL_ACCESS, additional type safety verification is performed, and classes in the assemblies are checked for constructs such as writable static members and finalizer methods (neither of which is allowed in assemblies using these permission sets).
After an assembly has been cataloged, any procedures, functions, and the like that are to be exposed directly to database application code must themselves be cataloged using the appropriate CREATE statement, such as CREATE PROCEDURE. Since our JobWorkers don’t expose functionality to database application code, we skip this step. If you’re wondering how useful JobWorkers really are if we don’t expose their functionality to database code, you’ll soon see.
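For managed code that is exposed to T-SQL, the cataloging step looks roughly like the sketch below. The assembly name JobWorkerHost, the class name Aggregator.JobProcedures, and the procedure name dbo.DemoProcessJobRequest are hypothetical; only the statement shape is the point.

```sql
-- Hypothetical names: assembly [JobWorkerHost], class Aggregator.JobProcedures.
-- The target method must be public and static, and its parameters must map
-- to SQL types (here SqlXml -> XML and string -> NVARCHAR).
CREATE PROCEDURE dbo.DemoProcessJobRequest
    @jobData     XML,
    @messageType NVARCHAR(256)
AS EXTERNAL NAME [JobWorkerHost].[Aggregator.JobProcedures].[ProcessJobRequest];
GO

-- CLR integration is off by default and must be enabled on the instance
-- before managed procedures can run.
EXECUTE sp_configure 'clr enabled', 1;
RECONFIGURE;
```

The three-part EXTERNAL NAME is assembly, then fully qualified class, then method. The class name is bracketed as a whole because it contains a dot.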
Now that we’ve got a handle on the interface contracts supporting the JobWorker infrastructure, let’s take a look at how we determine which JobWorker should handle a given Job message.
We saw earlier that Service Broker messages contain type information that is used to identify messages and can be used to differentiate messages from various sources or containing different information. In our system, Job messages can be sent by different services, but they’re all received on the same message queue. The MSDN Aggregation System uses Message Type data to determine which JobWorker implementation should handle a given Job message. The relevant T-SQL code for receiving and processing Job messages is shown in Figure 8.
Figure 8 Receiving Job Messages
DECLARE @conversationHandle AS UNIQUEIDENTIFIER;
DECLARE @messageTypeName AS SYSNAME;
DECLARE @messageBody AS XML;
-- always RECEIVE in a transaction and COMMIT on success,
-- or ROLLBACK if unable to process the message
BEGIN TRANSACTION ;
-- get next message and its type from queue
RECEIVE TOP (1)
@conversationHandle = conversation_handle,
@messageTypeName = message_type_name,
@messageBody = CAST (message_body AS XML)
FROM [urn:msdn/aggregator/JobPostingQueue];
. . .
-- Process job message
EXECUTE [ProcessJobRequest] @messageBody, @messageTypeName;
END CONVERSATION @conversationHandle;
-- COMMIT when message successfully processed
COMMIT TRANSACTION ;
We receive one message using the RECEIVE TOP (1) statement, giving it a column list that specifies which columns to include in the result set. Service Broker is a reliable, transactional messaging platform. To make sure your application code preserves the reliability guarantees Service Broker makes, always enclose your RECEIVE and message-processing logic in a SQL transaction. If you are unable to process a message, rolling back the transaction puts it back on the queue, where it will be processed again if you have activation enabled. Be cautious about rolling back transactions, however: once a message has failed to be processed and the processing transaction has been rolled back five times, Service Broker considers it a "poison message" and disables the queue. Rolling back the transaction is appropriate for transient failures, but if your application logic has no hope of ever successfully processing a message, it’s better to use another means to inform the sender. Our system commits the transaction as soon as it has received the message and, if we fail to process the message, sends an error message to the conversation initiator using END CONVERSATION WITH ERROR.
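The commit-early pattern described above might look roughly like the following. This is a sketch rather than our production procedure: the error code 50001 is an arbitrary illustrative value, and handling of Service Broker system messages (such as EndDialog) is left out for brevity.

```sql
DECLARE @conversationHandle UNIQUEIDENTIFIER;
DECLARE @messageTypeName SYSNAME;
DECLARE @messageBody XML;

-- Take the message off the queue and commit right away, so that a
-- processing failure can never trigger poison-message detection.
BEGIN TRANSACTION;
RECEIVE TOP (1)
    @conversationHandle = conversation_handle,
    @messageTypeName = message_type_name,
    @messageBody = CAST(message_body AS XML)
FROM [urn:msdn/aggregator/JobPostingQueue];
COMMIT TRANSACTION;

IF (@conversationHandle IS NOT NULL)
BEGIN
    BEGIN TRY
        EXECUTE [ProcessJobRequest] @messageBody, @messageTypeName;
        END CONVERSATION @conversationHandle;
    END TRY
    BEGIN CATCH
        -- Report the failure to the initiator instead of rolling back.
        DECLARE @description NVARCHAR(3000);
        SET @description = LEFT(ERROR_MESSAGE(), 3000);
        END CONVERSATION @conversationHandle
            WITH ERROR = 50001 DESCRIPTION = @description;
    END CATCH;
END;
```

The trade-off is that once the RECEIVE has been committed, the message is gone from the queue, so a crash during processing loses it unless the sender retries after seeing no reply.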
Our message-processing logic is in an activated stored procedure, which means it is called by the Service Broker infrastructure whenever a message is available on the queue. This lets us process messages one at a time and know that whenever the procedure is called there will be a message to process. An alternate model is to use WAITFOR (RECEIVE ...), TIMEOUT to wait for a message when none is ready on the queue. There are some startup costs associated with activation of a stored procedure, so if your application receives multiple messages within a 2- to 3-second window, you’ll improve performance using WAITFOR.
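A WAITFOR-based receive loop could be sketched as follows. The 3000-millisecond timeout simply mirrors the window mentioned above and is not a tuned value. Unlike the approach our system takes, this version keeps processing inside the transaction.

```sql
DECLARE @conversationHandle UNIQUEIDENTIFIER;
DECLARE @messageTypeName SYSNAME;
DECLARE @messageBody XML;

WHILE (1 = 1)
BEGIN
    SET @conversationHandle = NULL;

    BEGIN TRANSACTION;

    -- Block for up to 3000 ms waiting for the next message.
    WAITFOR (
        RECEIVE TOP (1)
            @conversationHandle = conversation_handle,
            @messageTypeName = message_type_name,
            @messageBody = CAST(message_body AS XML)
        FROM [urn:msdn/aggregator/JobPostingQueue]
    ), TIMEOUT 3000;

    -- Nothing arrived within the timeout: exit and let activation
    -- start the procedure again when the next message shows up.
    IF (@conversationHandle IS NULL)
    BEGIN
        ROLLBACK TRANSACTION;
        BREAK;
    END;

    -- Only job requests are processed; any other message type
    -- (EndDialog, Error) just has its conversation ended below.
    IF (@messageTypeName = N'urn:msdn/aggregator/JobRequestMessage')
        EXECUTE [ProcessJobRequest] @messageBody, @messageTypeName;

    END CONVERSATION @conversationHandle;
    COMMIT TRANSACTION;
END;
```

Because the loop stays alive between messages, a burst of messages is handled by one activation rather than by one activation per message.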
Once we have the message and its Type information, we pass that to the ProcessJobRequest stored procedure. This is where SQL CLR integration really shines. It enables us to merge the power of Service Broker’s asynchronous messaging with a plug-in model built using a Factory Method software pattern and the IJobWorker interfaces we looked at earlier.
ProcessJobRequest is implemented as a SQL Server CLR stored procedure, meaning it is managed code hosted by the SQL Server runtime and can be called from T-SQL code as a regular stored procedure. You tell the SQL Server CLR runtime that your method should be exposed as a SQL Server CLR stored procedure by decorating it with a Microsoft.SqlServer.Server.SqlProcedure attribute. The method must be static and return either void or an integer Type. (See msdn2.microsoft.com/ms131094.aspx for details on coding SQL Server CLR stored procedures.) The following code shows the implementation of this procedure (with much important error handling code omitted).
[Microsoft.SqlServer.Server.SqlProcedure]
public static void ProcessJobRequest(
    SqlXml jobData, string messageType)
{
    // get JobContext data from message envelope
    IJobContext ctxt = Utility.GetJobContextFromRequest(jobData);
    // use factory method to instantiate correct Job Worker
    IJobWorker processor = JobWorkerFactory.CreateWorker(messageType);
    // hand the raw Job message to the worker
    processor.Execute(ctxt, jobData);
}
We first build up the job’s context data from the Job message envelope. (We don’t need to validate the structure of the envelope data here; Service Broker has already taken care of that by applying the XSD we specified when we created the message Type.) We then use our factory method to instantiate the correct JobWorker based on the type of message. We maintain a table in the database to map message type names to fully qualified CLR type names and use .NET reflection to instantiate the appropriate type. Adding a new JobWorker type requires cataloging the assembly into SQL Server and adding an entry in the mapping table specifying which assembly should process a given message type.
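The mapping table behind the factory can be very small. The sketch below shows one plausible shape. The table name dbo.DemoJobWorkerMap, its column names, and the CLR type name are hypothetical, not our actual schema.

```sql
-- Hypothetical mapping table: Service Broker message type -> CLR type to load.
CREATE TABLE dbo.DemoJobWorkerMap (
    MessageTypeName SYSNAME       NOT NULL PRIMARY KEY,
    ClrTypeName     NVARCHAR(512) NOT NULL  -- assembly-qualified type name
);
GO

-- Registering a new worker is a single row (names are illustrative).
INSERT INTO dbo.DemoJobWorkerMap (MessageTypeName, ClrTypeName)
VALUES (N'urn:msdn/aggregator/JobRequestMessage',
        N'Demo.Workers.XsltJobWorker, VeryUsefulJobWorker');

-- The lookup the factory would perform for an incoming message.
DECLARE @messageTypeName SYSNAME;
SET @messageTypeName = N'urn:msdn/aggregator/JobRequestMessage';

SELECT ClrTypeName
FROM dbo.DemoJobWorkerMap
WHERE MessageTypeName = @messageTypeName;
```

On the managed side, a factory could pass the returned name to Type.GetType and then Activator.CreateInstance, casting the result to IJobWorker. Keeping the mapping in a table means that plugging in a worker is a deployment and configuration change rather than a change to the factory code.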
Once an appropriate JobWorker has been instantiated to process the message, its Execute method is called and it performs whatever work it needs to do.
The choice of actions the JobWorker performs in its Execute method belongs to the JobWorker. There are some restrictions placed upon managed code by the SQL Server runtime, depending on which permission set the assembly containing the code uses. SAFE assemblies can’t access any resources outside of the database. EXTERNAL_ACCESS assemblies can make socket calls and access the file system and other system resources, but they are limited in their ability to use certain CLR functionality like threading libraries, synchronization primitives, and writable static members. UNSAFE assemblies are free to do just about anything they please (including executing unmanaged code and bypassing type safety validation). It is quite possible to shoot yourself in the foot using UNSAFE assemblies, and their use is generally discouraged. See msdn2.microsoft.com/ms189566.aspx for detailed information about SQL Server CLR Permission Sets. Most of our JobWorkers require access either to network resources such as sockets for making HTTP requests or to local or remote file systems, so they run using the EXTERNAL_ACCESS Permission Set.
Once a JobWorker has performed whatever it needs to retrieve and package its data, it submits the data back to the system by sending a message to the Data Submission service. The IJobContext implementation passed to the JobWorker’s Execute method contains a Submit method that is used for this purpose. This model lets the JobWorker remain unaware of the details of getting its data back into the system. By using the same service for receiving data from push providers and pull providers, we can centralize the distribution point for the publisher/subscriber implementation we use to route messages to data consumers. This makes it easy for data consumers to focus on the type of data they want to get and not on how that data is received.
Data Publishers and Subscribers
One of the core tenets of the MSDN Aggregation System is that information flowing into the system will not be of interest to all the consumers. To support this, our DataSubmission service uses a publisher/subscriber model, which provides a loose coupling between data providers and consumers. Data providers publish data into the system and data consumers "subscribe" to message types corresponding to the data they want to receive. As with Job messages, it is the responsibility of data providers and consumers to agree on the structure and semantics of the data they share via this mechanism. We use the same XSD-based open content model to allow the system and different data consumers to validate slices of message data according to shared schemas. Data consumers subscribe to message types, and we implemented a simple routing service that receives messages from the DataSubmission queue, checks their type, and forwards them to each consumer registered for that message type. Registration is managed using a database table that maps message types to subscribers. In our current release, subscribing to a message type requires manual configuration, but we’re investigating a more message-based subscription model that will support ad hoc subscriptions for a future release.
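A table-driven router of this kind might be sketched as follows. Everything named Demo or urn:example here is hypothetical, and the router service, delivery contract, and delivery message type are assumed to exist already, so treat this as an outline of the idea rather than our implementation.

```sql
-- Hypothetical subscription table: one row per (message type, subscriber).
CREATE TABLE dbo.DemoSubscription (
    MessageTypeName   SYSNAME       NOT NULL,
    SubscriberService NVARCHAR(256) NOT NULL,
    CONSTRAINT PK_DemoSubscription
        PRIMARY KEY (MessageTypeName, SubscriberService)
);
GO

-- In a real router these two values would come from a RECEIVE on the
-- DataSubmission queue; they are set by hand here for illustration.
DECLARE @messageTypeName SYSNAME;
DECLARE @messageBody XML;
SET @messageTypeName = N'urn:example/BlogPostData';
SET @messageBody = N'<submission source="blogs" />';

DECLARE @subscriber NVARCHAR(256);
DECLARE @dialog UNIQUEIDENTIFIER;

DECLARE subscribers CURSOR LOCAL FAST_FORWARD FOR
    SELECT SubscriberService
    FROM dbo.DemoSubscription
    WHERE MessageTypeName = @messageTypeName;

OPEN subscribers;
FETCH NEXT FROM subscribers INTO @subscriber;

WHILE (@@FETCH_STATUS = 0)
BEGIN
    -- Forward a copy of the message to each registered subscriber.
    BEGIN DIALOG CONVERSATION @dialog
        FROM SERVICE [urn:example/DemoRouterService]
        TO SERVICE @subscriber
        ON CONTRACT [urn:example/DemoDelivery]
        WITH ENCRYPTION = OFF;

    SEND ON CONVERSATION @dialog
        MESSAGE TYPE [urn:example/DemoDeliveryMessage]
        (@messageBody);

    FETCH NEXT FROM subscribers INTO @subscriber;
END;

CLOSE subscribers;
DEALLOCATE subscribers;
```

Publishers never see the subscription table and subscribers never see the publishers, which is the loose coupling the model is meant to provide.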
As with messages on the Job queue, Service Broker validates the message envelope using an open content model XSD, while the message payload is opaque to the system. It is the responsibility of data publishers and subscribers to agree upon the structure and meaning of the data they share, and manage the sharing of domain-specific XML schemas if they want to validate message payloads. The implementation logic of the pub/sub router service is similar enough to that of the JobWorker Factory Service that we can omit code examples in the interest of space.
Once subscribers receive data from the pub/sub router, how they process, structure, and store that data is entirely up to them. The Aggregation System doesn’t provide or prescribe this functionality as a matter of design. Just as we can’t predict all the different types of data coming into the system, we can’t predict all the ways in which consumers might want to manage that data. By leaving it open and providing an infrastructure that supports a wide variety of data producers and consumers, our hope is that we’ll see partners coming up with interesting scenarios we haven’t thought of.
Associating Metadata with MSDN Content
For data consumers that are tightly coupled with the MSDN Publishing System, the data they acquire through the Aggregation System is associated with MSDN content using SQL Server replication. The Publishing System provides a mechanism for associating arbitrary chunks of data with a given Content Item, and we piggyback on that functionality. Data consumers that use this mechanism format their data according to an agreed-upon XML schema and insert it into a table for replication. The Publishing System then picks up the data and stores it separately from the Content Item itself, while making it easy for the Rendering Framework, which is the front edge of msdn2.microsoft.com, to retrieve it.
Other data consumers, which don’t have a tight coupling with the MSDN Publishing System, are responsible for providing access to their data according to their needs. The Aggregation System doesn’t in and of itself provide this functionality.
Send your questions and comments to email@example.com.
is a Software Engineer in the MSDN/Technet Product Group at Microsoft. Previously he developed content publishing and distribution systems for Starwave and Walt Disney, and internet timing and scoring systems for the 2002 Summer Olympics. Reach him at blogs.msdn.com/mollman