Solving websockets subscriptions and disconnects

Solving websockets subscriptions and disconnects

I finally got around to solving an annoying issue in an EmberJS application that uses websockets to connect to RabbitMQ messaging. The issue concerns disconnects after a network error while there are existing subscriptions to channels. I had solved the basic reconnect issue before, in a similar fashion as described here.

But as I also use channel subscriptions these active subscriptions should also be restored when the connection is restored...

The idea is that I track those channel subscriptions, so when a connection error happens I can simply add them to a list of pendingSubscriptions.

import Ember from "ember";
import ENV from "../config/environment";
import webstomp from "npm:webstomp-client";

/* global SockJS */

/**
 * Messaging service: keeps a STOMP-over-SockJS connection open, reconnects
 * after errors and restores the channel subscriptions that were active when
 * the connection was lost.
 *
 * Subscriptions are tracked in two maps:
 *  - `pendingSubscriptions`: requested, but not (yet) subscribed on the client
 *  - `establishedSubscriptions`: currently subscribed on the client
 */
export default Ember.Service.extend({
  account: Ember.inject.service(),
  store: Ember.inject.service(),
  client: null,
  identity: Ember.computed.alias('account.identity'),
  /**
   * Established subscriptions
   * {name: {connection: value, destination: destination}} (on which a unsubscribe method is available)
   *
   * Created per instance in init(); an object literal here would live on the
   * prototype and be shared by every instance of the service.
   */
  establishedSubscriptions: null,
  /**
   * Subscriptions that have been requested but are not yet possible because no connection is available yet.
   * {name: destination}
   *
   * Created per instance in init().
   */
  pendingSubscriptions: null,
  /**
   * Handle of the single "retry pending subscriptions" timer, or null when no
   * retry is scheduled.
   */
  pendingRetryTimer: null,

  /**
   * Give every instance its own subscription maps.
   */
  init() {
    this._super(...arguments);

    this.set('establishedSubscriptions', {});
    this.set('pendingSubscriptions', {});
  },

  /**
   * Start a connection
   */
  initialize: function () {
    this.connectAndReconnect(`https://${ENV.APP.mq.baseHost}/stomp`, () => {
      Ember.debug('Connected established...');
      this.subscribeToPendingSubscriptions();
    });

    this.requestSubscription('lab-plhw-identity/all', "/exchange/lab-plhw-identity/all");
  },

  /**
   * Subscribe to every pending subscription on the current client.
   *
   * When no client exists yet nothing happens: the success callback passed by
   * initialize() flushes the pending subscriptions once connected. When a
   * client exists but is not connected, a single retry is scheduled after
   * 1500 ms (at most one retry timer is outstanding at any time).
   */
  subscribeToPendingSubscriptions: function () {
    if (Ember.isNone(this.client)) {
      // requestSubscription() may run before initialize(), e.g. through the
      // identity observer; the requests stay pending until a connection exists.
      return;
    }

    if (!this.client.connected) {
      if (Ember.isNone(this.pendingRetryTimer)) {
        this.pendingRetryTimer = setTimeout(() => {
          this.pendingRetryTimer = null;
          this.subscribeToPendingSubscriptions();
        }, 1500);
      }

      return;
    }

    // Object.keys() takes a snapshot, so deleting entries while iterating is safe.
    Object.keys(this.pendingSubscriptions).forEach((subscriptionName) => {
      /**
       * First unsubscribe from destination
       */
      const established = this.establishedSubscriptions[subscriptionName];
      if (!Ember.isNone(established)) {
        established.connection.unsubscribe();
        // remove established
        delete this.establishedSubscriptions[subscriptionName];
      }

      /**
       * Subscribe to destination
       */
      const destination = this.pendingSubscriptions[subscriptionName];
      Ember.debug(`Subscribing to: ${destination}`);
      this.establishedSubscriptions[subscriptionName] = {
        connection: this.client.subscribe(destination, this.onMessage.bind(this)),
        destination: destination
      };

      // remove pending
      delete this.pendingSubscriptions[subscriptionName];
    });
  },

  /**
   * Request a subscription; it replaces any subscription registered under the
   * same name. The actual subscribe happens in the 'afterRender' queue.
   *
   * @param subscriptionName key under which the subscription is tracked
   * @param destination STOMP destination to subscribe to
   */
  requestSubscription(subscriptionName, destination) {
    this.pendingSubscriptions[subscriptionName] = destination;

    Ember.run.scheduleOnce('afterRender', this, this.subscribeToPendingSubscriptions);
  },

  /**
   * Re-target the identity specific subscription whenever the identity id changes.
   *
   * NOTE(review): when the id becomes null/undefined this still requests
   * ".../undefined" as destination - confirm whether the subscription should
   * be dropped instead.
   */
  onUserIdChanged: Ember.observer('identity.id', function () {
    this.requestSubscription('lab-plhw-identity/identity', "/exchange/lab-plhw-identity/" + this.get('identity.id'));
  }),

  /**
   * Open a connection and, on error, move the established subscriptions back
   * to pending and try again after 1500 ms.
   *
   * To enable debugging for SockJS execute this in the console and reload "localStorage.debug = '*';"
   *
   * @param socketUrl
   * @param successCallback called with the CONNECTED frame of every successful (re)connect
   */
  connectAndReconnect: function (socketUrl, successCallback) {
    Ember.debug(`Connecting to: ${socketUrl}`);

    const sock = new SockJS(socketUrl/*, {transports: ???}*/);

    this.client = webstomp.over(sock, {
      binary: false,
      heartbeat: {outgoing: 0, incoming: 0},
      debug: ENV.environment !== 'production'
    });

    // The error callback can be invoked more than once for the same connection
    // (e.g. an ERROR frame followed by the socket closing). Only the first
    // invocation may schedule a reconnect, otherwise connections multiply.
    let errorHandled = false;

    this.client.connect(
      ENV.APP.mq.username,
      ENV.APP.mq.password,
      (frame) => {
        successCallback(frame);
      },
      (error) => {
        if (errorHandled) {
          return;
        }
        errorHandled = true;

        /**
         * Connection lost... Make sure that for existing subscriptions new subscriptions are requested
         */
        Object.keys(this.establishedSubscriptions).forEach((subscriptionName) => {
          const destination = this.establishedSubscriptions[subscriptionName].destination;

          delete this.establishedSubscriptions[subscriptionName];

          // A request that is already pending under this name is newer than the
          // lost subscription and must not be overwritten by it.
          if (!this.pendingSubscriptions.hasOwnProperty(subscriptionName)) {
            this.requestSubscription(subscriptionName, destination);
          }
        });

        // `error` is either a close event (has `reason`) or an ERROR frame
        // (carries the text in `headers.message`).
        let reason;
        if (!Ember.isNone(error)) {
          reason = error.reason;
          if (Ember.isEmpty(reason) && !Ember.isNone(error.headers)) {
            reason = error.headers.message;
          }
        }

        Ember.debug('Connection error: ' + reason);
        setTimeout(() => {
          this.connectAndReconnect(socketUrl, successCallback);
        }, 1500);
      },
      ENV.APP.mq.vhost
    );
  },

  /**
   * Handle a message received on one of the subscriptions.
   *
   * JSON bodies are parsed; any other content type is treated as plain text.
   * Messages for the "all" exchange or for the current identity may carry a
   * `changeset` whose 'entity-insertions' and 'entity-updates' are pushed into
   * the store and whose 'entity-deletions' are unloaded from it.
   *
   * A body or changeset entry that is not valid JSON makes JSON.parse throw;
   * that error is not caught here.
   *
   * @param message STOMP frame with `headers` and `body`
   */
  onMessage: function (message) {
    const store = this.get('store');

    // Ignore media type parameters such as "; charset=utf-8" and letter case.
    const mediaType = String(message.headers['content-type']).split(';')[0].trim().toLowerCase();

    let payload;
    if (mediaType === 'application/json') {
      payload = JSON.parse(message.body);
    } else { // assume 'text/plain'
      payload = message.body;
    }

    Ember.debug('Recieved payload for: ' + message.headers['destination']);

    switch (message.headers['destination']) {
      case '/exchange/lab-plhw-identity/' + this.getWithDefault('identity.id', 'xxx'):
      case '/exchange/lab-plhw-identity/all': {
        // A JSON body of `null` is valid JSON but has no changeset to process.
        if (Ember.isNone(payload) || Ember.isNone(payload.changeset)) {
          break;
        }

        const changeset = payload.changeset;

        // Run `handler` with the parsed data of every entry listed under `key`.
        const eachMutation = (key, handler) => {
          if (!Ember.isNone(changeset[key])) {
            Ember.A(changeset[key]).forEach((mutationPayload) => {
              handler(JSON.parse(mutationPayload));
            });
          }
        };

        eachMutation('entity-insertions', (mutationData) => {
          store.pushPayload(mutationData);
        });
        eachMutation('entity-updates', (mutationData) => {
          store.pushPayload(mutationData);
        });
        eachMutation('entity-deletions', (mutationData) => {
          const record = store.peekRecord(mutationData.data.type, mutationData.data.id);
          if (!Ember.isNone(record)) {
            record.unloadRecord();
          }
        });
        break;
      }
      default:
        Ember.debug('payload not for me');
    }
  }
});