import { StatusCodes } from "http-status-codes";

import { delay } from "./delay";
import { AppReqResponse } from "../req";

const MAX_RETRIES = 4;
const BREATHING_BETWEEN_SUCCESS = 500;
const BREATHING_BETWEEN_RETRIES = 4000;

export interface IStreamMeta {
  batchStartTime: Date;
}

export async function paginatedRequestStream<ResType, KeyType, AdditionalDataType = undefined>(
  callWithCursor: (currentKey: KeyType | null) => Promise<AppReqResponse<ResType>>,
  accessCursor: (res: ResType) => KeyType | null,
  onResponse: (
    response: ResType,
    currentKey: KeyType | null,
    additionalData: AdditionalDataType
  ) => Promise<void>,
  startingCursor: KeyType | null = null,
  additionalData?: AdditionalDataType,
  parallelize = true
): Promise<AppReqResponse<null>> {
  let isDone = false;
  let currentCursor: KeyType | null = startingCursor;
  let retryCount = 0;
  let delayBetweenRetries = BREATHING_BETWEEN_RETRIES;

  const onResponseErrors: any[] = [];
  const onResponsePromiseQueue: Promise<void>[] = [];

  do {
    const response = await callWithCursor(currentCursor);
    switch (response.type) {
      case "error": {
        console.error(new Error(`${response.status} ${response.message}`));
        console.log("error on paginated stream, retrying");

        retryCount++; // will retry up to MAX_RETRIES

        // Wait a bit before next retry, use random to stagger requests that arrive at the same
        // time (this typically happens with webhooks)
        await delay(delayBetweenRetries + BREATHING_BETWEEN_RETRIES * Math.random());
        // Exponentially increase delay for the next attempt
        delayBetweenRetries *= 2;

        break;
      }
      case "success": {
        // wait for side effect of response
        const emitPromise = onResponse(
          response.body,
          currentCursor,
          additionalData as AdditionalDataType // confused how to not need "as"
        ).catch(error => {
          onResponseErrors.push(error);
        });
        if (parallelize) {
          onResponsePromiseQueue.push(emitPromise);
        } else {
          await emitPromise;
        }
        const prevCursor = currentCursor;
        currentCursor = accessCursor(response.body);

        // done when there is no next cursor (needs to be separate variable in case first call fails)
        if (!currentCursor || currentCursor === prevCursor) {
          isDone = true;
        } else {
          // site can end up not showing up if too many requests made
          await delay(BREATHING_BETWEEN_SUCCESS);
        }
        break;
      }
    }
  } while (!isDone && retryCount < MAX_RETRIES);

  await Promise.all(onResponsePromiseQueue);

  if (onResponseErrors.length) {
    // @ts-ignore
    console.error(onResponseErrors);
    return {
      type: "error",
      status: StatusCodes.BAD_REQUEST,
      message: `failure of consumer: ${JSON.stringify(onResponseErrors)}`,
    };
  }

  if (retryCount >= MAX_RETRIES) {
    return {
      type: "error",
      status: StatusCodes.BAD_REQUEST,
      message: "too many failures",
    };
  }

  return {
    type: "success",
    body: null,
  };
}
