Coder Social home page Coder Social logo

akanass / rx-socket-client Goto Github PK

View Code? Open in Web Editor NEW
11.0 2.0 3.0 312 KB

Reconnectable websocket client with RxJS Subject

License: MIT License

Makefile 1.38% TypeScript 98.07% JavaScript 0.55%
websocket observable reconnection client subject rxjs7

rx-socket-client's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

rx-socket-client's Issues

Reconnecting does not work

Hi,

I have tried your code and the reconnecting does not work for me.

I have added come console logs. Here is my finally class:

import { interval, Observable, Observer, Subject, Subscriber, Subscription } from 'rxjs';
import { distinctUntilChanged, filter, finalize, map, share, takeWhile } from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';

import { Utils } from '../utils';

export class TestWebsocketSubject extends Subject<any> {
    // Subject for connection status stream
    public _connectionStatus: Subject<boolean>;

    // Observable for reconnection stream
    private reconnectionObservable: Observable<number>;
    // WebSocketSubjectConfig instance
    private wsSubjectConfig: WebSocketSubjectConfig<any>;
    // WebSocketSubject instance
    private socket: WebSocketSubject<any>;
    // Socket Subscription
    private socketSubscription: Subscription;
    // Reconnection Subscription
    private reconnectionSubscription: Subscription;
    // Reconnect interval
    private reconnectInterval: number;

    public constructor( url: string, reconnectInterval: number = 5000 ) {
        super();

        this.reconnectInterval = reconnectInterval;

        this._connectionStatus = new Subject<boolean>();

        this.wsSubjectConfig = {
            closeObserver: {
                next: ( e: CloseEvent ): void => {
                    console.log( 'closeObserver' );
                    this.cleanSocket();
                    this._connectionStatus.next( false );
                }
            },
            openObserver: {
                next: ( e: Event ): void => {
                    console.log( 'openObserver' );
                    this._connectionStatus.next( true );
                }
            },
            url: url
        };

        this.connect();

        this.connectionStatus.subscribe( ( isConnected: boolean ) => {
            console.log( 'connectionStatus', isConnected );
            if ( !this.reconnectionObservable && !isConnected ) {
                this.reconnect();
            }
        } );
    }

    public get connectionStatus(): Observable<boolean> {
        return this._connectionStatus
                   .pipe(
                       distinctUntilChanged()
                   );
    }

    public send( data: any ): void {
        this.socket.next( data );
    }

    private on( event: string | 'close', cb: ( data?: any ) => void ): void {
        this._message$( event )
            .subscribe(
                ( message: any ): void => cb( message.data ),
                () => undefined,
                (): void => {
                    if ( event === 'close' ) {
                        cb();
                    }
                }
            );
    }

    private on$( event: string ): Observable<any> {
        return this._message$( event )
                   .pipe(
                       map( ( m: any ) => m.data )
                   );
    }

    private onClose$(): Observable<void> {
        return new Observable( ( observer: Subscriber<any> ): void => {
            this.subscribe(
                /* istanbul ignore next */
                () => undefined,
                /* istanbul ignore next */
                () => undefined,
                () => {
                    observer.next();
                    observer.complete();
                } );
        } );
    }

    private emit( event: string, data: any ): void {
        this.send( { event: event, data: data } );
    }

    private _message$( event: string | 'close' ): Observable<any> {
        return this
            .pipe(
                map( ( message: any ): any =>
                    ( message.type && message.type === 'utf8' && message.utf8Data ) ?
                        message.utf8Data :
                        message
                ),
                filter( ( message: any ): boolean =>
                    message.event &&
                    message.event !== 'close' &&
                    message.event === event &&
                    message.data
                )
            );
    }

    private cleanSocket(): void {
        console.log( 'cleanSocket' );
        if ( this.socketSubscription ) {
            this.socketSubscription.unsubscribe();
        }
        this.socket = undefined;
    }

    private cleanReconnection(): void {
        console.log( 'cleanReconnection' );
        if ( this.reconnectionSubscription ) {
            this.reconnectionSubscription.unsubscribe();
        }
        this.reconnectionObservable = undefined;
    }

    private connect(): void {
        console.log( 'connect' );
        this.socket = new WebSocketSubject( this.wsSubjectConfig );
        this.socketSubscription = this.socket.subscribe(
            ( m: any ) => {
                console.log( 'message', m );
                this.next( m );
            },
            () => {
                console.log( 'error in connect' );
                if ( !this.socket ) {
                    this.cleanReconnection();
                    this.reconnect();
                }
            }
        );
    }

    private reconnect(): void {
        console.log( 'reconnect' );
        this.reconnectionObservable = interval( this.reconnectInterval )
            .pipe(
                takeWhile( ( v: any, index: number ) => {
                    console.log( 'should be reconnected?', !Utils.exists( this.socket ) );

                    return !this.socket;
                } )
            );

        this.reconnectionSubscription = this.reconnectionObservable.subscribe(
            () => {
                console.log( 'try to connect again' );
                this.connect();
            },
            () => undefined,
            () => {
                console.log( 'reconnection, complete' );
                this.cleanReconnection();
                if ( !this.socket ) {
                    this.complete();
                    this._connectionStatus.complete();
                }
            }
        );
    }
}

And the output is:

15:16:02.026 test-websocket-subject.ts:126 cleanSocket
15:16:02.027 test-websocket-subject.ts:52 connectionStatus false
15:16:02.027 test-websocket-subject.ts:160 reconnect
15:16:07.029 test-websocket-subject.ts:164 should be reconnected? true
15:16:07.030 test-websocket-subject.ts:172 try to connect again
15:16:07.030 test-websocket-subject.ts:142 connect
15:16:12.028 test-websocket-subject.ts:164 should be reconnected? false
15:16:12.028 test-websocket-subject.ts:177 reconnection, complete
15:16:12.029 test-websocket-subject.ts:134 cleanReconnection
15:16:13.440 WebSocketSubject.js:91 WebSocket connection to 'wss://localhost:8000/ws?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJDb3JlRWRpdGlvblVzZXIiLCJ1c2VyU2Vzc2lvbklkIjoxMTIsImlhdCI6MTU3MTQwNDU1Mn0.N8Sd-OxmidZhFpqSyEl36voXFL1VTytBYKA21BzyKcY&collection_center_id=3' failed: Error in connection establishment: net::ERR_CONNECTION_REFUSED
15:16:13.441 test-websocket-subject.ts:150 error in connect
15:16:13.442 test-websocket-subject.ts:35 closeObserver
15:16:13.442 test-websocket-subject.ts:126 cleanSocket

I have rxjs 6.5.3 and I use hapi-websocket-plugin in my backend.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.