Amazon IoT mit MQTT

03.02.2016

Seit 2015 bietet Amazon auch eine Plattform für das Internet der Dinge auch IoT genannt an.

Amazon IoT beinhaltet eine Device Registration, eine Rule Engine und einem Public-Subscribe Gateway für Internet Devices. Diese Services können via HTTPs (Rest) und MQTT angesprochen werden. Zudem gibt es eine Library für C, JavaScript und eine Arduino Library.

Für zukünftige Kundenprojekte kann Amazon IoT natürlich eine gute Basis für verschiedene Use-Cases sein. Deshalb habe ich ein paar Test gemacht und geprüft, wie Amazon IoT unter Java angebunden werden kann. Hierzu habe ich ein simples Publish-Subscribe Pattern über ein Topic mit Java umgesetzt.

Zur Vorbereitung musste natürlich erst mal ein Device angelegt werden. Dies wird über die Webseite von Amazon IoT https://eu-west-1.console.aws.amazon.com/iot/home?region=eu-west-1#/dashboard gemacht.

aws_create_resource

Zuerst muss eine sogenannte Ressource angelegt werden (rot umrandet).

Danach wird ein „thing“ wie in der nachfolgenden Grafik dargestellt angelegt.

aws_create_thing

Zu guter Letzt sollte ein Device verbunden werden. Dies ist der letzte und gleich der wichtigste Schritt, denn dadurch werden die notwendigen Zertifikate für die Verbindung mit MQTT erstellt.

aws_connect_device

Hierbei nach „Connect a device“ einfach den Anweisungen folgen.

aws_confirm_start

Die Keys müssen heruntergeladen und danach mit „Confirm & start connecting“ bestätigt werden.

Jetzt geht es an das eigentliche Coding.

Download: amazon-iot.zip

package de.doubleslash.incubator.amazon;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMDecryptorProvider;
import org.bouncycastle.openssl.PEMEncryptedKeyPair;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

/**
 * The Class AmazonIoTDemo demonstrates a simple publish subscribe with MQTT and Amazon IoT.
 */
public class AmazonIoTDemo implements MqttCallback {

   /** The client. */
   MqttClient client;

   /** The root ca file. */
   final String rootCaFile = "src/main/resources/rootCA.pem";

   /** The client certificate file. */
   final String clientCertificateFile = "src/main/resources/certificate.pem.crt.txt";

   /** The client private key file. */
   final String clientPrivateKeyFile = "src/main/resources/private.pem.key";

   /** The mqtt broker url. */
   final String mqttBrokerUrl = "ssl://XXXXXXXXXX.iot.eu-west-1.amazonaws.com:8883";

   /** The client id. */
   final String clientId = "clientid";

   /** The topic name. */
   final String topicName = "$aws/things/TestThing/shadow/update";

   /**
    * The main method.
    *
    * @param args
    *           the arguments
    */
   public static void main(final String[] args) {
      new AmazonIoTDemo().publishSubscribe();
   }

   /**
    * Publish subscribe.
    */
   public void publishSubscribe() {
      try {
         final MqttConnectOptions comqttConnectOptions = new MqttConnectOptions();
         comqttConnectOptions.setCleanSession(true);
         try {
            comqttConnectOptions.setSocketFactory(createSslSocketFactory("password"));
         } catch (final Exception e) {
            e.printStackTrace();
         }
         client = new MqttClient(mqttBrokerUrl, "AmazonIoTDemo");

         client.connect(comqttConnectOptions);
         client.setCallback(this);

         subscribe();
         publish(10);

         disconnect();
      } catch (final MqttException e) {
         e.printStackTrace();
      } catch (final InterruptedException e) {
         e.printStackTrace();
      }
   }

   private static Object loadFromFile(final String file) throws IOException {
      try (PEMParser parser = createPemParser(file)) {
         return parser.readObject();
      }
   }

   private static PEMParser createPemParser(final String file) throws IOException {
      return new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(file)))));
   }

   /**
    * Publishes an amount of messages.
    *
    * @param numOfMessages
    *           the num of messages
    * @throws MqttException
    *            the mqtt exception
    * @throws MqttPersistenceException
    *            the mqtt persistence exception
    */
   private void publish(final int numOfMessages) throws MqttException, MqttPersistenceException {
      final MqttMessage message = new MqttMessage();
      message.setPayload("A single message from my computer fff".getBytes());

      for (int j = 0; j < numOfMessages; j++) {
         message.setPayload(("" + System.currentTimeMillis()).getBytes());
         client.publish(topicName, message);
      }
   }

   /**
    * Subscribes to topic.
    *
    * @throws MqttException
    *            the mqtt exception
    */
   private void subscribe() throws MqttException {
      client.subscribe(topicName);
   }

   /**
    * Disconnect client.
    *
    * @throws InterruptedException
    *            the interrupted exception
    * @throws MqttException
    *            the mqtt exception
    */
   private void disconnect() throws InterruptedException, MqttException {
      Thread.sleep(1000); // wait for all messsages
      client.disconnect();
   }

   /*
    * (non-Javadoc)
    * @see org.eclipse.paho.client.mqttv3.MqttCallback#connectionLost(java.lang.Throwable)
    */
   @Override
   public void connectionLost(final Throwable cause) {}

   /*
    * (non-Javadoc)
    * @see org.eclipse.paho.client.mqttv3.MqttCallback#messageArrived(java.lang.String,
    * org.eclipse.paho.client.mqttv3.MqttMessage)
    */
   @Override
   public void messageArrived(final String topic, final MqttMessage message) throws Exception {
      final long messsageTime = Long.parseLong(new String(message.getPayload()));
      System.out
            .println("Delivering message " + message + " took " + (System.currentTimeMillis() - messsageTime) + " ms.");
   }

   /*
    * (non-Javadoc)
    * @see
    * org.eclipse.paho.client.mqttv3.MqttCallback#deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken)
    */
   @Override
   public void deliveryComplete(final IMqttDeliveryToken token) {}

   /**
    * Creates the ssl socket factory.
    *
    * @param password
    *           the password
    * @return the SSL socket factory
    * @throws InvalidPathException
    *            the invalid path exception
    * @throws IOException
    *            Signals that an I/O exception has occurred.
    * @throws KeyStoreException
    *            the key store exception
    * @throws NoSuchAlgorithmException
    *            the no such algorithm exception
    * @throws CertificateException
    *            the certificate exception
    * @throws UnrecoverableKeyException
    *            the unrecoverable key exception
    * @throws KeyManagementException
    *            the key management exception
    * @throws Exception
    *            the exception
    */
   private SSLSocketFactory createSslSocketFactory(final String password)
         throws InvalidPathException, IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException,
         UnrecoverableKeyException, KeyManagementException, Exception {

      Security.addProvider(new BouncyCastleProvider());

      // CACert and Cert from Client
      final X509CertificateHolder caCert = (X509CertificateHolder) loadFromFile(this.rootCaFile);
      final X509CertificateHolder cert = (X509CertificateHolder) loadFromFile(this.clientCertificateFile);

      // Private-Key from Client.
      final Object obj = loadFromFile(this.clientPrivateKeyFile);
      KeyPair key = null;
      JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");

      if (obj instanceof PEMEncryptedKeyPair) {
         final PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder().build(password.toCharArray());
         converter = new JcaPEMKeyConverter().setProvider("BC");
         key = converter.getKeyPair(((PEMEncryptedKeyPair) obj).decryptKeyPair(decProv));
      } else {
         key = converter.getKeyPair((PEMKeyPair) obj);
      }

      final JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter();
      certConverter.setProvider("BC");

      // Use the CA certificate to authenticate the server
      final KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
      caKs.load(null, null);
      caKs.setCertificateEntry("ca-certificate", certConverter.getCertificate(caCert));

      final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
      tmf.init(caKs);

      // Send the client key and certificate to server
      final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
      ks.load(null, null);
      ks.setCertificateEntry("certificate", certConverter.getCertificate(cert));
      ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[] {
            certConverter.getCertificate(cert)
      });
      final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
      kmf.init(ks, password.toCharArray());

      final SSLContext context = SSLContext.getInstance("TLSv1.2");
      context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
      return context.getSocketFactory();
   }

}

Die einzutragenden Informationen können auf der Amazon IoT Webseite entnommen werden.

aws_thing_details

Die Broker URL muss angepasst werden. Diese besteht aus der Domain beginnend mit dem Protokoll „ssl://“ anstelle von „https://“.

Die Zertifikate wurden bereits heruntergeladen und müssen nach „src/main/resources“ kopiert werden. Die Dateinamen sind im Demo entsprechend anzupassen.

Fazit: Das Ansprechen der IoT Plattform funktioniert sehr gut. Erste Messungen ergaben eine Zustellungszeit von ca. 100ms im Schnitt für das Rechenzentrum Irland. Bei Verwendung des Rechenzentrums in Oregon wurden im Schnitt ca. 220ms gemessen.

Hinweis: Die Verwendung von Amazon IoT ist nur für die ersten 250000 Nachrichten kostenfrei. Danach werden pro 1000000 Nachrichten 5$ fällig.

Zurück zur Übersicht

Kommentar verfassen

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

*Pflichtfelder

*