import { interval, Observable, of, OperatorFunction, throwError, pipe, from } from 'rxjs';
import { concatMap, first, switchMap, map, mergeMap, toArray } from 'rxjs/operators';
import { Operation, OperationStatus, OperationResultResponse } from '../../models/operations';
import { OperationsService } from '../operations/operations.service';

const POLLING_INTERVAL = 300;

/* Class of Error for failed `Operations`, which takes in an a failed operation
 and also returns the data of the failed operation for later processing */
export class AsyncOperationsError extends Error {
  name: string;
  failedOperation: Operation<unknown>;

  constructor(failedOperation: Operation<unknown>) {
    super(failedOperation.error?.code);
    this.name = 'AsyncOperationsError';
    this.failedOperation = failedOperation;
  }
}

/**
 * Clients should use `emitOperationResultsWhenComplete` when application flow must be blocked until _all async operations complete.
 * This method polls the backend for the `OperationsStatus` for all operationIds passed to it.
 * It will only emit once, and only when all async operations for operationIds have finished (completed or failed).
 *
 * This method will NOT return an error if the underlying async operations fails - it's up to the client to check the status of
 * the underlying async operation.
 *
 * NOTE: clients should import this function and partially apply the service in their constructor or OnInit
 * e.g. `pollOperationsSync = partial(emitOperationResultsWhenComplete, this.operationsService)`
 */
export const emitOperationResultsWhenComplete = <T>(
  operationsService: OperationsService,
  operationIds: string[]
): Observable<Operation<T>[]> => {
  // special case for when we are passed a list of empty operationIds
  if (operationIds.length === 0) {
    return of([]);
  }

  return interval(POLLING_INTERVAL).pipe(
    concatMap(() => operationsService.getOperationStatuses<T>(operationIds)),
    first(operations => !operations.some(operation => operation.status === OperationStatus.IN_PROGRESS))
  );
};

/**
 * custom rxJS operator for streams where we must process async  `Operations` returned from the backend.
 * if the `Operation#status` equals `FAILURE_UNACKNOWLEDGED | FAILURE_ACKNOWLEDGED` this function will throw an error
 * clients should pass a call back to map the results of a successful operation to type<T>
 *   e.g. `processOperationResponseOrThrowError$((operation) => new Key().deserialize(operation.result)),`
 *
 */
export function processOperationResponseOrThrowError$<T, R>(callback: (o: Operation<R>) => T): OperatorFunction<Operation<R>, (T | never)> {
  return switchMap((operation) => {
    if (operation.status === OperationStatus.FAILURE_UNACKNOWLEDGED || operation.status === OperationStatus.FAILURE_ACKNOWLEDGED) {
      return throwError(new AsyncOperationsError(operation));
    } else {
      return of(callback(operation));
    }
  });
}

function isOperationObjectKey<T>(o: Operation<T>, key: string):
  key is keyof Operation<T> {
  return key in o;
}

/** custom operator function that creates an array of observables from takes in a set of operation ids,
 *  creates an array of observables from the array of operation IDs,
 *  calls `projection` for every inner observable
 *  and merges results of inner observables back into a single array
 *  NOTE: mergeMap will only emit when the inner Observable **completes**
 *  NOTE: `projection` should not return observables that don't complete
 */
export function forAllOperations$<T>(projection: (operationId: string) => Observable<T>): OperatorFunction<OperationResultResponse, T[]> {
  return pipe(
    switchMap(({ operationIds }) => from(operationIds)),
    mergeMap(projection),
    toArray()
  );
}
