import { Inject, Injectable, OnDestroy } from '@angular/core';
import { Broadcaster, NgXCable } from 'ngx-cable';
import { ReplaySubject, Subject, timer } from 'rxjs';
import { delayWhen, distinctUntilChanged, filter } from 'rxjs/operators';

import { BackendNotification } from './types/backend-notification.model';

/**
 * Application-wide wrapper around the ngx-cable connection.
 *
 * On construction the service:
 *  - reads the cable URL from `<meta name="action-cable-url">` and connects,
 *  - subscribes to the fixed set of channels listed in `CHANNEL_NAMES`,
 *  - polls the socket state to publish connect/disconnect changes and to
 *    detect a "stale" connection (a long gap between two connected polls),
 *  - forwards backend notifications to the injected `Flash` service.
 */
@Injectable({
  providedIn: 'root',
})
export class Cable implements OnDestroy {
  /** Interval, in milliseconds, at which the socket state is polled. */
  private static readonly POLL_INTERVAL_MS = 150;

  /**
   * Gap, in milliseconds, between two consecutive "connected" polls after
   * which the locally held data is reported as stale.
   */
  private static readonly STALE_AFTER_MS = 5 * 60 * 1000;

  /** Channels subscribed on construction, each with an empty room. */
  private static readonly CHANNEL_NAMES = [
    'UpdatesChannel',
    'Ng1UpdatesChannel',
    'NotificationsChannel',
    'AccountNotificationsChannel',
  ];

  /** Results of `cable.subscribe`, keyed by channel name; used by `send`. */
  private readonly channels: Record<
    string,
    ReturnType<NgXCable['subscribe']>
  > = {};
  private lastConnected = false;
  /** Epoch milliseconds of the most recent poll that saw an open socket. */
  private lastConnectedAt: number | undefined;
  private readonly connected$ = new ReplaySubject<boolean>(1);
  private readonly stale$ = new Subject<boolean>();
  /** Internal subscriptions released in `ngOnDestroy`. */
  private readonly subscriptions: Array<{ unsubscribe(): void }> = [];
  private readonly pollTimer: ReturnType<typeof setInterval>;

  /**
   * @throws Error if `document.head` contains no
   *   `<meta name="action-cable-url">` element.
   */
  constructor(
    protected cable: NgXCable,
    public broadcaster: Broadcaster,
    @Inject('Flash') private flash,
  ) {
    const cableMeta = document.head.querySelector(
      'meta[name=action-cable-url]',
    );
    if (!cableMeta) {
      // Fail with an actionable message instead of an opaque TypeError.
      throw new Error(
        'Cable: <meta name="action-cable-url"> is missing from document.head',
      );
    }
    this.cable.connect(cableMeta.getAttribute('content'));

    for (const channel of Cable.CHANNEL_NAMES) {
      this.channels[channel] = this.cable.subscribe({ channel, room: '' });
    }

    // Seed the replay buffer so early subscribers immediately get a value.
    this.connected$.next(this.lastConnected);

    this.pollTimer = setInterval(
      () => this.pollConnection(),
      Cable.POLL_INTERVAL_MS,
    );

    this.subscriptions.push(
      this.connectionStale().subscribe(() => {
        this.flash.create(
          'Lokale Daten werden neu geladen...',
          'warning',
          6000,
        );
      }),
      this.broadcaster
        .on('NotificationsChannel')
        .subscribe((msg: BackendNotification | null) => {
          if (msg?.message) {
            // NOTE(review): a timeout of 0 presumably means "do not
            // auto-dismiss" - confirm against the Flash service.
            this.flash.create(
              msg.message,
              msg.type,
              msg.type === 'error' ? 0 : 6000,
            );
          }
        }),
      this.broadcaster
        .on('AccountNotificationsChannel')
        .subscribe((msg: BackendNotification | null) => {
          if (msg?.message) {
            this.flash.create(msg.message, msg.type, 0);
          }
        }),
    );
  }

  /**
   * Stops the polling timer, releases the internal subscriptions and
   * completes the subjects. Angular invokes this when the providing
   * injector is destroyed.
   */
  ngOnDestroy(): void {
    clearInterval(this.pollTimer);
    this.subscriptions.forEach((subscription) => subscription.unsubscribe());
    this.subscriptions.length = 0;
    this.connected$.complete();
    this.stale$.complete();
  }

  /**
   * Registers `func` as a listener for broadcasts on `channel`.
   *
   * @param func Callback passed to the broadcaster subscription.
   * @param channel Channel name; defaults to `'UpdatesChannel'`.
   * @returns The subscription created by the broadcaster; the caller is
   *   responsible for unsubscribing.
   */
  subscribe(func, channel = 'UpdatesChannel') {
    return this.broadcaster.on(channel).subscribe(func);
  }

  /**
   * Sends `data` over the subscription registered for `channel`.
   *
   * @param data Payload handed to `cable.send`.
   * @param channel Channel name; defaults to `'UpdatesChannel'`. Only the
   *   channels subscribed in the constructor have an entry.
   * @returns Whatever `cable.send` returns.
   */
  send(data, channel = 'UpdatesChannel') {
    return this.cable.send(data, [this.channels[channel]]);
  }

  /**
   * @returns A replaying stream of the connection state: the latest known
   *   value is delivered on subscription, then one value per change.
   */
  connected() {
    return this.connected$;
  }

  /** Manually reports the connection as stale. */
  triggerStale() {
    this.stale$.next(true);
  }

  /**
   * Builds a stream that emits `true` each time the stale state switches
   * to stale. Every emission is delayed by a random 0-5 seconds.
   * NOTE(review): the jitter presumably spreads out client reloads -
   * confirm. Each call returns a new pipeline with its own
   * `distinctUntilChanged` state.
   */
  connectionStale() {
    return this.stale$.pipe(
      distinctUntilChanged(),
      filter((staleState: boolean) => staleState),
      delayWhen(() => timer(Math.random() * 5 * 1000)),
    );
  }

  /**
   * One polling tick: publishes connection-state changes and, while the
   * socket is open, compares the time since the previous connected tick
   * with the stale threshold.
   */
  private pollConnection(): void {
    const connected = this.cable.isOpen();

    if (connected !== this.lastConnected) {
      this.lastConnected = connected;
      this.connected$.next(this.lastConnected);
    }

    if (!connected) {
      return;
    }

    const now = Date.now();

    if (this.lastConnectedAt !== undefined) {
      // The timestamp is refreshed on every connected tick, so a large gap
      // means no connected tick happened for that long.
      if (now - this.lastConnectedAt > Cable.STALE_AFTER_MS) {
        console.warn('Connection stale - refreshing');
        this.stale$.next(true);
      } else {
        this.stale$.next(false);
      }

      // custom stale event to test and trigger stale state
      if (window['stale']) {
        window['stale'] = false;
        this.stale$.next(true);
      }
    }

    this.lastConnectedAt = now;
  }
}
