bigbuffet-rw/app/soapbox/stream.js

104 lines
2.5 KiB
JavaScript
Raw Normal View History

2020-03-27 13:59:38 -07:00
'use strict';
import WebSocketClient from '@gamestdio/websocket';
2021-03-24 15:53:09 -07:00
import { getAccessToken } from 'soapbox/utils/auth';
2020-03-27 13:59:38 -07:00
const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max));
export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) {
return (dispatch, getState) => {
const streamingAPIBaseURL = getState().getIn(['instance', 'urls', 'streaming_api']);
2021-03-24 15:53:09 -07:00
const accessToken = getAccessToken(getState());
2020-03-27 13:59:38 -07:00
const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState);
let polling = null;
const setupPolling = () => {
pollingRefresh(dispatch, () => {
polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000));
});
};
const clearPolling = () => {
if (polling) {
clearTimeout(polling);
polling = null;
}
};
let subscription;
2020-03-27 13:59:38 -07:00
// If the WebSocket fails to be created, don't crash the whole page,
// just proceed without a subscription.
try {
subscription = getStream(streamingAPIBaseURL, accessToken, path, {
connected() {
if (pollingRefresh) {
clearPolling();
}
onConnect();
},
2020-03-27 13:59:38 -07:00
disconnected() {
if (pollingRefresh) {
polling = setTimeout(() => setupPolling(), randomIntUpTo(40000));
}
2020-03-27 13:59:38 -07:00
onDisconnect();
},
2020-03-27 13:59:38 -07:00
received(data) {
onReceive(data);
},
2020-03-27 13:59:38 -07:00
reconnected() {
if (pollingRefresh) {
clearPolling();
pollingRefresh(dispatch);
}
2020-03-27 13:59:38 -07:00
onConnect();
},
2020-03-27 13:59:38 -07:00
});
} catch (e) {
console.error(e);
}
2020-03-27 13:59:38 -07:00
const disconnect = () => {
if (subscription) {
subscription.close();
}
clearPolling();
};
return disconnect;
};
}
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
const params = [ `stream=${stream}` ];
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
ws.onopen = connected;
ws.onclose = disconnected;
ws.onreconnect = reconnected;
ws.onmessage = (e) => {
if (!e.data) return;
try {
received(JSON.parse(e.data));
} catch(error) {
console.error(e);
2020-04-14 11:44:40 -07:00
console.error(`Could not parse the above streaming event.\n${error}`);
}
2020-04-14 11:44:40 -07:00
};
2020-03-27 13:59:38 -07:00
return ws;
2021-08-03 12:22:51 -07:00
}