Coder Social home page Coder Social logo

bikashjoshi / event-stream-rxjs-pattern-demo Goto Github PK

View Code? Open in Web Editor NEW
0.0 0.0 0.0 1.16 MB

My own implementation of state management design pattern using RXJS in Angular. This demo provides sample usages along with unit tests practices to follow.

JavaScript 5.14% TypeScript 85.21% HTML 6.79% CSS 2.86%
state-management rxjs marble-testing async-pipes test-scheduling

event-stream-rxjs-pattern-demo's Introduction

EventStream Pattern with RXJS

A light weight state management using RXJS that allows state and data sharing among different components for an Angular application. This application demonstrates a pattern without using any specific framework for state management other than RXJS.

In addition, this demo application also demonstrates usage of one way binding along with transformations of streams using RXJS operators such as combineLatest, merge, distinctUntilChanged, switchMap and filter.

The application also provides sample usage of stream binding using Angular async pipe along with the marble unit tests and TestScheduler using RXJS testing.

Development server

Run ng serve for a dev server. Navigate to http://localhost:4200/.

Event Stream Interface

export interface EventStream<T> {
    event$: Observable<T>;
    emit(t: T): void;
}

Defining General Purpose Event Stream

export class EventStreamBase<T> implements EventStream<T> {
    private event: Subject<T>;
    private eventObservable: Observable<T>;

    constructor(t: T, private scheduler: SchedulerLike = asyncScheduler) {
        this.event = new BehaviorSubject<T>(t);
        this.eventObservable = this.event
                                .pipe(observeOn(scheduler))
                                .pipe(distinctUntilChanged())
                                .pipe(shareReplay(1));
    }

    get event$(): Observable<T> {
       return this.eventObservable;
    }

    emit(t: T): void {
        this.scheduler.schedule(_ => this.event.next(t));
    }
}

Defining Composite Event Stream

export interface TargetSales {
    perDay: number;
    perWeek: number;
    perMonth: number;
}

export class TargetSalesEventStream implements EventStream<TargetSales> {
    perDayEventStream: EventStreamBase<number>;
    perWeekEventStream: EventStreamBase<number>;
    perMonthEventStream: EventStreamBase<number>;

    constructor(private startWithTargetSales: TargetSales, private scheduler: SchedulerLike = asyncScheduler){
        this.perDayEventStream = new EventStreamBase(startWithTargetSales.perDay, scheduler);
        this.perWeekEventStream = new EventStreamBase(startWithTargetSales.perWeek, scheduler);
        this.perMonthEventStream = new EventStreamBase(startWithTargetSales.perMonth, scheduler);
    }

    get event$(): Observable<TargetSales> {
     const observable =  combineLatest([
            this.perDayEventStream.event$,
            this.perWeekEventStream.event$,
            this.perMonthEventStream.event$
        ], this.scheduler)
        .pipe(observeOn(this.scheduler))
        .pipe(filter(([x, y, z]) => x && y && z ? true : false))
        .pipe(map(([x, y, z]) => <TargetSales> {
            perDay: x,
            perWeek: y,
            perMonth: z
        }))
        .pipe(shareReplay(1))
        .pipe(startWith(this.startWithTargetSales));
        
     return observable;
    }

    emit(t: TargetSales): void {
        this.scheduler.schedule(_ => {
            this.perDayEventStream.emit(t.perDay);
            this.perWeekEventStream.emit(t.perWeek);
            this.perMonthEventStream.emit(t.perMonth);
        });
    }
}

Transformations with Pure Functions

export function computedTargetSalesStream(
    workingDaysPerWeekStream: Observable<number>,
    daysPerMonthStream: Observable<number>,
    targetDailySalesStream: Observable<number>,
    targetWeeklySalesStream: Observable<number>,
    targetMonthlySalesStream: Observable<number>,
    scheduler: SchedulerLike = asyncScheduler
  ): Observable<TargetSales> {
      
   const computedFromDailySalesStream: Observable<[number, number, number, TargetSalesType]>
    = combineLatest([
        workingDaysPerWeekStream,
        daysPerMonthStream,
        targetDailySalesStream
      ], scheduler)
      .pipe(filter(([x, y, z]) => x && y && z ? true : false))
      .pipe(map(([daysPerWeek, daysPerMonth, dailySales]) => 
                    [daysPerWeek, daysPerMonth, dailySales, TargetSalesType.Daily]));

    const combinedWeeklyAndMonthlySalesStream: Observable<[number, TargetSalesType]>
    = merge(
        targetWeeklySalesStream.pipe(map(x => [x, TargetSalesType.Weekly])),
        targetMonthlySalesStream.pipe(map(x => [x, TargetSalesType.Monthly])),
        scheduler)
        .pipe(map(([x, y]) => [x as number, y as TargetSalesType]));

    const combinedDailyAndWeeklyDaysStream: Observable<[number, number]>
        = combineLatest([
          workingDaysPerWeekStream,
          daysPerMonthStream],
          scheduler)
         .pipe(filter(([x, y]) => x && y ? true : false));

    const computedWeeklyAndMonthlySalesStream: Observable<[number, number, number, TargetSalesType]>
    = combinedWeeklyAndMonthlySalesStream
      .pipe(map(([targetSales, targetedSalesType]) => {
        return combinedDailyAndWeeklyDaysStream
                .pipe(take(1))
                .pipe(map(([daysPerWeek, daysPerMonth]) => 
                              [daysPerWeek, daysPerMonth, targetSales, targetedSalesType as TargetSalesType]));
      }))
      .pipe(switchMap((_) => _))
      .pipe(map(([a, b, c, d]) => [a as number, b as number, c as number, d as TargetSalesType]));

    const resultingStream
    = merge(
            computedFromDailySalesStream,
            computedWeeklyAndMonthlySalesStream,
            scheduler)
        .pipe(map(([daysPerWeek, daysPerMonth, targetSales, targetedSalesType]) =>
                    computeSalesTarget(daysPerWeek, daysPerMonth, targetSales, targetedSalesType)));

    return resultingStream
           .pipe(map(x => <TargetSales>{
             perDay: x[0],
             perWeek: x[1],
             perMonth: x[2]
           }));
  }

Unit Tests of Transformation function with RXJS Marble Testing

 it('it should compute weekly and monthly from provided daily target sales and days 
 config with daily rates rounded off.', () => {
       scheduler.run(({ expectObservable })  => {
           timer(2)
            .subscribe(_ => {
                workingDaysPerWeekEventStream.next(3);
                daysPerMonthStream.next(12);
                targetDailySalesStream.next(1.3);
            });

           const expectedTargetSales: TargetSales = {
                perDay: 1, // rounding
                perWeek: 1 * 3, // perDay * number of days a in week
                perMonth: 1 * 12 // perDay * number of days in a month
            };

           const expectedMarbles = '--a';
           const expectedValues = { a : expectedTargetSales };

           expectObservable(sut).toBe(expectedMarbles, expectedValues);
        });
       });

Defining Store Service for Component Level Composite EventStream

@Injectable()
export class ComponentStoreProviderService {
  mainEventStream: MainEventStream;
  constructor() {
    const defaultMain: Main = {
      daysConfig: {
        daysPerWeek: 6,
        daysPerMonth: 24
      },
      targetSales: {
        perDay: 1,
        perWeek: 6,
        perMonth: 24
      }
    };

    this.mainEventStream = new MainEventStream(defaultMain);
   }
}

Defining Store Service at Parent Component

@Component({
  selector: 'app-main',
  templateUrl: './main.component.html',
  styleUrls: ['./main.component.css'],
  providers: [ComponentStoreProviderService]
})
export class MainComponent implements OnInit, OnDestroy { 
  @Input()
  title: string;
  subscription: Subscription;
  constructor(private componentStoreProviderService: ComponentStoreProviderService) { }

  ngOnInit() {
    const mainStream = this.componentStoreProviderService.mainEventStream;
    const configStream = mainStream.daysConfigEventStream;
    const targetSalesStream = mainStream.targetSalesEventStream;
    const changedSalesStream = computedTargetSalesStream(
      configStream.daysPerWeekEventStream.event$,
      configStream.daysPerMonthEventStream.event$,
      targetSalesStream.perDayEventStream.event$,
      targetSalesStream.perWeekEventStream.event$,
      targetSalesStream.perMonthEventStream.event$
    );

    this.subscription = changedSalesStream
    .pipe(debounceTime(200))
    .subscribe(x => targetSalesStream.emit(x));
  }

  ngOnDestroy(): void {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}

Using async pipe with the intended event streams on the HTML template

<div class="sales-target-container">
        <span>Daily sales target:</span>
        <input type="number"[value]="targetSalesEventStream.perDayEventStream.event$ | async" 
               (change)="onDailyTargetChanges($event.target.value)"/>
</div>
 onDailyTargetChanges(change: number): void {
    this.targetSalesEventStream.perDayEventStream.emit(change);
  }

event-stream-rxjs-pattern-demo's People

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.