import { Observable, Subscription, pipe } from 'rxjs';
import { take, bufferCount, map, distinctUntilChanged, filter, scan } from 'rxjs/operators';

/**
 * Renderer classes are meant to extend Destroyable$ and use
 * Observable.prototype.safeSubscribe to avoid memory leaks due
 * to dangling observables not being unsubscribed from (especially fromEvents)
 */

export class Destroyable$ {
  constructor() {
    /* https://rxjs-dev.firebaseapp.com/api/index/class/Subscription */
    this.safeSubscriptions = new Subscription();
  }

  destroy() {
    this.safeSubscriptions.unsubscribe();
  }
}

export function safeSubscribe(instance, ...args) {
  if (!(instance instanceof Destroyable$)) throw new Error('safeSubscribe target instance must extend Destroyable$');
  const sub = this.subscribe(...args);
  instance.safeSubscriptions.add(sub);
  return sub;
}

Observable.prototype.safeSubscribe = safeSubscribe;

export function safeTake(n) {
  /* safeTake implements the take operator but skip its completion logic */
  return (source$) => new Observable((subscriber) => {
    const subscription = source$
      .pipe(take(n))
      .subscribe(
        (x) => subscriber.next(x),
        (e) => subscriber.error(e),
        () => { /* void complete */ }
      );

    return subscription;
  });
}

export function bufferConsecutiveEvents({ count, event }) {
  return pipe(
    bufferCount(count, 1),
    map((evts) => evts.reduce((val, evt) => val && evt === event, true)),
    distinctUntilChanged(),
    filter(Boolean)
  );
}

// it's really hard to test as it relies on a real dom element to change size
/* istanbul ignore next */
export function resizeObservable(elem) {
  return new Observable((subscriber) => {
    const ro = new ResizeObserver((entries) => subscriber.next(entries));

    ro.observe(elem);
    return function unsubscribe() {
      ro.unobserve(elem);
    };
  });
}

export function takeUntilWithLast(terminator$, lastValue) {
  return (source$) => new Observable((subscriber) => {
    terminator$.subscribe(
      () => {
        subscriber.next(lastValue);
        subscriber.complete();
      },
      (e) => subscriber.error(e),
      () => { /* void complete */ }
    );
    if (!subscriber.closed) { source$.subscribe(subscriber); }
  });
}

export function slidingBufferCount(size, defaultValue = null) {
  return pipe(
    scan(
      (buffer, current) => [...buffer.slice(1), current],
      new Array(size).fill(defaultValue)
    )
  );
}
