import { EventEmitter } from 'events';
import pRetry, { type FailedAttemptError, type Options as RetryOptions } from 'p-retry';
import { asSequence } from 'sequency';

import { parseResponse } from '@/core/parsing/parseResponse.ts';
import {
  getActionQueryRequest,
  getRegisterQueryRequest,
  getUnregisterQueryRequest,
} from '@/core/query/activePivortRequests.ts';
import { queryGenerator } from '@/core/queryIterator/queryIterator.ts';
import type { QueryGroup, WebSocketConnection, WsEventEmitter } from '@/core/webSocket/types.ts';
import { queryEventEmitter } from '@/components/common/layout/queryEventEmitter.ts';
import { isGlobalError, type MdxResponse } from '@/types/mdx.ts';
import { createError } from '@/utils/error/createError.ts';
import { logger } from '@/utils/libs/logger.ts';
import { getAuthorizationToken } from '@/utils/sgwt/sgwtConnect.ts';

export function createWsEventEmitter(): WsEventEmitter {
  const eventEmitter = new EventEmitter() as WsEventEmitter;
  eventEmitter.setMaxListeners(0);
  return eventEmitter;
}
let currentId = 0;

/**
 * WS - Errors
 *
 * When connection cannot be established (url does not respond)
 * => ws.onerror
 * => ws.onclose (wasClean = true)
 *
 * Server crashes while ws is open
 * => ws.onclose (wasClean = false)
 *
 * Server throws error while establishing the websocket
 * => ws.onclose (wasClean = false)
 *
 * Server closes the websocket normally
 * => ws.onclose (wasClean = true)
 *
 * Notes:
 * - when websocket closes normally before any response => iterators do not timeout => nothing happens on screen
 */

export function createWebSocketConnection(ws: WebSocket): WebSocketConnection {
  const wsEventEmitter = createWsEventEmitter();

  let closed = false;

  function throwIfClosed() {
    if (closed) {
      throw new Error('This is closed, why do you still have a reference?');
    }
  }

  ws.onmessage = ({ data }) => {
    const message: MdxResponse = JSON.parse(data);
    if ('error' in message) {
      logger.logError('Mdx error {message_s}', JSON.stringify(message));
      // TODO : great does not send queryId for errors ???
      if (isGlobalError(message)) {
        // Emitting the 'error' message will automatically shut down iterators and throw errors.
        // See p-event docs (https://github.com/sindresorhus/p-event#rejectionevents)
        // I think this is a desirable behavior right now, but we can also change it to be 'wsError'
        // if we want to manually handle the error cases.
        wsEventEmitter.emit('error', { message: message.details, title: message.error });
      } else {
        wsEventEmitter.emit('error', {
          message: message.error.errorChain.at(0)?.message ?? 'Unknown error',
          title: 'Query error',
        });
      }
    } else if (message.type === 'cellSetData') {
      const queryId = message.queryId;
      wsEventEmitter.emit('cellSetData', {
        queryId,
        data: parseResponse(message.data),
      });
    } else if (message.type === 'cellData') {
      const queryId = message.queryId;
      const cells = message.data.cells;
      wsEventEmitter.emit('cellData', {
        queryId,
        cells,
      });
    } else {
      console.warn('Unhandled message');
    }
  };

  ws.onclose = error => {
    wsEventEmitter.emit('close', undefined);
    if (!error.wasClean) {
      logger.logError('WebSocket unclean close {message_s}', JSON.stringify(error));
      wsEventEmitter.emit('error', {
        title: 'WS closed',
        message: error.reason ?? error.code,
      });
    }
    closed = true;
    wsEventEmitter.removeAllListeners();
    queryEventEmitter.removeAllListeners();
  };

  const queryIdsByGroup: Record<QueryGroup, Set<string>> = {
    cache: new Set<string>(),
    pricing: new Set<string>(),
    table: new Set<string>(),
    widget: new Set<string>(),
    hedger: new Set<string>(),
  };

  function getAllPausableQueryIds(): string[] {
    return [...queryIdsByGroup['table'], ...queryIdsByGroup['widget']];
  }

  return {
    wsEventEmitter,

    id: currentId++,

    isClosed() {
      return ws.readyState !== WebSocket.OPEN;
    },

    resumeAllQueries() {
      for (const queryId of getAllPausableQueryIds()) {
        ws.send(JSON.stringify(getActionQueryRequest(queryId, 'RESUME')));
      }
    },

    pauseAllQueries() {
      for (const queryId of getAllPausableQueryIds()) {
        ws.send(JSON.stringify(getActionQueryRequest(queryId, 'PAUSE')));
      }
    },

    queryGenerator({ queryId, queryGroup, mdx, contextValues, initialState }, options?) {
      queryIdsByGroup[queryGroup].add(queryId);
      console.log('send query', queryId);
      throwIfClosed();
      ws.send(JSON.stringify(getRegisterQueryRequest(queryId, mdx, initialState, contextValues)));
      queryEventEmitter.emit('onQuery', { mdx, queryGroup, queryId });
      return queryGenerator(wsEventEmitter, queryId, options);
    },

    cancelQueries(queryGroup, filter) {
      const queryIds = queryIdsByGroup[queryGroup];
      const predicate = filter ?? (() => true);
      const { true: queryIdsToRemove, false: queryIdsToKeep } =
        asSequence(queryIds).partition(predicate);

      if (queryIdsToRemove.length === 0) {
        return 0;
      }

      console.log('ws.cancel', queryIdsToRemove);
      for (const queryId of queryIdsToRemove) {
        ws.send(JSON.stringify(getUnregisterQueryRequest(queryId)));
        wsEventEmitter.emit('end', { queryId });
      }
      queryIdsByGroup[queryGroup] = new Set(queryIdsToKeep);
      return queryIdsToRemove.length;
    },
    close() {
      ws.close();
      closed = true;
    },
  };
}

export function createWebsocketConnection(url: string): Promise<WebSocketConnection> {
  const onFailedAttempt = (error: FailedAttemptError) => {
    console.warn(
      `Attempt ${error.attemptNumber} failed. There are ${error.retriesLeft} retries left. Cause: ${JSON.stringify(error.cause)}`,
    );
  };

  const options: RetryOptions = {
    onFailedAttempt,
    retries: 3,
    factor: 2,
    minTimeout: 1000,
    maxTimeout: 5000,
  };

  try {
    return pRetry(() => doCreateWebsocketConnection(url), options);
  } catch (e: any) {
    logger.logError(
      'WebSocket connection error {message_s} on {ws_url_s}',
      JSON.stringify('cause' in e ? e.cause : e),
      url,
    );
    throw e;
  }
}

export function doCreateWebsocketConnection(url: string): Promise<WebSocketConnection> {
  try {
    const ws = new WebSocket(url + '?sgx=' + getAuthorizationToken());
    const wsConnection = createWebSocketConnection(ws);
    return new Promise((resolve, reject) => {
      ws.onopen = () => {
        resolve(wsConnection);
      };
      ws.onerror = event => {
        reject(createError('Error.WebSocket', event));
      };
    });
  } catch (error) {
    return Promise.reject(createError('Error.WebSocket', error));
  }
}
