import { defer, MonoTypeOperatorFunction, Observable, of } from 'rxjs';
import { shareReplay, switchMap } from 'rxjs/operators';

/**
 * On subscription to the source, callback function will be executed only once and
 * the source Observable will continue to operate normally afterwards.
 * ## Example
 * ### Start loading before request is sent to the network
 * ```ts
 * this.api.getData().pipe(
 *   startWithTap(() => this.loading = true), // start loading on subscription
 *   finalize(() => this.loading = false), // stop loading on completion
 * )
 * ```
 * @param callback function to be executed on subscription
 * @return The same source Observable that starts with executing callback
 * function on subscription
 */
export function startWithTap<T>(callback: () => void): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    callback();
    return source;
  });
}

/**
 * This operator caches http request with the help of `shareReplay`. New http request is issued
 * when new subscriber arrives after specified cache periond.
 * @warning This operator will only refresh value for the new subscribers
 * @param cachePeriod HTTP Request after cache period (in milliseconds) will be requested again
 */
export function cache<T>(cachePeriod: number): MonoTypeOperatorFunction<T> {
  let cacheTime: Date;
  let cached: Observable<T>;

  const isAfterCachePeriod = (): boolean => {
    const now = new Date();
    const diffMinutes = (now.getTime() - cacheTime.getTime());
    return diffMinutes > cachePeriod;
  };

  const updateCache = (src: Observable<T>) => {
    cacheTime = new Date();
    cached = src.pipe(
      shareReplay({ refCount: true, bufferSize: 1 }),
    );
  };

  return (src: Observable<T>) => {
    if (!cached) {
      updateCache(src);
    }

    return of({}).pipe(
      switchMap(() => {
        if (isAfterCachePeriod()) {
          updateCache(src);
        }

        return cached;
      }),
    );
  };
}
