Try fast search NHibernate

10 January 2017

Azure IoT Hub a puro REST

El año pasado, en una reunión con el colorado el ruso y el pibe, me pidieron un ejemplo de uso de Azure IoT Hub sin usar el SDK. Yo pensaba que con un poco de GoogleFu algo iba a encontrar pero… nada.

En lugar de seguir buscando codigo y ya que todo, o casi, en Azure tiene API REST empecé a leer la documentación de la API y a codear.

Si quieren probar el codigo de este post, a parte un account de Azure, necesitan crear un IoTHub y tener a mano tres parametros:
1) el host del IoTHub que creaste
HostName
2) El nombre de la policy. En este caso, aunque ya tienen policies definida por default, le conviene crear una policy nueva.
PolicyName
3) la key de la policy. Como key pueden usar la primary o la secondary; en muchos servicios en Azure siempre tienen dos keys para cambiar/regenerar la key sin sufrir downtime.
SAS

Ya tenemos los ingredientes arriba la mesada y podemos empezar a prepararlos para la cocción.

Ya hace un tiempito que en Azure muchos servicios gozan de la fantastica SAS (Shared Access Signature) que, basicamente, nos permite compartir recursos y/o servicios sin por eso tener que hacer “viajar” las keys o utilizar procesos de auth que pueden ser más complejos como OAuth2. La SAS viaja plácidamente en el query string o en los headers de un request REST. En este caso usaremos la SAS para evitar el obscuro MitM (man in the middle) creando una SAS, usable por un tiempo relativamente corto (TTL time to live), para enviar mensajes al Azure IoT Hub. Una SAS para, condimentar un request al IoT Hub, se prepara de esta forma:

    private static readonly DateTime epochTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);

    public static string SharedAccessSignature(string hostUrl, string policyName, string policyAccessKey, TimeSpan timeToLive)
    {
      if (string.IsNullOrWhiteSpace(hostUrl))
      {
        throw new ArgumentNullException(nameof(hostUrl));
      }

      var expires = Convert.ToInt64(DateTime.UtcNow.Add(timeToLive).Subtract(epochTime).TotalSeconds).ToString(CultureInfo.InvariantCulture);
      var resourceUri = WebUtility.UrlEncode(hostUrl.ToLowerInvariant());
      var toSign = string.Concat(resourceUri, "\n", expires);
      var signed = Sign(toSign, policyAccessKey);

      var sb = new StringBuilder();
      sb.Append("sr=").Append(resourceUri)
        .Append("&sig=").Append(WebUtility.UrlEncode(signed))
        .Append("&se=").Append(expires);
      if (!string.IsNullOrEmpty(policyName))
      {
        sb.Append("&skn=").Append(WebUtility.UrlEncode(policyName));
      }
      return sb.ToString();
    }

    private static string Sign(string requestString, string key)
    {
      using (var hmacshA256 = new HMACSHA256(Convert.FromBase64String(key)))
      {
        var hash = hmacshA256.ComputeHash(Encoding.UTF8.GetBytes(requestString));
        return Convert.ToBase64String(hash);
      }
    }
Ingredientes y condimento preparados podemos empezar cocinando la entrada o sea el check de existencia y la registración de un device en el IoT Hub… el device se puede registrar en el IoT Hub a mano (desde el portal de Azure) pero va a resultar medio engorroso si planean agregar una estación de monitoreo sin mucha cerimonia.
    public static async Task CreateIfNotExists(HttpClient httpClient, string deviceId)
    {
      if (await Exists(httpClient, deviceId))
      {
        return;
      }
      var jsonMessage =
        $"{{\"deviceId\": \"{deviceId}\", \"status\": \"enabled\", \"statusReason\": \"Listo para enviar info\"}}";

      var request = new HttpRequestMessage(HttpMethod.Put, $"devices/{deviceId}?api-version=2016-02-03")
      {
        Content = new StringContent(jsonMessage, Encoding.ASCII, "application/json")
      };
      var response = await httpClient.SendAsync(request);
      if (!response.IsSuccessStatusCode)
      {
        throw new IOException("No fue posible registrar la estación de medición.");
      }
    }

    public static async Task<bool> Exists(HttpClient httpClient, string deviceId)
    {
      var request = new HttpRequestMessage(HttpMethod.Get, $"devices/{deviceId}?api-version=2016-02-03");
      request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
      var response = await httpClient.SendAsync(request);
      return response.IsSuccessStatusCode;
    }
Como notarán, hasta ahora, son todo metodos de una pequeña utility (todo los metodos son static) que llamé, con muchisima fantasia: DeviceRest . Otra peculiaridad de estos métodos es el hecho que reciben un HttpClient ; el motivo será más claro cocinando el plato principal.
  public class MeditionInfoSender
  {
    private readonly string stationId;
    private HttpClient currentHttpClient;
    private readonly string iotHubHost;
    private readonly string iotHubPolicyName;
    private readonly string iotHubPolicyKey;

    public MeditionInfoSender(string iotHubHost, string iotHubPolicyName, string iotHubPolicyKey, string stationId)
    {
      if (string.IsNullOrWhiteSpace(iotHubHost))
      {
        throw new ArgumentNullException(nameof(iotHubHost));
      }
      if (string.IsNullOrWhiteSpace(iotHubPolicyName))
      {
        throw new ArgumentNullException(nameof(iotHubPolicyName));
      }
      if (string.IsNullOrWhiteSpace(iotHubPolicyKey))
      {
        throw new ArgumentNullException(nameof(iotHubPolicyKey));
      }
      if (string.IsNullOrWhiteSpace(stationId))
      {
        throw new ArgumentNullException(nameof(stationId));
      }
      this.stationId = stationId;
      this.iotHubHost = iotHubHost;
      this.iotHubPolicyName = iotHubPolicyName;
      this.iotHubPolicyKey = iotHubPolicyKey;
    }

    public void InitializeStation()
    {
      using (var httpClient = new HttpClient())
      {
        httpClient.BaseAddress = new UriBuilder {Scheme = "https", Host = iotHubHost}.Uri;
        var hubSharedAccessSignature = DeviceRest.SharedAccessSignature(iotHubHost
          , iotHubPolicyName
          , iotHubPolicyKey
          , TimeSpan.FromMinutes(1));
        httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("SharedAccessSignature", hubSharedAccessSignature);
        DeviceRest.CreateIfNotExists(httpClient, stationId).Wait();
      }
      currentHttpClient = CreateHttpClient();
    }

    public async Task<bool> Send(string jsonMessage)
    {
      if (string.IsNullOrWhiteSpace(jsonMessage))
      {
        return false;
      }
      var request = new HttpRequestMessage(HttpMethod.Post, $"devices/{stationId}/messages/events?api-version=2016-02-03")
      {
        Content = new StringContent(jsonMessage, Encoding.ASCII, "application/json")
      };
      try
      {
        var httpClient = GetHttpClient();
        var response = await httpClient.SendAsync(request);
        return response.IsSuccessStatusCode;
      }
      catch (Exception)
      {
        return false;
      }
    }

    private HttpClient GetHttpClient()=> currentHttpClient ?? (currentHttpClient= CreateHttpClient());

    private HttpClient CreateHttpClient()
    {
      var httpClient = new HttpClient(new SharedAccessSignatureAuthHandler(iotHubHost, iotHubPolicyName, iotHubPolicyKey))
      {
        BaseAddress = new UriBuilder {Scheme = "https", Host = iotHubHost}.Uri,
      };
      return httpClient;
    }

    private class SharedAccessSignatureAuthHandler : HttpClientHandler
    {
      private AuthenticationHeaderValue currentSas;
      private readonly TimeSpan maxSasTtl = TimeSpan.FromMinutes(23);
      private readonly Stopwatch timer = new Stopwatch();
      private readonly string iotHubHost;
      private readonly string iotHubPolicyName;
      private readonly string iotHubPolicyKey;

      public SharedAccessSignatureAuthHandler(string iotHubHost, string iotHubPolicyName, string iotHubPolicyKey)
      {
        if (string.IsNullOrWhiteSpace(iotHubHost))
        {
          throw new ArgumentNullException(nameof(iotHubHost));
        }
        if (string.IsNullOrWhiteSpace(iotHubPolicyName))
        {
          throw new ArgumentNullException(nameof(iotHubPolicyName));
        }
        if (string.IsNullOrWhiteSpace(iotHubPolicyKey))
        {
          throw new ArgumentNullException(nameof(iotHubPolicyKey));
        }
        this.iotHubHost = iotHubHost;
        this.iotHubPolicyName = iotHubPolicyName;
        this.iotHubPolicyKey = iotHubPolicyKey;
      }

      protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
      {
        if (timer.Elapsed >= maxSasTtl || currentSas == null)
        {
          SetCurrentAuth();
        }
        request.Headers.Authorization = currentSas;
        return base.SendAsync(request, cancellationToken);
      }

      private void SetCurrentAuth()
      {
        timer.Reset();
        var hubSharedAccessSignature = DeviceRest.SharedAccessSignature(iotHubHost
          , iotHubPolicyName
          , iotHubPolicyKey
          , maxSasTtl.Add(TimeSpan.FromMinutes(3)));
        currentSas = new AuthenticationHeaderValue("SharedAccessSignature", hubSharedAccessSignature);
        timer.Start();
      }
    }
  }
Esta clase se ocupa de toda (lo que necesité) la comunicación con el IoT Hub usando su API REST. La parte más compleja (bueno… ponele…) es la gestión de la instancia de HttpClient que se usa a cada request.En el pipeline del HttpClient se usa una instancia de HttpClientHandler encargada de generar una SAS, con un TTL de 3 minutos, y adornar el request agregando el Authorization Header (el handler es la clase SharedAccessSignatureAuthHandler ).

Ya está el plato cocinado falta solo ver como se come…

Considerando que ustedes sabrán como obtener el stationId (que para el IoT Hub es el Device-ID), el primer bocón sería mas o menos así:
      sender = new MeditionInfoSender(iotHubHost, iotHubPolicyName, iotHubPolicyKey, stationId);
      Console.WriteLine($"Inicializando estación de medición '{stationId}'...");
      sender.InitializeStation();
      Console.WriteLine($"Estación '{stationId}' inicializada.");
Considerando que ustedes sabrán como obtener/construir el iotJsonMessage , el envío de un mensaje al IoT Hub (todos los otros bocones) se reduce a lo siguiente:
        var sent = await sender.Send(iotJsonMessage);

Esta receta es para la versión ligth/zero que pueden consumir quienes están a dieta o le resulta pesado el SDK o simplemente se divierte cocinando comida casera (pueden usarlo para traducirlo en el lenguaje de su device). Los que pueden ir al fast-food sería mejor que usen el SDK que le corresponda; le paso la que ya es solo la landing page del SDK (en el texto de la pagina encontrarán en link al repo de cada lenguaje): https://github.com/Azure/azure-iot-sdks

Como postre: el codigo de este post, as is, se usó en un par de proyectos .NET Core para los siguientes runtimes:
  "runtimes": {
    "win10-x64": {},
    "ubuntu.14.04-x64": {},
    "debian.8-x64": {} 
  }
o sea que funcionó en windows, ubuntu y docker (meterlo en un RaspberryPi, con UWP, no cuesta mucho)… y si! .NET, al fin, es multiplataforma y OSS.

05 April 2012

NHibernate: autocreate indexes for foreignkey

In these last days we have fallen in a performance issue with one of our DBs; the last and littlest one.

The creation of indexes on FKs seems to be a best-practice for MS-SQL-server and ORACLE and is not needed with Firebird; in Firebird instead than a best-practice is the default behavior: the FK includes an index.

That said I have to be sure that each FK has to have an index. After check our mappings I have realized that no one of our tables have the corresponding index for each FK.
Big PITA ?… no! that has been the lucky, because now I can create a very little piece of code to let NHibernate create all needed indexes for me.
public static class NHibernateConfigurationExtensions
{
    private static readonly PropertyInfo TableMappingsProperty =
        typeof(Configuration).GetProperty("TableMappings", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);

    public static void CreateIndexesForForeignKeys(this Configuration configuration)
    {
        configuration.BuildMappings();
        var tables = (ICollection<Table>)TableMappingsProperty.GetValue(configuration, null);
        foreach (var table in tables)
        {
            foreach (var foreignKey in table.ForeignKeyIterator)
            {
                var idx = new Index();
                idx.AddColumns(foreignKey.ColumnIterator);
                idx.Name = "IDX" + foreignKey.Name.Substring(2);
                idx.Table = table;
                table.AddIndex(idx);
            }
        }
    }
}
and this is the code to have the migration-step script.
var configuration = CreateConfiguration();
configuration.CreateIndexesForForeignKeys();
var sb = new StringBuilder(500);
new SchemaUpdate(configuration).Execute(s => sb.Append(s), false);
Console.WriteLine(sb.ToString());
That is all… few minutes and everything done and checked for now on.

P.S. perhaps I'll be back to my blog...perhaps

19 November 2011

Azure queues: Producer

If you have read something about Azure’s queue you have probably seen some picture like this:

AzureQ_PC

A producer is anything (a class) that, in some moment, enqueue a message; more exactly this “anything” is something that take the role of message producer. For example it can be a MVC controller, more usually it can be an application-service or a domain-event but it can be even a view.

To simplify the work with messages, to our team, I have created two very simple classes on top of Microsoft.WindowsAzure.StorageClient. The main target of these two classes is establish some conventions and make the message strongly typed (it represent a limitation of the capability of Azure’s queues but for our works it is ok).

The basic

Giving a message like this:

public class NewsViewed
{
    public int CountryId { get; set; }
    public Guid NewsId { get; set; }
    public Uri Referrer { get; set; }
}

I would enqueue a new message with a line like this:

new MessageQueue<NewsViewed>().Enqueue(new NewsViewed
{
    CountryId = args.CountryId,
    NewsId = args.NewsId,
    Referrer = args.Referrer
});

When you want work with a message you will probably need some more information than the pure message data so a message in a queue is represented by:

public class QueueMessage<TMessage>
{
    public string Id { get; internal set; }
    public string PopReceipt { get; internal set; }
    public DateTime? InsertionTime { get; internal set; }
    public DateTime? ExpirationTime { get; internal set; }
    public DateTime? NextVisibleTime { get; internal set; }
    public int DequeueCount { get; internal set; }
    public TMessage Data { get; internal set; }
}

The conventions

The base convention is about the name of the queue which is the name of the class representing the message. For the example above the name of the queue is: newsviewed. The second convention is about the format of the message-content: ours Azure-queues will contains just strings, more exactly the JSON serialization of the class representing the message.

The MessageQueue implementation

Currently the max message size is 8KB, perhaps, hopefully NO, we will need more space so the MessageQueue<TMessage> class have to be extensible to allow a gzip/de-gzip of the content or it needs take the real content from a blob… so far it is far away of our needs and I hope we will never need it.

The implementation is:

/// <summary>
/// Generic base class for messages.
/// </summary>
/// <typeparam name="TMessage">The type of the message.</typeparam>
/// <remarks>
/// The <typeparamref name="TMessage"/> have to be JSON serializable.
/// </remarks>
public class MessageQueue<TMessage> where TMessage : class
{
    private const int MaxMessageBlockAllowedByAzure = 32;
    private readonly CloudQueueClient queueClient;
    private readonly string queueName = typeof (TMessage).Name.ToLowerInvariant();

    public MessageQueue() : this(AzureAccount.DefaultAccount()) {}

    public MessageQueue(CloudStorageAccount account)
    {
        if (account == null)
        {
            throw new ArgumentNullException("account");
        }
        queueClient = account.CreateCloudQueueClient();
    }

    public int ApproximateMessageCount
    {
        get
        {
            CloudQueue queueRef = queueClient.GetQueueReference(queueName);
            queueRef.RetrieveApproximateMessageCount();
            if (queueRef.ApproximateMessageCount.HasValue)
            {
                return queueRef.ApproximateMessageCount.Value;
            }
            return 0;
        }
    }

    public void Enqueue(TMessage messageContent)
    {
        if (messageContent == null)
        {
            throw new ArgumentNullException("messageContent");
        }
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        var message = new CloudQueueMessage(SerializeObjectAsString(messageContent));
        queueRef.AddMessage(message);
    }

    public QueueMessage<TMessage> Dequeue()
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        CloudQueueMessage message = queueRef.GetMessage();
        return ConvertToQueueMessage(message);
    }

    public IEnumerable<QueueMessage<TMessage>> Dequeue(int messagesCount)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        IEnumerable<CloudQueueMessage> messages = queueRef.GetMessages(messagesCount < MaxMessageBlockAllowedByAzure ? messagesCount : MaxMessageBlockAllowedByAzure);
        return messages.Select(x => ConvertToQueueMessage(x));
    }

    public QueueMessage<TMessage> Dequeue(TimeSpan timeout)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        CloudQueueMessage message = queueRef.GetMessage(timeout);
        return ConvertToQueueMessage(message);
    }

    public IEnumerable<QueueMessage<TMessage>> Dequeue(int messagesCount, TimeSpan timeout)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        IEnumerable<CloudQueueMessage> messages = queueRef.GetMessages(messagesCount < MaxMessageBlockAllowedByAzure ? messagesCount:MaxMessageBlockAllowedByAzure, timeout);
        return messages.Select(x => ConvertToQueueMessage(x));
    }

    public QueueMessage<TMessage> Peek()
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        CloudQueueMessage message = queueRef.PeekMessage();
        return ConvertToQueueMessage(message);
    }

    public IEnumerable<QueueMessage<TMessage>> Peek(int messagesCount)
    {
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        IEnumerable<CloudQueueMessage> messages = queueRef.PeekMessages(messagesCount < MaxMessageBlockAllowedByAzure ? messagesCount : MaxMessageBlockAllowedByAzure);
        return messages.Select(x => ConvertToQueueMessage(x));
    }

    public void Remove(QueueMessage<TMessage> queueMessage)
    {
        if (queueMessage == null)
        {
            throw new ArgumentNullException("queueMessage");
        }
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        queueRef.DeleteMessage(queueMessage.Id, queueMessage.PopReceipt);
    }

    public void Remove(IEnumerable<QueueMessage<TMessage>> queueMessages)
    {
        if (queueMessages == null)
        {
            throw new ArgumentNullException("queueMessages");
        }
        CloudQueue queueRef = queueClient.GetQueueReference(queueName);
        foreach (var queueMessage in queueMessages)
        {
            queueRef.DeleteMessage(queueMessage.Id, queueMessage.PopReceipt);
        }
    }

    protected virtual string SerializeObjectAsString(TMessage messageContent)
    {
        // a subclass can gzipr the message (GZipStream) where the serialized TMessage is > 8KB
        return JsonConvert.SerializeObject(messageContent);
    }

    protected virtual TMessage DeserializeObjectFromString(string messageContent)
    {
        // a subclass can de-gzip the message
        return JsonConvert.DeserializeObject<TMessage>(messageContent);
    }

    protected virtual QueueMessage<TMessage> ConvertToQueueMessage(CloudQueueMessage message)
    {
        if (message == null)
        {
            return null;
        }
        string messageContent = message.AsString;
        return new QueueMessage<TMessage>
               {
                   Id = message.Id,
                   PopReceipt = message.PopReceipt,
                   DequeueCount = message.DequeueCount,
                   InsertionTime = message.InsertionTime,
                   ExpirationTime = message.ExpirationTime,
                   NextVisibleTime = message.NextVisibleTime,
                   Data = DeserializeObjectFromString(messageContent)
               };
    }
}

as you can see it is basically a wrapper.

The AzureAccount class, present in the parameters-less constructor, is our static class to access all our storages accounts.

The JsonConverter is the class of the famous Newtonsoft.Json.

The more simple step is done, the next will be about the three parts of the consumer.