Parallel Uploads to Windows Azure Blob Storage via a Silverlight Control
Author: Rahul Rai, Associate Consultant, Microsoft Global Delivery
Last updated: October 4, 2011
Summary
Instead of using a traditional file upload control, which is slow and unreliable, or jQuery, which is single-threaded, use a Silverlight and TPL-based solution to upload files to Windows Azure blob storage faster and more reliably.
Problem
Traditionally, uploading files to Windows Azure blob storage involves one of the following approaches, assuming the keys of your account are not to be made available to the client uploading the file:
-
The web role uploads the file to a temporary directory and then uses the storage API to upload the file to a blob in parallel.
-
The web role uses client-side script, generally jQuery, to split the file on the client and an intercepting WCF service to upload it to a blob sequentially.
-
The web role uses client-side script and a shared access signature to upload the file sequentially to blob storage.
-
The web role uses a Silverlight control to upload files using a shared access signature on the container.
The traditional approaches listed above do not upload files directly to blob storage, and they do not take advantage of the parallel upload capabilities supported by Windows Azure.
Solution
The solution to the problem has two stages:
-
Use a client-side application to manage file operations such as splitting the file into chunks and retrying in case of failure.
-
Use a task-based threading model with cancellation-token support to send the payload-carrying web requests to Azure storage in parallel and to cancel an upload operation midway when needed.
Implementation of Solution
The solution works as follows:
-
When a user selects a file to upload, the code generates a shared access signature (SAS) on the container, which acts as a key for the file to be uploaded. A DispatcherTimer is started as soon as the control loads to track when the SAS will expire. When it expires, the timer cancels all pending upload operations if an upload is in progress; otherwise it notifies the user that the time limit has expired.
-
The algorithm then splits the file into chunks of 1 MB each and counts the total number of chunks generated.
-
If the total number of chunks is greater than 1, the algorithm creates a temporary thread pool containing all the chunks.
-
The thread pool treats each chunk as an individual task associated with a single thread, and uploads each chunk as threads become available in the pool.
-
When the file has been uploaded successfully, a notification message is displayed to the user.
-
If one or more chunks fail to upload on the first attempt, the uploader keeps retrying them until it reaches the total number of attempts configured in the code. The number of retry attempts for failed chunks can be customized easily in the code.
-
If the uploader still fails to upload any chunk after all the retry attempts, an error message is displayed to notify the user.
-
If there exists only a single chunk (i.e. total number of chunks = 1) then the uploader will simply upload the complete file blob. As above, if the upload fails, the uploader will retry until it reaches the total number of attempts configured by the user in the code. If the uploader fails to upload the block after all retry attempts, then an error message is displayed to notify the user.
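The chunk counting and retry behaviour described in the steps above can be sketched in a few lines. This is a minimal illustration rather than the control's actual code: `ChunkPlanner`, `CountChunks`, and `TryWithRetries` are hypothetical names, and a single upload attempt is abstracted as a delegate that returns true on success.

```csharp
using System;

public static class ChunkPlanner
{
    // 1 MB chunks, as described in the article.
    public const int ChunkSize = 1024 * 1024;

    // Number of chunks needed for a file of the given length (ceiling division).
    public static long CountChunks(long fileLength)
    {
        if (fileLength < 0)
        {
            throw new ArgumentOutOfRangeException("fileLength");
        }

        return (fileLength + ChunkSize - 1) / ChunkSize;
    }

    // Calls uploadAttempt until it reports success or maxAttempts is used up.
    // An exception thrown by an attempt is treated as a failed attempt.
    public static bool TryWithRetries(Func<bool> uploadAttempt, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                if (uploadAttempt())
                {
                    return true;
                }
            }
            catch (Exception)
            {
                // Swallow and fall through to the next attempt.
            }
        }

        return false;
    }
}
```

A caller would use `CountChunks` to choose between the single-PUT path (one chunk) and the parallel path (more than one), and would show the error message when `TryWithRetries` returns false.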
The diagram below shows the steps in the process:
Building such a solution is simple with Silverlight. It runs on the client and supports threading as well. Let’s proceed step-by-step to build such a solution:
-
Create a cross domain policy for access to blob storage through the Silverlight application. This involves adding a policy file to the $root container of your storage account.
-
Acquire a shared access signature on the container for a sufficient time in which the file may be uploaded.
-
Pass this signature to the Silverlight application handling file uploads. I passed it through the initParams of the Silverlight application.
-
Inside the Silverlight application:
-
Define the DispatcherTimer operation (cancel pending upload operations on shared access signature expiry).
-
Split the file into chunks of 1 MB each.
-
Upload the file using a single PUT request if you get a single packet; otherwise, upload the file chunks in parallel as blocks of a block blob using multi-threading. I used Portable TPL, which is an open source abstraction over threading in Silverlight 4. Silverlight 5 would have the TPL built in, but the process would remain essentially the same.
-
If any thread fails to upload its designated content, retry a finite number of times, and fail the entire upload process if it keeps failing.
-
When you have successfully uploaded all the blocks, issue a PUT request to commit them.
-
Exit the application.
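The PUT requests in the steps above all target URLs derived from the container SAS. In the Blob service REST API, a block is uploaded with the query parameters `comp=block&blockid=<id>` and the block list is committed with `comp=blocklist`. The following is a minimal sketch of how such URLs might be composed; `SasUrlSketch` and its method names are hypothetical and are not the helpers used in the sample project.

```csharp
using System;

public static class SasUrlSketch
{
    // containerSasUrl looks like https://account.blob.core.windows.net/container?<sas>
    // The blob name is inserted into the path and the SAS query string is kept.
    public static string BlobUrl(string containerSasUrl, string blobName)
    {
        int queryStart = containerSasUrl.IndexOf('?');
        string basePart = queryStart < 0 ? containerSasUrl : containerSasUrl.Substring(0, queryStart);
        string query = queryStart < 0 ? string.Empty : containerSasUrl.Substring(queryStart);
        return basePart.TrimEnd('/') + "/" + Uri.EscapeDataString(blobName) + query;
    }

    // URL for uploading one block. The block ID is Base64, so it must be URL-encoded.
    public static string PutBlockUrl(string blobSasUrl, string blockId)
    {
        return AppendQuery(blobSasUrl, "comp=block&blockid=" + Uri.EscapeDataString(blockId));
    }

    // URL for committing the uploaded blocks.
    public static string PutBlockListUrl(string blobSasUrl)
    {
        return AppendQuery(blobSasUrl, "comp=blocklist");
    }

    private static string AppendQuery(string url, string parameters)
    {
        return url + (url.IndexOf('?') >= 0 ? "&" : "?") + parameters;
    }
}
```

Keeping the SAS query string intact and only appending parameters means the signature itself is never modified on the client.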
Here are the steps in more detail, with code examples:
-
Creating the Silverlight policy to allow cross-domain requests:
    private void CreateSilverlightPolicy(CloudBlobClient blobs)
    {
        // The policy file is placed in the $root container with public blob access.
        blobs.GetContainerReference("$root").CreateIfNotExist();
        blobs.GetContainerReference("$root").SetPermissions(
            new BlobContainerPermissions()
            {
                PublicAccess = BlobContainerPublicAccessType.Blob
            });
        var blob = blobs.GetBlobReference("clientaccesspolicy.xml");
        blob.Properties.ContentType = "text/xml";
        blob.UploadText(@"<?xml version=""1.0"" encoding=""utf-8""?>
            <access-policy>
              <cross-domain-access>
                <policy>
                  <allow-from http-methods=""*"" http-request-headers=""*"">
                    <domain uri=""*"" />
                    <domain uri=""http://*"" />
                  </allow-from>
                  <grant-to>
                    <resource path=""/"" include-subpaths=""true"" />
                  </grant-to>
                </policy>
              </cross-domain-access>
            </access-policy>");
    }
-
Generating a shared access signature that expires in 10 minutes:
    protected void Page_Load(object sender, EventArgs e)
    {
        if (!Page.IsPostBack)
        {
            var account = CloudStorageAccount.Parse(
                ConfigurationManager.AppSettings[ConfigurationSectionKey]);
            var blobs = account.CreateCloudBlobClient();
            this.CreateSilverlightPolicy(blobs);
            var container = blobs.GetContainerReference(ContainerName);
            container.CreateIfNotExist();
            // Write-only SAS on the container, valid for 10 minutes.
            var sas = container.GetSharedAccessSignature(new SharedAccessPolicy()
            {
                Permissions = SharedAccessPermissions.Write,
                SharedAccessExpiryTime = DateTime.UtcNow + TimeSpan.FromMinutes(10)
            });
            this.containerUrl = new UriBuilder(container.Uri)
            {
                Query = sas.TrimStart('?')
            }.Uri.AbsoluteUri;
        }
    }
-
Passing the signature to the Silverlight application can be done via initParams.
-
Inside the Silverlight application:
-
Defining the DispatcherTimer operation:
    public MainPage(string sasUrl, string timeOutSeconds)
    {
        this.sasUrl = sasUrl;
        var sasExpiryTimer = new DispatcherTimer();
        sasExpiryTimer.Interval = new TimeSpan(0, 0, Convert.ToInt32(timeOutSeconds));
        sasExpiryTimer.Tick += new EventHandler((o, e) =>
        {
            // The SAS has expired: cancel any upload in progress and lock the UI.
            this.sasExpired = true;
            if (this.userFile != null)
            {
                this.userFile.CancelUpload();
            }
            this.lblMessage.Text = ApplicationResources.SASExpired;
            this.btnBrowse.IsEnabled = false;
            this.btnUpload.IsEnabled = false;
            this.prgUpload.IsIndeterminate = false;
            this.txtFileName.Text = string.Empty;
        });
        sasExpiryTimer.Start();
        this.InitializeComponent();
        this.btnBrowse.IsEnabled = true;
        this.btnUpload.IsEnabled = false;
    }
-
Splitting the file into chunks of 1 MB each:
    public List<DataPacket> TransformStreamToPackets(Stream sourceStream)
    {
        int bytesToRead = 0;
        int serialNumber = 1;
        byte[] buffer = new byte[this.packetSize];
        var dataBlocks = new List<DataPacket>();
        while ((bytesToRead = sourceStream.Read(buffer, 0, buffer.Length)) > 0)
        {
            // Copy only the bytes actually read; the last chunk may be shorter.
            var payloadArray = new byte[bytesToRead];
            Array.Copy(buffer, payloadArray, bytesToRead);
            dataBlocks.Add(new DataPacket()
            {
                IsTransported = false,
                Payload = payloadArray,
                RetryCount = 0,
                // Block ID: the zero-padded serial number, Base64-encoded.
                SerialNumber = Convert.ToBase64String(Encoding.UTF8.GetBytes(
                    string.Format(CultureInfo.InvariantCulture, "{0:D4}", serialNumber++)))
            });
        }
        return dataBlocks;
    }
-
Issuing block upload requests according to the number of packets you have:
    public void StartUpload(string initParams)
    {
        if (this.packets.Count > 1)
        {
            int concurrencyLevel = 0;
            ////TO DO: Modify the code to work on a fixed concurrency level.
            this.allTasks = new Task[this.packets.Count];
            // Loop while any packet is still waiting to be picked up (IsTransported == false).
            while (this.packets.Any(packet => packet.IsTransported == false))
            {
                if (concurrencyLevel < this.packets.Count)
                {
                    var uploadBlock = (from uploadPacket in this.packets
                                       where uploadPacket.IsTransported == false
                                       select uploadPacket).FirstOrDefault();
                    // null marks the packet as assigned to a task but not yet uploaded.
                    uploadBlock.IsTransported = null;
                    this.allTasks[concurrencyLevel] = Task.Factory.StartNew(
                        () => this.UploadFileChunk(
                            uploadBlock,
                            this.file,
                            this.request.GetBlockBlobUri(uploadBlock.SerialNumber)),
                        this.cancellationTokens.Token);
                    concurrencyLevel++;
                }
                else
                {
                    try
                    {
                        ////TO DO: This statement never hits currently.
                        ////This statement halts execution of all threads due to some reason.
                        Task.WaitAll(this.allTasks);
                        concurrencyLevel = 0;
                    }
                    catch (AggregateException ex)
                    {
                        this.NotifyClient(Constants.UploadCompleteReason.ErrorOccurred, ex.Message);
                    }
                }
            }
        }
        else
        {
            // Single packet: upload the whole file with one request.
            Task.Factory.StartNew(
                () => this.UploadFileChunk(this.packets.FirstOrDefault(), this.file),
                this.cancellationTokens.Token);
        }
    }
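The TO DO comment in the code above asks for a fixed concurrency level. One possible way to get it, sketched here under the assumption that the full .NET TPL is available (`SemaphoreSlim` and `Task.Factory.StartNew`, which may not match what Portable TPL offers on Silverlight 4), is to gate task creation with a semaphore. `BoundedUploader` and `UploadAll` are hypothetical names, and the per-chunk upload is abstracted as a delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedUploader
{
    // Runs uploadChunk once for every index in [0, chunkCount),
    // with at most maxParallel uploads in flight at any time.
    // Blocks the calling thread until all uploads have finished;
    // failures surface as an AggregateException from Task.WaitAll.
    public static void UploadAll(
        int chunkCount, int maxParallel, Action<int> uploadChunk, CancellationToken token)
    {
        var gate = new SemaphoreSlim(maxParallel);
        var tasks = new List<Task>();

        for (int i = 0; i < chunkCount; i++)
        {
            int index = i;        // copy so each task sees its own index
            gate.Wait(token);     // wait for a free slot before starting another upload

            tasks.Add(Task.Factory.StartNew(
                () =>
                {
                    try
                    {
                        uploadChunk(index);
                    }
                    finally
                    {
                        gate.Release();   // free the slot even if the upload throws
                    }
                },
                token));
        }

        Task.WaitAll(tasks.ToArray());
    }
}
```

Because `UploadAll` blocks its caller, a sketch like this would need to run on a background thread rather than on the UI thread.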
-
A blob can be committed by simply sending a PUT request, as shown below. (Alternatively, you could send a block request with a sequentially generated block ID.)
    // When no block URI is given, the request goes to the SAS URL itself.
    HttpWebRequest webRequest = (HttpWebRequest)WebRequestCreator.ClientHttp.Create(
        blockUri == null ? this.request.SASUrl : blockUri);
    webRequest.Method = Constants.RequestType.PUT.ToString();
    var webRequestState = new AsyncWebRequestState()
    {
        WebRequestState = webRequest,
        RequestPayload = requestPayload,
        FileToUpload = fileToUpload
    };
    webRequest.BeginGetRequestStream(
        new AsyncCallback(this.WriteToStreamCallback), webRequestState);
-
Write the data packet to be transferred into the request stream:
    private void WriteToStreamCallback(IAsyncResult asynchronousResult)
    {
        if (!this.cancellationTokens.IsCancellationRequested)
        {
            AsyncWebRequestState requestState =
                (AsyncWebRequestState)asynchronousResult.AsyncState;
            HttpWebRequest webRequest = (HttpWebRequest)requestState.WebRequestState;
            Stream requestStream = webRequest.EndGetRequestStream(asynchronousResult);
            requestStream.Write(
                requestState.RequestPayload.Payload,
                0,
                requestState.RequestPayload.Payload.Length);
            requestStream.Close();
            var webRequestState = new AsyncWebRequestState()
            {
                WebRequestState = webRequest,
                RequestPayload = requestState.RequestPayload,
                FileToUpload = requestState.FileToUpload
            };
            webRequest.BeginGetResponse(
                new AsyncCallback(this.ReadHttpResponseCallback), webRequestState);
        }
    }
-
Finally, send the list of blocks that you need to commit (in the case of block blobs):
    private void PutBlockList(AsyncWebRequestState requestState)
    {
        if (!this.cancellationTokens.IsCancellationRequested)
        {
            var webRequest = (HttpWebRequest)WebRequestCreator.ClientHttp.Create(
                this.request.PutBlockBlobListUri());
            webRequest.Method = Constants.RequestType.PUT.ToString();
            webRequest.Headers[MsVersionHeader] = Constants.XMSVersion;
            requestState.WebRequestState = webRequest;
            webRequest.BeginGetRequestStream(
                new AsyncCallback(this.BlockListWriteToStreamCallback), requestState);
        }
    }

    private void BlockListWriteToStreamCallback(IAsyncResult asynchronousResult)
    {
        if (!this.cancellationTokens.IsCancellationRequested)
        {
            AsyncWebRequestState requestState =
                (AsyncWebRequestState)asynchronousResult.AsyncState;
            HttpWebRequest webRequest = (HttpWebRequest)requestState.WebRequestState;
            Stream requestStream = webRequest.EndGetRequestStream(asynchronousResult);
            // Write the XML block list as the request body.
            var writer = XmlWriter.Create(
                requestStream, new XmlWriterSettings() { Encoding = Encoding.UTF8 });
            this.request.UncommittedBlockBlobList(this.packets.Count()).Save(writer);
            writer.Flush();
            requestStream.Close();
            requestState.WebRequestState = webRequest;
            webRequest.BeginGetResponse(
                new AsyncCallback(this.BlockListReadHttpResponseCallback), requestState);
        }
    }

    private void BlockListReadHttpResponseCallback(IAsyncResult asynchronousResult)
    {
        if (!this.cancellationTokens.IsCancellationRequested)
        {
            AsyncWebRequestState requestState =
                (AsyncWebRequestState)asynchronousResult.AsyncState;
            try
            {
                HttpWebRequest webRequest = (HttpWebRequest)requestState.WebRequestState;
                HttpWebResponse webResponse =
                    (HttpWebResponse)webRequest.EndGetResponse(asynchronousResult);
                StreamReader reader = new StreamReader(webResponse.GetResponseStream());
                reader.ReadToEnd();
                reader.Close();
                // Report success only when the commit response was read without error.
                this.NotifyClient(Constants.UploadCompleteReason.UploadCommitted);
            }
            catch (Exception ex)
            {
                this.NotifyClient(Constants.UploadCompleteReason.ErrorOccurred, ex.Message);
            }
        }
    }
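The request body written by the code above is an XML block list. As an illustration of its shape, the sketch below builds a `BlockList` document whose entries are the same Base64-encoded, zero-padded serial numbers that the splitting code assigns to its packets. `BlockListSketch` is a hypothetical name, and the use of `Uncommitted` elements is an assumption inferred from the name of the sample's `UncommittedBlockBlobList` helper, whose body is not shown in the article.

```csharp
using System;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Xml.Linq;

public static class BlockListSketch
{
    // Same scheme as the splitting code: "D4" zero padding, then Base64.
    // The padding keeps every ID the same length for serial numbers up to 9999.
    public static string ToBlockId(int serialNumber)
    {
        string padded = serialNumber.ToString("D4", CultureInfo.InvariantCulture);
        return Convert.ToBase64String(Encoding.UTF8.GetBytes(padded));
    }

    // Builds <BlockList><Uncommitted>id</Uncommitted>...</BlockList>
    // for blocks numbered 1..blockCount, in upload order.
    public static XDocument Build(int blockCount)
    {
        return new XDocument(
            new XDeclaration("1.0", "utf-8", null),
            new XElement(
                "BlockList",
                Enumerable.Range(1, blockCount)
                          .Select(n => new XElement("Uncommitted", ToBlockId(n)))));
    }
}
```

The order of the entries in the list determines the order of the blocks in the committed blob, which is why the serial numbers are generated sequentially.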
You can download the source code with a sample website all ready and functional from the following location: http://code.msdn.microsoft.com/Silverlight-Azure-Blob-3b773e26.
FAQ
Q1. Would it be possible to handle a case where uploading took longer than the time interval specified in the shared access signature on the container?
Ans. Such a case is not supported. The upload control knows only a single detail about your storage account: your shared access signature. This keeps your credentials safe even if someone tries to disassemble the control (which is easy, given that a Silverlight control is downloaded to the client machine). The control should be exposed to the user only while an upload task is pending, and the SAS should be generated for a period just long enough to cover the slowest upload you expect. Another way I can think of to remove the time constraint is to intercept the requests through a WCF proxy, but this would make uploads slower.
Q2. Would it be possible to specify what kind of changes this solution would undergo when ported to SL5 with TPL built-in?
Ans. Silverlight 5 would remove the dependency on the PortableTPL library. The only code change required would be to uninstall the PortableTPL NuGet package and add a reference to System.Threading.Tasks. For complete control, I expect a fully functional TaskScheduler to arrive, which would let us limit the concurrency as desired. However, when I observed the uploads through Fiddler, no more than 6-8 threads were spawned in parallel, so the thread pool is itself managing the concurrency efficiently. Limiting it explicitly would just be an additional feature to integrate once a TaskScheduler with complete functionality becomes available. A capture of an upload in Fiddler is shown inline:
Q3. What are the considerations and/or possible changes in this solution for uploading large files? The code appears to be keeping in memory all chunks resulting from splitting original file before uploading to blob storage?
Ans. I have tested this application for uploads of up to 200 MB. For larger uploads, the solution may be modified to read the first 200 MB and upload those blocks in parallel while the FileStream object seeks to the next portion of the file. This would change the splitting function and is not currently present in the solution. We need to keep large chunks in memory to speed up the upload, since seeking to a position in the file, reading the bytes, and then uploading the chunks is slower than reading blocks from memory and uploading them.
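One way to approach the modification described in this answer is to read chunks lazily instead of building the whole list up front, so that only the chunks currently being uploaded need to be in memory. The following is a minimal sketch of that idea, not part of the sample project; `LazyChunkReader` and `ReadChunks` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class LazyChunkReader
{
    // Yields the stream's content as arrays of at most chunkSize bytes.
    // A chunk is read only when the consumer asks for the next item.
    public static IEnumerable<byte[]> ReadChunks(Stream source, int chunkSize)
    {
        var buffer = new byte[chunkSize];
        while (true)
        {
            // Stream.Read may return fewer bytes than requested,
            // so keep reading until the buffer is full or the stream ends.
            int filled = 0;
            int read;
            while (filled < chunkSize &&
                   (read = source.Read(buffer, filled, chunkSize - filled)) > 0)
            {
                filled += read;
            }

            if (filled == 0)
            {
                yield break;   // nothing left to read
            }

            var chunk = new byte[filled];
            Array.Copy(buffer, chunk, filled);
            yield return chunk;

            if (filled < chunkSize)
            {
                yield break;   // a short chunk means the end of the stream
            }
        }
    }
}
```

A consumer could pull, say, 200 chunks at a time from this sequence, upload them in parallel, and then pull the next batch, which trades some throughput for a bounded memory footprint.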
Q4. Would there be any race condition while selecting and committing upload packets?
Ans. There would be no race condition as this code block is run on a single thread and the thread assigns the upload chunk to a new Task. The IsTransported identifier identifies the file chunks available for upload.
The process is:
The state works as:
StartUpload() picks only that element whose IsTransported value is set to false. It assigns this block to a thread after setting this value to null. ReadHttpResponseCallback() now has a block with IsTransported set to null, which it would set to true only on successful upload. Now when the loop inside StartUpload() runs again, it won't pick the same block, since IsTransported is either null, if it has been assigned to a thread, or true, if it has been uploaded.
This was the reason to make the IsTransported variable tristate. Each thread's ReadHttpResponseCallback has just a single upload block assigned to itself, so it can't also change the value of some other element.
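The tri-state flag described in this answer can be modelled in isolation. The sketch below is a simplified illustration, assuming that a failed upload resets the flag to false so that the block is picked up again; that reset is an assumption, since the article does not show the failure path. `Packet` and `PacketStates` are hypothetical names standing in for the sample's own types.

```csharp
using System.Collections.Generic;
using System.Linq;

public sealed class Packet
{
    // false: waiting to be picked up, null: assigned to a task, true: uploaded.
    public bool? IsTransported = false;
    public int RetryCount;
}

public static class PacketStates
{
    // Called only from the single scheduling thread, so two tasks
    // can never be handed the same packet.
    public static Packet ClaimNext(IEnumerable<Packet> packets)
    {
        // For a nullable bool, (null == false) evaluates to false,
        // so packets that are already assigned are skipped.
        var next = packets.FirstOrDefault(p => p.IsTransported == false);
        if (next != null)
        {
            next.IsTransported = null;
        }

        return next;
    }

    // Called by the task that owns the packet after a successful response.
    public static void MarkUploaded(Packet packet)
    {
        packet.IsTransported = true;
    }

    // Assumed failure path: count the retry and make the packet claimable again.
    public static void MarkFailed(Packet packet)
    {
        packet.RetryCount++;
        packet.IsTransported = false;
    }
}
```

Each task only ever writes to the one packet it was handed, which is why the single scheduling thread and the worker callbacks do not contend for the same element.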
Q5. What is the future scope of the control?
Ans. The following nice-to-have features may be added to the control:
-
Support for MD5 checksum, to know whether your upload was successful.
-
Support for uploading multiple files in parallel.
-
Support for evaluating network bandwidth from an ongoing upload and cancelling upload in case it is expected that upload may take more time than the time for which the SAS is generated.
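Two of the features above reduce to small calculations, sketched here as an illustration rather than a planned implementation. The checksum is the Base64-encoded MD5 hash that the Blob service accepts in the `Content-MD5` request header; the example assumes the full .NET Framework `MD5` class, which may not be available in the Silverlight runtime. The bandwidth check extrapolates the average throughput so far. `UploadChecks` and its method names are hypothetical.

```csharp
using System;
using System.Security.Cryptography;

public static class UploadChecks
{
    // Base64-encoded MD5 of the payload, in the form used by the Content-MD5 header.
    public static string ContentMd5(byte[] payload)
    {
        using (var md5 = MD5.Create())
        {
            return Convert.ToBase64String(md5.ComputeHash(payload));
        }
    }

    // Estimates, from the average throughput so far, whether the rest of
    // the file can be sent before the SAS expires.
    public static bool WillFinishBeforeExpiry(
        long bytesSent, long totalBytes, TimeSpan elapsed, TimeSpan timeUntilSasExpiry)
    {
        if (bytesSent <= 0 || elapsed <= TimeSpan.Zero)
        {
            return true;   // not enough data to make a judgment yet
        }

        double bytesPerSecond = bytesSent / elapsed.TotalSeconds;
        double secondsNeeded = (totalBytes - bytesSent) / bytesPerSecond;
        return secondsNeeded <= timeUntilSasExpiry.TotalSeconds;
    }
}
```

For example, after sending 10 MB of a 100 MB file in 10 seconds, the remaining 90 MB would need roughly 90 seconds at the same rate, so the check would fail if the SAS had only 60 seconds left.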
References