You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
RTL/src/app/shared/services/web-socket.service.ts

101 lines
3.3 KiB
TypeScript

import { Injectable, OnDestroy } from '@angular/core';
import { BehaviorSubject, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { WebSocketSubject } from 'rxjs/webSocket';
import { LoggerService } from '../../shared/services/logger.service';
import { SessionService } from './session.service';
/**
 * Owns a single web socket connection and fans incoming messages out to one
 * stream per message `source` ('LND', 'CLN' or 'ECL').
 *
 * The socket is opened with `connectWebSocket()`, torn down with
 * `closeConnection()`, and re-opened with an exponential back-off
 * (10s, 20s, ... capped at 160s) by `reconnectOnError()`.
 */
@Injectable()
export class WebSocketClientService implements OnDestroy {

  /** Back-off value the retry delay starts from (it is doubled before first use). */
  private static readonly INITIAL_RETRY_SECONDS = 5;
  /** Upper bound for the retry delay. */
  private static readonly MAX_RETRY_SECONDS = 160;

  /** Messages whose `source` is 'CLN'. */
  public clWSMessages: BehaviorSubject<any> = new BehaviorSubject<any>(null);
  /** Messages whose `source` is 'ECL'. */
  public eclWSMessages: BehaviorSubject<any> = new BehaviorSubject<any>(null);
  /** Messages whose `source` is 'LND'. */
  public lndWSMessages: BehaviorSubject<any> = new BehaviorSubject<any>(null);

  /** URL and node index of the last connection attempt; reused when reconnecting. */
  private wsUrl = '';
  private nodeIndex = '';
  /** Current socket, or null when no connection is held. */
  private socket: WebSocketSubject<any> | null = null;
  /** Current back-off delay in seconds. */
  private RETRY_SECONDS = WebSocketClientService.INITIAL_RETRY_SECONDS;
  /** Handle of the pending reconnect timer, or null when none is scheduled. */
  private RECONNECT_TIMEOUT: any = null;
  /** Teardown notifiers; index 1 guards the socket subscription. */
  private unSubs: Array<Subject<void>> = Array.from({ length: 13 }, () => new Subject<void>());

  constructor(private logger: LoggerService, private sessionService: SessionService) { }

  /**
   * Opens the web socket unless a usable one is already held.
   *
   * The value stored under 'token' in the session service (or an empty string)
   * and `nodeIndex` are sent as the socket's sub-protocols.
   *
   * @param finalWSUrl URL to connect to; remembered for later reconnects.
   * @param nodeIndex Node identifier; remembered for later reconnects.
   */
  connectWebSocket(finalWSUrl: string, nodeIndex: string) {
    if (this.socket && !this.socket.closed) { return; }
    this.wsUrl = finalWSUrl;
    this.nodeIndex = nodeIndex;
    this.logger.info('Websocket Url: ' + this.wsUrl);
    this.socket = new WebSocketSubject({
      url: finalWSUrl,
      protocol: [(this.sessionService.getItem('token') || ''), nodeIndex]
    });
    this.subscribeToMessages();
  }

  /**
   * Schedules one reconnect attempt using the remembered URL and node index.
   *
   * Does nothing when a reconnect is already pending or a usable socket is
   * still held. Each call doubles the delay, capped at 160 seconds.
   */
  reconnectOnError() {
    if (this.RECONNECT_TIMEOUT || (this.socket && !this.socket.closed)) { return; }
    this.RETRY_SECONDS = Math.min(this.RETRY_SECONDS * 2, WebSocketClientService.MAX_RETRY_SECONDS);
    this.RECONNECT_TIMEOUT = setTimeout(() => {
      // Clear the handle first: if the attempt fails synchronously, the nested
      // reconnectOnError() call must be able to schedule the next retry.
      this.RECONNECT_TIMEOUT = null;
      this.logger.info('Reconnecting Web Socket.');
      this.connectWebSocket(this.wsUrl, this.nodeIndex);
    }, this.RETRY_SECONDS * 1000);
  }

  /**
   * Completes and releases the current socket, and cancels any pending
   * reconnect so that a deliberately closed connection is not reopened.
   */
  closeConnection() {
    this.cancelPendingReconnect();
    if (this.socket) {
      this.socket.complete();
      this.socket = null;
    }
  }

  ngOnDestroy() {
    this.closeConnection();
    [this.clWSMessages, this.eclWSMessages, this.lndWSMessages].forEach((stream) => {
      stream.next(null);
      stream.complete();
    });
    // Fire the teardown notifiers so every takeUntil() guard releases its subscription.
    this.unSubs.forEach((notifier) => {
      notifier.next();
      notifier.complete();
    });
  }

  /** Cancels the reconnect timer if one is scheduled. */
  private cancelPendingReconnect() {
    if (this.RECONNECT_TIMEOUT) {
      clearTimeout(this.RECONNECT_TIMEOUT);
      this.RECONNECT_TIMEOUT = null;
    }
  }

  /** Subscribes to the current socket and wires its events to the handlers below. */
  private subscribeToMessages() {
    const activeSocket = this.socket;
    if (!activeSocket) { return; }
    activeSocket.pipe(takeUntil(this.unSubs[1])).subscribe({
      next: (msg) => this.routeMessage(msg),
      error: (err) => {
        // Drop the failed socket (unless it was already replaced). Its `closed`
        // flag is only set by unsubscribe(), so keeping the reference would make
        // reconnectOnError() treat it as live and skip the reconnect.
        if (this.socket === activeSocket) { this.socket = null; }
        this.handleError(err);
      },
      complete: () => { this.logger.info('Web Socket Closed'); }
    });
  }

  /**
   * Decodes one incoming message and forwards it to the stream matching its
   * `source`. Messages carrying an `error` field go to `handleError()`;
   * messages with any other `source` are logged and dropped.
   */
  private routeMessage(rawMsg: any) {
    let msg = rawMsg;
    if (typeof rawMsg === 'string') {
      try {
        msg = JSON.parse(rawMsg);
      } catch (parseErr) {
        // One malformed frame should not break the subscription; log and skip it.
        this.logger.error(parseErr);
        return;
      }
    }
    if (msg?.error) {
      this.handleError(msg.error);
      return;
    }
    // A healthy message proves the connection works, so restart the back-off.
    this.RETRY_SECONDS = WebSocketClientService.INITIAL_RETRY_SECONDS;
    this.logger.info('Next Message from WS:' + JSON.stringify(msg));
    switch (msg?.source) {
      case 'LND':
        this.lndWSMessages.next(msg);
        break;
      case 'CLN':
        this.clWSMessages.next(msg);
        break;
      case 'ECL':
        this.eclWSMessages.next(msg);
        break;
      default:
        break;
    }
  }

  /**
   * Logs the error, propagates it to all three public streams and requests a
   * reconnect.
   *
   * NOTE(review): error() terminates a subject, so after the first error the
   * public streams will not deliver later messages to subscribers, even once
   * the socket is reconnected. Kept as-is because subscribers may rely on
   * receiving the error - confirm whether the streams should stay alive.
   */
  private handleError(err: any) {
    this.logger.error(err);
    this.clWSMessages.error(err);
    this.eclWSMessages.error(err);
    this.lndWSMessages.error(err);
    this.reconnectOnError();
  }

}