Exportera (0) Skriv ut
Visa allt
EN
Det här innehållet finns inte tillgängligt på ditt språk men här finns den engelska versionen,

Handling Media Services Job Notifications

Updated: July 8, 2014

Microsoft Azure Media Services has the ability to deliver notification messages to the Azure Queue storage when processing media jobs. This topic shows how to get these notification messages from Queue storage.

Messages delivered to Queue storage can be accessed from anywhere in the world. The Azure Queue messaging architecture is reliable and highly scalable. Polling Queue storage is recommended over using other methods.

One common scenario for listening to Media Services notifications is if you are developing a content management system that needs to perform some additional task after an encoding job completes (for example, trigger the next step in a workflow, or publish content).

Considerations

Consider the following when developing Media Services applications that use Azure storage queue.

Code Example

The code example in this section does the following:

  1. Defines the EncodingJobMessage class that maps to the notification message format. The code deserializes messages received from the queue into objects of the EncodingJobMessage type.

  2. Loads the Media Services and Storage account information from the app.config file. Uses this information to create the CloudMediaContext and CloudQueue objects.

  3. Creates the queue that will be receiving notification messages about the encoding job.

  4. Creates the notification end point that is mapped to the queue.

  5. Attaches the notification end point to the job and submits the encoding job. You can have multiple notification end points attached to a job.

    In this example we are only interested in final states of the job processing, so we pass NotificationJobState.FinalStatesOnly to the AddNew method.

    job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly, _notificationEndPoint);
    
    
    If you pass NotificationJobState.All you should expect to get all state change notifications: Queued -> Scheduled -> Processing -> Finished. However, as noted earlier, the Azure Storage Queues service does not guarantee ordered delivery. You can use the Timestamp property (defined on the EncodingJobMessage type in the example below) to order messages. It is possible that you will get duplicate notification messages. Use the ETag property (defined on the EncodingJobMessage type) to check for duplicates. Note that it is also possible that some state change notifications will be skipped.

  6. Waits for the job to get to the Finished state by checking the queue every 10 seconds. Deletes messages after they have been processed.

  7. Deletes the queue and the notification end point.

noteNote
The recommended way to monitor a job’s state is by listening to notification messages, as shown in the following example.

Alternatively, you could check on a job’s state by using the IJob.State property.  Note that a notification message about a job’s completion may arrive before the State on IJob is set to Finished. The IJob.State  property will reflect the accurate state with a slight delay.  

using System;
using System.Linq;
using System.Configuration;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Microsoft.WindowsAzure.MediaServices.Client;
using System.Web;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Queue;
using System.Runtime.Serialization.Json;

namespace JobNotification
{
    public class EncodingJobMessage
    {
        // MessageVersion is used for version control. 
        public String MessageVersion { get; set; }
    
        // Type of the event. Valid values are 
        // JobStateChange and NotificationEndpointRegistration.
        public String EventType { get; set; }
    
        // ETag is used to help the customer detect if 
        // the message is a duplicate of another message previously sent.
        public String ETag { get; set; }
    
        // Time of occurrence of the event.
        public String TimeStamp { get; set; }

        // Collection of values specific to the event.

        // For the JobStateChange event the values are:
        //     JobId - Id of the Job that triggered the notification.
        //     NewState- The new state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished
        //     OldState- The old state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished

        // For the NotificationEndpointRegistration event the values are:
        //     NotificationEndpointId- Id of the NotificationEndpoint 
        //          that triggered the notification.
        //     State- The state of the Endpoint. 
        //          Valid values are: Registered and Unregistered.

        public IDictionary<string, object> Properties { get; set; }
    }

    class Program
    {
        private static CloudMediaContext _context = null;
        private static CloudQueue _queue = null;
        private static INotificationEndPoint _notificationEndPoint = null;

        private static readonly string _singleInputMp4Path =
            Path.GetFullPath(@"C:\supportFiles\multifile\BigBuckBunny.mp4");

        static void Main(string[] args)
        {
            // Get the values from app.config file.
            string mediaServicesAccountName = ConfigurationManager.AppSettings["MediaServicesAccountName"];
            string mediaServicesAccountKey = ConfigurationManager.AppSettings["MediaServicesAccountKey"];
            string storageConnectionString = ConfigurationManager.AppSettings["StorageConnectionString"];


            string endPointAddress = Guid.NewGuid().ToString();

            // Create the context. 
            _context = new CloudMediaContext(mediaServicesAccountName, mediaServicesAccountKey);

            // Create the queue that will be receiving the notification messages.
            _queue = CreateQueue(storageConnectionString, endPointAddress);

            // Create the notification point that is mapped to the queue.
            _notificationEndPoint = 
                    _context.NotificationEndPoints.Create(
                    Guid.NewGuid().ToString(), NotificationEndPointType.AzureQueue, endPointAddress);


            if (_notificationEndPoint != null)
            {
                IJob job = SubmitEncodingJobWithNotificationEndPoint(_singleInputMp4Path);
                WaitForJobToReachedFinishedState(job.Id);
            }

            // Clean up.
            _queue.Delete();      
            _notificationEndPoint.Delete();
       }


        static public CloudQueue CreateQueue(string storageAccountConnectionString, string endPointAddress)
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountConnectionString);

            // Create the queue client
            CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();

            // Retrieve a reference to a queue
            CloudQueue queue = queueClient.GetQueueReference(endPointAddress);

            // Create the queue if it doesn't already exist
            queue.CreateIfNotExists();

            return queue;
        }
 

        // Upload a video file, and encode to Smooth Streaming format
        public static IJob SubmitEncodingJobWithNotificationEndPoint(string inputMediaFilePath)
        {
            // Declare a new job.
            IJob job = _context.Jobs.Create("My MP4 to Smooth Streaming encoding job");

            //Create an encrypted asset and upload the mp4. 
            IAsset asset = CreateAssetAndUploadSingleFile(AssetCreationOptions.StorageEncrypted, 
                inputMediaFilePath);

            // Get a media processor reference, and pass to it the name of the 
            // processor to use for the specific task.
            IMediaProcessor processor = GetLatestMediaProcessorByName("Azure Media Encoder");

            // Create a task with the conversion details, using a configuration file. 
            ITask task = job.Tasks.AddNew("My Mp4 to Smooth Task",
                processor,
                "H264 Smooth Streaming 720p",
                Microsoft.WindowsAzure.MediaServices.Client.TaskOptions.None);

            // Specify the input asset to be encoded.
            task.InputAssets.Add(asset);

            // Add an output asset to contain the results of the job.
            task.OutputAssets.AddNew("Output asset",
                AssetCreationOptions.None);

            // Add a notification point to the job. You can add multiple notification points.  
            job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly, 
                _notificationEndPoint);

            job.Submit();

            return job;
        }

        public static void WaitForJobToReachedFinishedState(string jobId)
        {
            int expectedState = (int)JobState.Finished;
            int timeOutInSeconds = 600;

            bool jobReachedExpectedState = false;
            DateTime startTime = DateTime.Now;
            int jobState = -1;

            while (!jobReachedExpectedState)
            {
                // Specify how often you want to get messages from the queue.
                Thread.Sleep(TimeSpan.FromSeconds(10));

                foreach (var message in _queue.GetMessages(10))
                {
                    using (Stream stream = new MemoryStream(message.AsBytes))
                    {
                        DataContractJsonSerializerSettings settings = new DataContractJsonSerializerSettings();
                        settings.UseSimpleDictionaryFormat = true;
                        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(EncodingJobMessage), settings);
                        EncodingJobMessage encodingJobMsg = (EncodingJobMessage)ser.ReadObject(stream);

                        Console.WriteLine();

                        // Display the message information.
                        Console.WriteLine("EventType: {0}", encodingJobMsg.EventType);
                        Console.WriteLine("MessageVersion: {0}", encodingJobMsg.MessageVersion);
                        Console.WriteLine("ETag: {0}", encodingJobMsg.ETag);
                        Console.WriteLine("TimeStamp: {0}", encodingJobMsg.TimeStamp);
                        foreach (var property in encodingJobMsg.Properties)
                        {
                            Console.WriteLine("    {0}: {1}", property.Key, property.Value);
                        }

                        // We are only interested in messages 
                        // where EventType is "JobStateChange".
                        if (encodingJobMsg.EventType == "JobStateChange")
                        {
                            string JobId = (String)encodingJobMsg.Properties.Where(j => j.Key == "JobId").FirstOrDefault().Value;
                            if (JobId == jobId)
                            {
                                string oldJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "OldState").FirstOrDefault().Value;
                                string newJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "NewState").FirstOrDefault().Value;

                                JobState oldJobState = (JobState)Enum.Parse(typeof(JobState), oldJobStateStr);
                                JobState newJobState = (JobState)Enum.Parse(typeof(JobState), newJobStateStr);

                                if (newJobState == (JobState)expectedState)
                                {
                                    Console.WriteLine("job with Id: {0} reached expected state: {1}", 
                                        jobId, newJobState);
                                    jobReachedExpectedState = true;
                                    break;
                                }
                            }
                        }
                    }
                    // Delete the message after we've read it.
                    _queue.DeleteMessage(message);
                }

                // Wait until timeout
                TimeSpan timeDiff = DateTime.Now - startTime;
                bool timedOut = (timeDiff.TotalSeconds > timeOutInSeconds);
                if (timedOut)
                {
                    Console.WriteLine(@"Timeout for checking job notification messages, 
                                        latest found state ='{0}', wait time = {1} secs",
                        jobState,
                        timeDiff.TotalSeconds);

                    throw new TimeoutException();
                }
            }
        }
   
        static private IAsset CreateAssetAndUploadSingleFile(AssetCreationOptions assetCreationOptions, string singleFilePath)
        {
            var asset = _context.Assets.Create("UploadSingleFile_" + DateTime.UtcNow.ToString(), 
                assetCreationOptions);

            var fileName = Path.GetFileName(singleFilePath);

            var assetFile = asset.AssetFiles.Create(fileName);

            Console.WriteLine("Created assetFile {0}", assetFile.Name);
            Console.WriteLine("Upload {0}", assetFile.Name);

            assetFile.Upload(singleFilePath);
            Console.WriteLine("Done uploading of {0}", assetFile.Name);

            return asset;
        }

        static private IMediaProcessor GetLatestMediaProcessorByName(string mediaProcessorName)
        {
            var processor = _context.MediaProcessors.Where(p => p.Name == mediaProcessorName).
                ToList().OrderBy(p => new Version(p.Version)).LastOrDefault();

            if (processor == null)
                throw new ArgumentException(string.Format("Unknown media processor", mediaProcessorName));

            return processor;
        }
    }
}

The example above produced the following output. You values will vary.

Created assetFile BigBuckBunny.mp4
Upload BigBuckBunny.mp4
Done uploading of BigBuckBunny.mp4

EventType: NotificationEndPointRegistration
MessageVersion: 1.0
ETag: e0238957a9b25bdf3351a88e57978d6a81a84527fad03bc23861dbe28ab293f6
TimeStamp: 2013-05-14T20:22:37
    NotificationEndPointId: nb:nepid:UUID:d6af9412-2488-45b2-ba1f-6e0ade6dbc27
    State: Registered
    Name: dde957b2-006e-41f2-9869-a978870ac620
    Created: 2013-05-14T20:22:35

EventType: JobStateChange
MessageVersion: 1.0
ETag: 4e381f37c2d844bde06ace650310284d6928b1e50101d82d1b56220cfcb6076c
TimeStamp: 2013-05-14T20:24:40
    JobId: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54
    JobName: My MP4 to Smooth Streaming encoding job
    NewState: Finished
    OldState: Processing
    AccountName: westeuropewamsaccount
job with Id: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54 reached expected 
State: Finished


Build Date:

2014-09-09

Gruppinnehåll

Lägg till
Visa:
© 2014 Microsoft