Handling Media Services Job Notifications

Updated: July 2014

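Microsoft Azure Media Services can deliver notification messages to Azure Queue storage while it processes media jobs. This topic shows how to get these notification messages from Queue storage.

Messages delivered to Queue storage can be accessed from anywhere in the world. The Azure Queue messaging architecture is reliable and highly scalable. Polling Queue storage is recommended over other methods.

One common scenario for listening to Media Services notifications is a content management system that needs to perform some additional task after an encoding job completes (for example, trigger the next step in a workflow, or publish content).

Consider the following when developing Media Services applications that use Azure storage queues: the Queue service does not guarantee ordered delivery, the same notification can be delivered more than once, and your application has to poll the queue to receive messages.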

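The code example in this section does the following:

  1. Defines the EncodingJobMessage class that maps to the notification message format. The code deserializes messages received from the queue into objects of the EncodingJobMessage type.

  2. Loads the Media Services and storage account information from the app.config file, and uses this information to create the CloudMediaContext and CloudQueue objects.

  3. Creates the queue that receives notification messages about the encoding job.

  4. Creates the notification end point that is mapped to the queue.

  5. Attaches the notification end point to the job and submits the encoding job. You can attach multiple notification end points to a job.

    In this example, we are only interested in the final states of job processing, so we pass NotificationJobState.FinalStatesOnly to the AddNew method.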

    job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly, _notificationEndPoint); 
    
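    If you pass NotificationJobState.All, you should expect to get all of the state change notifications: Queued -> Scheduled -> Processing -> Finished. However, the Azure storage queue service does not guarantee ordered delivery. You can use the TimeStamp property (defined on the EncodingJobMessage type in the example below) to order messages. You may also get duplicate notification messages; use the ETag property (defined on the EncodingJobMessage type) to check for duplicates. Note that some state change notifications may be skipped.

  6. Checks the queue every 10 seconds, waiting for the job to reach the Finished state. Messages are deleted after they have been processed.

  7. Deletes the queue and the notification end point.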
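
The ordering and duplicate checks described in step 5 can be sketched as follows. This is a minimal, self-contained illustration rather than part of the sample: the NotificationInfo class is a hypothetical stand-in for EncodingJobMessage that carries only the fields used here, the ETag values are made up, and the sketch assumes that TimeStamp strings have the form shown in the sample output (for example, 2013-05-14T20:24:40).

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical stand-in for EncodingJobMessage; only the fields
// needed for this sketch are included.
public class NotificationInfo
{
    public string ETag { get; set; }
    public string TimeStamp { get; set; }
    public string NewState { get; set; }
}

public static class NotificationOrdering
{
    // Keeps the first message seen for each ETag, then orders the
    // remaining messages by their TimeStamp value.
    public static List<NotificationInfo> DedupeAndSort(IEnumerable<NotificationInfo> messages)
    {
        return messages
            .GroupBy(m => m.ETag)          // messages with the same ETag are duplicates
            .Select(g => g.First())
            .OrderBy(m => DateTime.Parse(m.TimeStamp, CultureInfo.InvariantCulture))
            .ToList();
    }

    public static void Main()
    {
        // Made-up messages: out of order, with one duplicate.
        var received = new[]
        {
            new NotificationInfo { ETag = "etag-b", TimeStamp = "2013-05-14T20:24:40", NewState = "Finished" },
            new NotificationInfo { ETag = "etag-a", TimeStamp = "2013-05-14T20:23:10", NewState = "Processing" },
            new NotificationInfo { ETag = "etag-b", TimeStamp = "2013-05-14T20:24:40", NewState = "Finished" }
        };

        foreach (var m in DedupeAndSort(received))
        {
            Console.WriteLine("{0} {1}", m.TimeStamp, m.NewState);
        }
        // Prints:
        // 2013-05-14T20:23:10 Processing
        // 2013-05-14T20:24:40 Finished
    }
}
```

Because the time stamps in the sample output have one-second resolution, two state changes can carry the same value. OrderBy is a stable sort, so such messages keep their arrival order.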

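Note
The recommended way to monitor a job's state is to listen to notification messages, as shown in the following example.

Alternatively, you can check a job's state by using the IJob.State property. Note that a notification message about a job's completion may arrive before the State property on IJob is set to Finished. The IJob.State property reflects the actual state with a slight delay.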

using System;
using System.Linq;
using System.Configuration;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Microsoft.WindowsAzure.MediaServices.Client;
using System.Web;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Queue;
using System.Runtime.Serialization.Json;

namespace JobNotification
{
    public class EncodingJobMessage
    {
        // MessageVersion is used for version control.
        public String MessageVersion { get; set; }

        // Type of the event. Valid values are
        // JobStateChange and NotificationEndpointRegistration.
        public String EventType { get; set; }

        // ETag is used to help the customer detect if
        // the message is a duplicate of another message previously sent.
        public String ETag { get; set; }

        // Time of occurrence of the event.
        public String TimeStamp { get; set; }

        // Collection of values specific to the event.

        // For the JobStateChange event the values are:
        //     JobId - Id of the Job that triggered the notification.
        //     NewState- The new state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished
        //     OldState- The old state of the Job. Valid values are:
        //          Scheduled, Processing, Canceling, Cancelled, Error, Finished

        // For the NotificationEndpointRegistration event the values are:
        //     NotificationEndpointId- Id of the NotificationEndpoint
        //          that triggered the notification.
        //     State- The state of the Endpoint.
        //          Valid values are: Registered and Unregistered.

        public IDictionary<string, object> Properties { get; set; }
    }

    class Program
    {
        private static CloudMediaContext _context = null;
        private static CloudQueue _queue = null;
        private static INotificationEndPoint _notificationEndPoint = null;

        private static readonly string _singleInputMp4Path =
            Path.GetFullPath(@"C:\supportFiles\multifile\BigBuckBunny.mp4");

        static void Main(string[] args)
        {
            // Get the values from app.config file.
            string mediaServicesAccountName = ConfigurationManager.AppSettings["MediaServicesAccountName"];
            string mediaServicesAccountKey = ConfigurationManager.AppSettings["MediaServicesAccountKey"];
            string storageConnectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

            string endPointAddress = Guid.NewGuid().ToString();

            // Create the context.
            _context = new CloudMediaContext(mediaServicesAccountName, mediaServicesAccountKey);

            // Create the queue that will be receiving the notification messages.
            _queue = CreateQueue(storageConnectionString, endPointAddress);

            // Create the notification point that is mapped to the queue.
            _notificationEndPoint =
                    _context.NotificationEndPoints.Create(
                    Guid.NewGuid().ToString(), NotificationEndPointType.AzureQueue, endPointAddress);

            if (_notificationEndPoint != null)
            {
                IJob job = SubmitEncodingJobWithNotificationEndPoint(_singleInputMp4Path);
                WaitForJobToReachedFinishedState(job.Id);
            }

            // Clean up.
            _queue.Delete();
            _notificationEndPoint.Delete();
        }

        static public CloudQueue CreateQueue(string storageAccountConnectionString, string endPointAddress)
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountConnectionString);

            // Create the queue client
            CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();

            // Retrieve a reference to a queue
            CloudQueue queue = queueClient.GetQueueReference(endPointAddress);

            // Create the queue if it doesn't already exist
            queue.CreateIfNotExists();

            return queue;
        }

        // Upload a video file, and encode to Smooth Streaming format
        public static IJob SubmitEncodingJobWithNotificationEndPoint(string inputMediaFilePath)
        {
            // Declare a new job.
            IJob job = _context.Jobs.Create("My MP4 to Smooth Streaming encoding job");

            // Create an encrypted asset and upload the mp4.
            IAsset asset = CreateAssetAndUploadSingleFile(AssetCreationOptions.StorageEncrypted,
                inputMediaFilePath);

            // Get a media processor reference, and pass to it the name of the
            // processor to use for the specific task.
            IMediaProcessor processor = GetLatestMediaProcessorByName("Azure Media Encoder");

            // Create a task with the conversion details, using a preset name.
            ITask task = job.Tasks.AddNew("My Mp4 to Smooth Task",
                processor,
                "H264 Smooth Streaming 720p",
                Microsoft.WindowsAzure.MediaServices.Client.TaskOptions.None);

            // Specify the input asset to be encoded.
            task.InputAssets.Add(asset);

            // Add an output asset to contain the results of the job.
            task.OutputAssets.AddNew("Output asset",
                AssetCreationOptions.None);

            // Add a notification point to the job. You can add multiple notification points.
            job.JobNotificationSubscriptions.AddNew(NotificationJobState.FinalStatesOnly,
                _notificationEndPoint);

            job.Submit();

            return job;
        }

        public static void WaitForJobToReachedFinishedState(string jobId)
        {
            int expectedState = (int)JobState.Finished;
            int timeOutInSeconds = 600;

            bool jobReachedExpectedState = false;
            DateTime startTime = DateTime.Now;
            int jobState = -1;

            while (!jobReachedExpectedState)
            {
                // Specify how often you want to get messages from the queue.
                Thread.Sleep(TimeSpan.FromSeconds(10));

                foreach (var message in _queue.GetMessages(10))
                {
                    using (Stream stream = new MemoryStream(message.AsBytes))
                    {
                        DataContractJsonSerializerSettings settings = new DataContractJsonSerializerSettings();
                        settings.UseSimpleDictionaryFormat = true;
                        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(EncodingJobMessage), settings);
                        EncodingJobMessage encodingJobMsg = (EncodingJobMessage)ser.ReadObject(stream);

                        Console.WriteLine();

                        // Display the message information.
                        Console.WriteLine("EventType: {0}", encodingJobMsg.EventType);
                        Console.WriteLine("MessageVersion: {0}", encodingJobMsg.MessageVersion);
                        Console.WriteLine("ETag: {0}", encodingJobMsg.ETag);
                        Console.WriteLine("TimeStamp: {0}", encodingJobMsg.TimeStamp);
                        foreach (var property in encodingJobMsg.Properties)
                        {
                            Console.WriteLine("    {0}: {1}", property.Key, property.Value);
                        }

                        // We are only interested in messages
                        // where EventType is "JobStateChange".
                        if (encodingJobMsg.EventType == "JobStateChange")
                        {
                            string JobId = (String)encodingJobMsg.Properties.Where(j => j.Key == "JobId").FirstOrDefault().Value;
                            if (JobId == jobId)
                            {
                                string oldJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "OldState").FirstOrDefault().Value;
                                string newJobStateStr = (String)encodingJobMsg.Properties.
                                                            Where(j => j.Key == "NewState").FirstOrDefault().Value;

                                JobState oldJobState = (JobState)Enum.Parse(typeof(JobState), oldJobStateStr);
                                JobState newJobState = (JobState)Enum.Parse(typeof(JobState), newJobStateStr);

                                // Remember the latest state so that the timeout message can report it.
                                jobState = (int)newJobState;

                                if (newJobState == (JobState)expectedState)
                                {
                                    Console.WriteLine("job with Id: {0} reached expected state: {1}",
                                        jobId, newJobState);
                                    jobReachedExpectedState = true;
                                    break;
                                }
                            }
                        }
                    }
                    // Delete the message after we've read it.
                    _queue.DeleteMessage(message);
                }

                // Wait until timeout
                TimeSpan timeDiff = DateTime.Now - startTime;
                bool timedOut = (timeDiff.TotalSeconds > timeOutInSeconds);
                if (timedOut)
                {
                    Console.WriteLine("Timeout for checking job notification messages, " +
                        "latest found state ='{0}', wait time = {1} secs",
                        jobState,
                        timeDiff.TotalSeconds);

                    throw new TimeoutException();
                }
            }
        }

        static private IAsset CreateAssetAndUploadSingleFile(AssetCreationOptions assetCreationOptions, string singleFilePath)
        {
            var asset = _context.Assets.Create("UploadSingleFile_" + DateTime.UtcNow.ToString(),
                assetCreationOptions);

            var fileName = Path.GetFileName(singleFilePath);

            var assetFile = asset.AssetFiles.Create(fileName);

            Console.WriteLine("Created assetFile {0}", assetFile.Name);
            Console.WriteLine("Upload {0}", assetFile.Name);

            assetFile.Upload(singleFilePath);
            Console.WriteLine("Done uploading of {0}", assetFile.Name);

            return asset;
        }

        static private IMediaProcessor GetLatestMediaProcessorByName(string mediaProcessorName)
        {
            var processor = _context.MediaProcessors.Where(p => p.Name == mediaProcessorName).
                ToList().OrderBy(p => new Version(p.Version)).LastOrDefault();

            if (processor == null)
                throw new ArgumentException(string.Format("Unknown media processor: {0}", mediaProcessorName));

            return processor;
        }
    }
}
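
The sample reads three values from the appSettings section of app.config. A configuration fragment along the following lines would supply them. The key names come from the code above; the values are placeholders to replace with your own account details, and the connection string layout is the standard Azure storage form, assumed here rather than taken from this topic.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- Placeholder values: replace with your own account details. -->
    <add key="MediaServicesAccountName" value="YOUR_MEDIA_SERVICES_ACCOUNT_NAME" />
    <add key="MediaServicesAccountKey" value="YOUR_MEDIA_SERVICES_ACCOUNT_KEY" />
    <add key="StorageConnectionString"
         value="DefaultEndpointsProtocol=https;AccountName=YOUR_STORAGE_ACCOUNT;AccountKey=YOUR_STORAGE_KEY" />
  </appSettings>
</configuration>
```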

The preceding example produces the following output. Your values will vary.

Created assetFile BigBuckBunny.mp4
Upload BigBuckBunny.mp4
Done uploading of BigBuckBunny.mp4

EventType: NotificationEndPointRegistration
MessageVersion: 1.0
ETag: e0238957a9b25bdf3351a88e57978d6a81a84527fad03bc23861dbe28ab293f6
TimeStamp: 2013-05-14T20:22:37
    NotificationEndPointId: nb:nepid:UUID:d6af9412-2488-45b2-ba1f-6e0ade6dbc27
    State: Registered
    Name: dde957b2-006e-41f2-9869-a978870ac620
    Created: 2013-05-14T20:22:35

EventType: JobStateChange
MessageVersion: 1.0
ETag: 4e381f37c2d844bde06ace650310284d6928b1e50101d82d1b56220cfcb6076c
TimeStamp: 2013-05-14T20:24:40
    JobId: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54
    JobName: My MP4 to Smooth Streaming encoding job
    NewState: Finished
    OldState: Processing
    AccountName: westeuropewamsaccount
job with Id: nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54 reached expected
State: Finished
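
To experiment with the deserialization step without a live queue, a sketch like the following may help. The JSON text is an assumption: it is reconstructed from the field names in the output above, and the exact wire format of a notification is not shown in this topic. SampleJobMessage and MessageParsing are hypothetical names that mirror EncodingJobMessage. The serializer settings are the ones the sample uses. In a .NET Framework project, this code needs a reference to the System.Runtime.Serialization assembly.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Json;
using System.Text;

// Hypothetical copy of the EncodingJobMessage shape, so that the
// sketch compiles without the Media Services SDK.
public class SampleJobMessage
{
    public string MessageVersion { get; set; }
    public string EventType { get; set; }
    public string ETag { get; set; }
    public string TimeStamp { get; set; }
    public IDictionary<string, object> Properties { get; set; }
}

public static class MessageParsing
{
    // Deserializes a JSON message body with the same serializer
    // settings that the sample applies to queue messages.
    public static SampleJobMessage Parse(string json)
    {
        var settings = new DataContractJsonSerializerSettings { UseSimpleDictionaryFormat = true };
        var serializer = new DataContractJsonSerializer(typeof(SampleJobMessage), settings);
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        {
            return (SampleJobMessage)serializer.ReadObject(stream);
        }
    }

    // Reads a string property without throwing when the key is absent.
    public static string GetProperty(SampleJobMessage message, string key)
    {
        object value;
        if (message.Properties != null && message.Properties.TryGetValue(key, out value))
        {
            return value as string;
        }
        return null;
    }

    public static void Main()
    {
        // Assumed message body, built from the values in the sample output.
        string json =
            "{\"MessageVersion\":\"1.0\",\"EventType\":\"JobStateChange\"," +
            "\"ETag\":\"4e381f37c2d844bde06ace650310284d6928b1e50101d82d1b56220cfcb6076c\"," +
            "\"TimeStamp\":\"2013-05-14T20:24:40\"," +
            "\"Properties\":{\"JobId\":\"nb:jid:UUID:526291de-f166-be47-b62a-11ffe6d4be54\"," +
            "\"NewState\":\"Finished\",\"OldState\":\"Processing\"}}";

        SampleJobMessage message = Parse(json);
        Console.WriteLine("{0}: {1} -> {2}",
            message.EventType,
            GetProperty(message, "OldState"),
            GetProperty(message, "NewState"));
    }
}
```

Looking up properties with TryGetValue returns null for a missing key, which is easier to check than the default key/value pair that the Where(...).FirstOrDefault() pattern yields when a message lacks a field.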

© 2014 Microsoft