导出 (0) 打印
全部展开

处理 Media Services 作业通知

更新时间: 2013年11月

注:本页面内容可能不完全适用中国大陆地区运营的 Windows Azure服务。如要了解不同地区 Windows Azure 服务的差异, 请参考本网站.

Microsoft Azure Media Services 可以在处理媒体作业时将通知消息传送到 Azure 队列存储。本主题将介绍如何从队列存储获取这些通知消息。

传送到队列存储的消息可以在世界任何位置进行访问。Azure 队列消息传送体系结构具有可靠性和高度可扩展性。在所有方法中,我们建议使用轮询队列存储方法。

侦听 Media Services 通知的一个常见情景是您要开发一个需要在编码作业完成后执行一些附加任务的内容管理系统(例如,触发工作流中的下一个步骤或发布内容)。

注意事项

开发使用 Azure 存储队列的 媒体服务 应用程序时,应考虑以下注意事项。

代码示例

本部分中的代码示例执行以下操作:

  1. 定义映射到通知消息格式的 EncodingJobMessage 类。此代码会将从队列中收到的消息反序列化到 EncodingJobMessage 类型的对象中。

  2. 从 app.config 文件加载 Media Services 和存储帐户信息。使用此信息创建 CloudMediaContextCloudQueue 对象。

  3. 创建将用于接收编码作业的通知消息的队列。

  4. 创建映射到队列的通知终点。

  5. 将通知终点附加到作业并提交编码作业。可以将多个通知终点附加到作业。

    在本例中,我们只关心作业的最终处理状态,因此我们将 NotificationJobState.FinalStatesOnly 传递到 AddNew 方法。

    job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly, _notificationEndPoint); 
    
    如果传递 NotificationJobState.All,则会获得所有状态更改通知:已排队 -> 已安排 -> 正在处理 -> 已完成。但是,正如上文所述,Azure 存储队列服务并不保证按顺序进行传送。您可以使用 Timestamp 属性(通过以下示例中的 EncodingJobMessage 类型进行定义)对消息进行排序。您有可能获得重复的通知消息。可使用 ETag 属性(以 EncodingJobMessage 类型进行定义)检查重复消息。请注意,也有可能某些状态更改消息会被跳过。

  6. 每 10 秒检查队列一次,等待作业到达 Finished 状态。在消息处理完之后将其删除。

  7. 删除队列和通知终点。

note注意
建议通过侦听通知消息的方式来监视作业的状态,如以下示例所示。

或者,也可以使用 IJob.State property 来检查作业的状态。请注意,有关作业完成情况的通知消息可能会在 IJob 的 State 设置为 Finished 之前到达。IJob.State  属性将以略微延迟的方式反映准确的状态。 

using System; using System.Linq; using System.Configuration; using System.IO; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using Microsoft.WindowsAzure.MediaServices.Client; using System.Web; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Auth; using Microsoft.WindowsAzure.Storage.Queue; using System.Runtime.Serialization.Json;  namespace JobNotification {     public class EncodingJobMessage     {         // MessageVersion is used for version control.          public String MessageVersion { get; set; }              // Type of the event. Valid values are          // JobStateChange and NotificationEndpointRegistration.         public String EventType { get; set; }              // ETag is used to help the customer detect if          // the message is a duplicate of another message previously sent.         public String ETag { get; set; }              // Time of occurrence of the event.         public String TimeStamp { get; set; }          // Collection of values specific to the event.          // For the JobStateChange event the values are:         //     JobId - Id of the Job that triggered the notification.         //     NewState- The new state of the Job. Valid values are:         //          Scheduled, Processing, Canceling, Cancelled, Error, Finished         //     OldState- The old state of the Job. Valid values are:         //          Scheduled, Processing, Canceling, Cancelled, Error, Finished          // For the NotificationEndpointRegistration event the values are:         //     NotificationEndpointId- Id of the NotificationEndpoint          //          that triggered the notification.         //     State- The state of the Endpoint.          //          Valid values are: Registered and Unregistered.          public IDictionary<string, object> Properties { get; set; }     }      class Program     {         private static CloudMediaContext _context = null;         private static CloudQueue _queue = null;         private static INotificationEndPoint _notificationEndPoint = null;          private static readonly string _singleInputMp4Path =             Path.GetFullPath(@"C:\supportFiles\multifile\BigBuckBunny.mp4");          static void Main(string[] args)         {             // Get the values from app.config file.             string mediaServicesAccountName = ConfigurationManager.AppSettings["MediaServicesAccountName"];             string mediaServicesAccountKey = ConfigurationManager.AppSettings["MediaServicesAccountKey"];             string storageConnectionString = ConfigurationManager.AppSettings["StorageConnectionString"];               string endPointAddress = Guid.NewGuid().ToString();              // Create the context.              _context = new CloudMediaContext(mediaServicesAccountName, mediaServicesAccountKey);              // Create the queue that will be receiving the notification messages.             _queue = CreateQueue(storageConnectionString, endPointAddress);              // Create the notification point that is mapped to the queue.             _notificationEndPoint =                      _context.NotificationEndPoints.Create(                     Guid.NewGuid().ToString(), NotificationEndPointType.AzureQueue, endPointAddress);               if (_notificationEndPoint != null)             {                 IJob job = SubmitEncodingJobWithNotificationEndPoint(_singleInputMp4Path);                 WaitForJobToReachedFinishedState(job.Id);             }              // Clean up.             _queue.Delete();                   _notificationEndPoint.Delete();        }           static public CloudQueue CreateQueue(string storageAccountConnectionString, string endPointAddress)         {             CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountConnectionString);              // Create the queue client             CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();              // Retrieve a reference to a queue             CloudQueue queue = queueClient.GetQueueReference(endPointAddress);              // Create the queue if it doesn't already exist             queue.CreateIfNotExists();              return queue;         }            // Upload a video file, and encode to Smooth Streaming format         public static IJob SubmitEncodingJobWithNotificationEndPoint(string inputMediaFilePath)         {             // Declare a new job.             IJob job = _context.Jobs.Create("My MP4 to Smooth Streaming encoding job");              //Create an encrypted asset and upload the mp4.              IAsset asset = CreateAssetAndUploadSingleFile(AssetCreationOptions.StorageEncrypted,                  inputMediaFilePath);              // Get a media processor reference, and pass to it the name of the              // processor to use for the specific task.             IMediaProcessor processor = GetLatestMediaProcessorByName("Azure Media Encoder");              // Create a task with the conversion details, using a configuration file.              ITask task = job.Tasks.AddNew("My Mp4 to Smooth Task",                 processor,                 "H264 Smooth Streaming 720p",                 Microsoft.WindowsAzure.MediaServices.Client.TaskOptions.None);              // Specify the input asset to be encoded.             task.InputAssets.Add(asset);              // Add an output asset to contain the results of the job.             task.OutputAssets.AddNew("Output asset",                 AssetCreationOptions.None);              // Add a notification point to the job. You can add multiple notification points.               job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly,                  _notificationEndPoint);              job.Submit();              return job;         }          public static void WaitForJobToReachedFinishedState(string jobId)         {             int expectedState = (int)JobState.Finished;             int timeOutInSeconds = 600;              bool jobReachedExpectedState = false;             DateTime startTime = DateTime.Now;             int jobState = -1;              while (!jobReachedExpectedState)             {                 // Specify how often you want to get messages from the queue.                 Thread.Sleep(TimeSpan.FromSeconds(10));                  foreach (var message in _queue.GetMessages(10))                 {                     using (Stream stream = new MemoryStream(message.AsBytes))                     {                         DataContractJsonSerializerSettings settings = new DataContractJsonSerializerSettings();                         settings.UseSimpleDictionaryFormat = true;                         DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(EncodingJobMessage), settings);                         EncodingJobMessage encodingJobMsg = (EncodingJobMessage)ser.ReadObject(stream);                          Console.WriteLine();                          // Display the message information.                         Console.WriteLine("EventType: {0}", encodingJobMsg.EventType);                         Console.WriteLine("MessageVersion: {0}", encodingJobMsg.MessageVersion);                         Console.WriteLine("ETag: {0}", encodingJobMsg.ETag);                         Console.WriteLine("TimeStamp: {0}", encodingJobMsg.TimeStamp);                         foreach (var property in encodingJobMsg.Properties)                         {                             Console.WriteLine("    {0}: {1}", property.Key, property.Value);                         }                          // We are only interested in messages                          // where EventType is "JobStateChange".                         if (encodingJobMsg.EventType == "JobStateChange")                         {                             string JobId = (String)encodingJobMsg.Properties.Where(j => j.Key == "JobId").FirstOrDefault().Value;                             if (JobId == jobId)                             {                                 string oldJobStateStr = (String)encodingJobMsg.Properties.                                                             Where(j => j.Key == "OldState").FirstOrDefault().Value;                                 string newJobStateStr = (String)encodingJobMsg.Properties.                                                             Where(j => j.Key == "NewState").FirstOrDefault().Value;                                  JobState oldJobState = (JobState)Enum.Parse(typeof(JobState), oldJobStateStr);                                 JobState newJobState = (JobState)Enum.Parse(typeof(JobState), newJobStateStr);                                  if (newJobState == (JobState)expectedState)                                 {                                     Console.WriteLine("job with Id: {0} reached expected state: {1}",                                          jobId, newJobState);                                     jobReachedExpectedState = true;                                     break;                                 }                             }                         }                     }                     // Delete the message after we've read it.                     _queue.DeleteMessage(message);                 }                  // Wait until timeout                 TimeSpan timeDiff = DateTime.Now - startTime;                 bool timedOut = (timeDiff.TotalSeconds > timeOutInSeconds);                 if (timedOut)                 {                     Console.WriteLine(@"Timeout for checking job notification messages,                                          latest found state ='{0}', wait time = {1} secs",                         jobState,                         timeDiff.TotalSeconds);                      throw new TimeoutException();                 }             }         }             static private IAsset CreateAssetAndUploadSingleFile(AssetCreationOptions assetCreationOptions, string singleFilePath)         {             var asset = _context.Assets.Create("UploadSingleFile_" + DateTime.UtcNow.ToString(),                  assetCreationOptions);              var fileName = Path.GetFileName(singleFilePath);              var assetFile = asset.AssetFiles.Create(fileName);              Console.WriteLine("Created assetFile {0}", assetFile.Name);             Console.WriteLine("Upload {0}", assetFile.Name);              assetFile.Upload(singleFilePath);             Console.WriteLine("Done uploading of {0}", assetFile.Name);              return asset;         }          static private IMediaProcessor GetLatestMediaProcessorByName(string mediaProcessorName)         {             var processor = _context.MediaProcessors.Where(p => p.Name == mediaProcessorName).                 ToList().OrderBy(p => new Version(p.Version)).LastOrDefault();              if (processor == null)                 throw new ArgumentException(string.Format("Unknown media processor", mediaProcessorName));              return processor;         }     } } 

以上示例会生成下列输出。您获得的值可能会有所不同。

Created assetFile BigBuckBunny.mp4 Upload BigBuckBunny.mp4 Done uploading of BigBuckBunny.mp4  EventType: NotificationEndPointRegistration MessageVersion: 1.0 ETag: e0238957a9b25bdf3351a88e57978d6a81a84527fad03bc23861dbe28ab293f6 TimeStamp: 2013-05-14T20:22:37     NotificationEndPointId: nb:nepid:UUID:d6af9412-2488-45b2-ba1f-6e0ade6dbc27     State: Registered     Name: dde957b2-006e-41f2-9869-a978870ac620     Created: 2013-05-14T20:22:35  EventType: JobStateChange MessageVersion: 1.0 ETag: 4e381f37c2d844bde06ace650310284d6928b1e50101d82d1b56220cfcb6076c TimeStamp: 2013-05-14T20:24:40     JobId: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54     JobName: My MP4 to Smooth Streaming encoding job     NewState: Finished     OldState: Processing     AccountName: westeuropewamsaccount job with Id: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54 reached expected  State: Finished 


构建日期:

2014-05-23

社区附加资源

添加
显示:
© 2014 Microsoft