Share via


Procedura: Esportare e modificare le registrazioni in blocco

In alcuni scenari è necessario creare o modificare quantità elevate di registrazioni i in hub di notifica. Alcuni di questi scenari sono costituiti da aggiornamenti di tag successivi a calcoli in batch o alla migrazione di un'implementazione push esistente in modo che usi Hub di notifica.

Questo argomento illustra come usare il supporto in blocco di Hub di notifica per eseguire un numero elevato di operazioni in un hub di notifica oppure per esportare tutte le registrazioni.

Flusso di livello elevato

Il supporto in batch è stato progettato per supportare processi a lunga esecuzione che coinvolgono milioni di registrazioni. Per ottenere questa scalabilità, il supporto per l'invio in batch usa Archiviazione di Azure per archiviare i dettagli e l'output del processo. Per operazioni di aggiornamento in blocco, l'utente deve creare un file in un contenitore BLOB, che deve includere l'elenco di operazioni di aggiornamento delle registrazioni. Quando avvia il processo, l'utente fornisce un URL per il BLOB di input, oltre a un URL per una directory di output, che si trova in un contenitore BLOB. Dopo l'avvio del processo, l'utente può verificare lo stato eseguendo query relative a un percorso URL fornito all'avvio del processo. Si noti che un processo specifico può eseguire solo operazioni di un tipo specifico, ad esempio creazioni, aggiornamenti o eliminazioni. Le operazioni di esportazione sono eseguite in modo analogo.

Importa

Eseguire la configurazione

In questa sezione si presuppone che siano disponibili gli elementi seguenti:

  1. Un hub di notifica sottoposto a provisioning.

  2. Un contenitore BLOB dell'archiviazione di Azure.

  3. Riferimenti ai pacchetti di Archiviazione di Azure e bus di servizio di Azure NuGet.

Creare il file di input e archiviarlo in un BLOB

Un file di input include un elenco di registrazioni serializzate in XML, una per riga. Usando Azure SDK, l'esempio di codice seguente mostra come serializzare le registrazioni e caricarle nel contenitore BLOB.

private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
    StringBuilder builder = new StringBuilder();
    foreach (var registrationDescription in descriptions)
    {
        builder.AppendLine(RegistrationDescription.Serialize());
    }

    var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
    using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
    {
        inputBlob.UploadFromStream(stream);
    }
}

Importante

Il codice precedente serializza le registrazioni in memoria e quindi carica l'intero flusso in un BLOB. Se è stato caricato un file di più di pochi megabyte, fare riferimento alle indicazioni sui BLOB di Azure su come eseguire questi passaggi; ad esempio BLOB in blocchi.

Creare token URL

Dopo il caricamento del file di input è necessario generare gli URL da fornire all'hub di notifica per il file di input e la directory di output. Si noti che è possibile usare due contenitori BLOB diversi per input e output.

static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
    };

    string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + sasContainerToken);
}

static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Read
    };
    string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + "/" + filePath + sasToken);
}

Inviare il processo

Quando gli URL di input e output sono disponibili, è possibile iniziare il processo in batch.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
        JobType = NotificationHubJobType.ImportCreateRegistrations,
        OutputContainerUri = outputContainerSasUri,
        ImportFileUri = inputFileSasUri
    }
);
createTask.Wait();

var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
    getJobTask.Wait();
    job = getJobTask.Result;
    Thread.Sleep(1000);
    i--;
}

Oltre agli URL di input e output URL, questo esempio crea un oggetto NotificationHubJob che include un oggetto JobType, che può essere uno degli elementi seguenti:

  • ImportCreateRegistrations

  • ImportUpdateRegistrations

  • ImportDeleteRegistrations

Al termine della chiamata, il processo viene continuato dall'hub di notifica ed è possibile controllarne lo stato con la chiamata a GetNotificationHubJobAsync.

Al termine del processo è possibile esaminare i risultati, verificando i file seguenti nella directory di output:

  • /<hub>/<jobid>/Failed.txt

  • /<hub>/<jobid>/Output.txt

Questi file includono l'elenco delle operazioni riuscite e non riuscite del batch. I file hanno estensione cvs. Ogni riga include il numero di riga del file di input originale e l'output dell'operazione, ovvero in genere la descrizione della registrazione creata o aggiornata.

Esportazione

L'esportazione della registrazione è analoga all'importazione, con le differenze seguenti:

  1. È necessario solo l'URL di output.

  2. È necessario creare un processo NotificationHub di tipo ExportRegistrations.

Codice di esempio completo

Di seguito è disponibile un esempio funzionante completo per l'importazione di registrazioni in un hub di notifica.

using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "---";
        private static string HUB_NAME = "---";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT = "---";
        private static string STORAGE_PASSWORD = "---";
        private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));

        static void Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            //write to blob store to create an input file
            var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
            var container = blobClient.GetContainerReference("testjobs");
            container.CreateIfNotExists();

            SerializeToBlob(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);


            //Lets import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var createTask = client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );
            createTask.Wait();

            var job = createTask.Result;
            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
                getJobTask.Wait();
                job = getJobTask.Result;
                Thread.Sleep(1000);
                i--;
            }
        }

        private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(RegistrationDescription.Serialize());
            }

            var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                inputBlob.UploadFromStream(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + sasContainerToken);
        }

        static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + "/" + filePath + sasToken);
        }

        static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
        {
            return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
        }
    }
}