Solving websockets subscriptions and disconnects
I finally came around to solve an annoying issue in an EmberJS application that uses websockets to connect to RabbitMQ messaging. The issue is with disconnects after a network error and existing subscription to the channels. I had solved the principal reconnect issue before in a similar fashion as described here.
But as I also use channel subscriptions these active subscriptions should also be restored when the connection is restored...
The idea is that I track those channels subscription, so when an connection error happens I can simply add those a list of pendingSubscriptions.
import Ember from "ember";
import ENV from "../config/environment";
import webstomp from "npm:webstomp-client";
/* global SockJS */
export default Ember.Service.extend({
account: Ember.inject.service(),
store: Ember.inject.service(),
client: null,
identity: Ember.computed.alias('account.identity'),
/**
* Established subscriptions
* {name: {connection: value, destination: destination} (on which a unsubscribe method is available)
*/
establishedSubscriptions: {},
/**
* Subscriptions that have been requested but are not yet possible because a connection is available yet.
* {name: destination}
*/
pendingSubscriptions: {},
/**
* Start a connection
*/
initialize: function () {
this.connectAndReconnect(`https://${ENV.APP.mq.baseHost}/stomp`, () => {
Ember.debug('Connected established...');
this.subscribeToPendingSubscriptions();
});
this.requestSubscription('lab-plhw-identity/all', "/exchange/lab-plhw-identity/all");
},
subscribeToPendingSubscriptions: function () {
if (!this.client.connected) {
setTimeout(() => {
this.subscribeToPendingSubscriptions();
}, 1500);
return;
}
let subscriptionName;
for (subscriptionName in this.pendingSubscriptions) {
if (this.pendingSubscriptions.hasOwnProperty(subscriptionName)) {
/**
* First unsubscribe from destination
*/
if (!Ember.isNone(this.establishedSubscriptions[subscriptionName])) {
this.establishedSubscriptions[subscriptionName]['connection'].unsubscribe();
// remove established
delete this.establishedSubscriptions[subscriptionName];
}
/**
* Subscribe to destination
*/
let destination = this.pendingSubscriptions[subscriptionName];
Ember.debug('Subscribing to: ' + destination);
this.establishedSubscriptions[subscriptionName] = {
connection: this.client.subscribe(destination, this.onMessage.bind(this)),
destination: destination
};
// remove pending
delete this.pendingSubscriptions[subscriptionName];
}
}
},
requestSubscription(subscriptionName, destination) {
this.pendingSubscriptions[subscriptionName] = destination;
Ember.run.scheduleOnce('afterRender', this, this.subscribeToPendingSubscriptions);
},
onUserIdChanged: Ember.observer('identity.id', function () {
this.requestSubscription('lab-plhw-identity/identity', "/exchange/lab-plhw-identity/" + this.get('identity.id'));
}),
/**
* To enable debugging for SockJS execute this in the console and reload "localStorage.debug = '*';"
*
* @param socketUrl
* @param successCallback
*/
connectAndReconnect: function (socketUrl, successCallback) {
Ember.debug('Connecting to: ' + socketUrl);
let sock = new SockJS(socketUrl/*, {transports: ???}*/);
this.client = webstomp.over(sock, {
binary: false,
heartbeat: {outgoing: 0, incoming: 0},
debug: ENV.environment !== 'production'
});
this.client.connect(
ENV.APP.mq.username,
ENV.APP.mq.password,
(frame) => {
successCallback(frame);
},
(error) => {
/**
* Connection lost... Make sure that for existing subscriptions new subscriptions are requested
*/
let subscriptionName;
for (subscriptionName in this.establishedSubscriptions) {
if (this.establishedSubscriptions.hasOwnProperty(subscriptionName)) {
this.requestSubscription(subscriptionName, this.establishedSubscriptions[subscriptionName]['destination']);
delete this.establishedSubscriptions[subscriptionName];
}
}
Ember.debug('Connection error: ' + error.reason);
setTimeout(() => {
this.connectAndReconnect(socketUrl, successCallback);
}, 1500);
},
ENV.APP.mq.vhost
);
},
onMessage: function (message) {
let payload, store = this.get('store');
switch (message.headers['content-type']) {
case 'application/json':
payload = JSON.parse(message.body);
break;
default: // assume 'text/plain'
payload = message.body;
}
Ember.debug('Recieved payload for: ' + message.headers['destination']);
switch (message.headers['destination']) {
case '/exchange/lab-plhw-identity/' + this.getWithDefault('identity.id', 'xxx'):
case '/exchange/lab-plhw-identity/all':
if (!Ember.isNone(payload.changeset)) {
if (!Ember.isNone(payload.changeset['entity-insertions'])) {
Ember.A(payload.changeset['entity-insertions']).forEach(function (mutationPayload) {
const mutationData = JSON.parse(mutationPayload);
store.pushPayload(mutationData);
});
}
if (!Ember.isNone(payload.changeset['entity-updates'])) {
Ember.A(payload.changeset['entity-updates']).forEach(function (mutationPayload) {
const mutationData = JSON.parse(mutationPayload);
store.pushPayload(mutationData);
});
}
if (!Ember.isNone(payload.changeset['entity-deletions'])) {
Ember.A(payload.changeset['entity-deletions']).forEach(function (mutationPayload) {
const mutationData = JSON.parse(mutationPayload);
let record = store.peekRecord(mutationData.data.type, mutationData.data.id);
if (!Ember.isNone(record)) {
record.unloadRecord();
}
});
}
}
break;
default:
Ember.debug('payload not for me');
}
}
});