import { useEffect, useState, useRef, useCallback } from 'react';
import { decode } from '@msgpack/msgpack';
import { throttle } from 'lodash';
import { STREAM_HOST } from '../../config';
import { TAPE_DEFAULT_FILTER_ID, DEFAULT_TNS_SYMBOLS } from '../../config/tape';
import { StreamType } from '../../types';
import {
  RawOptionFeedData,
  Filter,
  TnsSortedDataWindow,
  FilterItem,
} from '../../types/tape';
import { getOverrideToken, getCachedToken } from '../../util';
import { validateTnsRow, getOptionFeedDataRow } from '../../util/tape';
import useLog from '../useLog';

/** Maximum number of automatic reconnect attempts after the socket closes. */
const MAX_RETRIES = 3;
/** Delay, in milliseconds, passed to `setTimeout` before a reconnect attempt. */
const RETRY_DELAY = 5_000;

/**
 * Module-level singleton that owns the TNS streaming WebSocket.
 *
 * It keeps at most one socket, one subscriber (callback + the filters used to
 * pre-validate incoming rows) and the list of symbols that should be
 * subscribed whenever the socket is (re)opened.
 */
const WebSocketManager = {
  /** The live socket, or `null` when disconnected. */
  wsInstance: null as WebSocket | null,
  /** Symbols to (re)subscribe to when the socket opens. */
  currentSymbols: ['*'] as string[],
  /** Reconnect attempts made since the last successful open or `cleanup()`. */
  retryCount: 0,
  /** Handle of the pending reconnect timer, if one is scheduled. */
  retryTimeout: null as number | null,
  /** The single consumer of decoded rows, or `null` when nobody listens. */
  subscriber: null as {
    callback: (data: RawOptionFeedData[]) => void;
    filters: Filter[];
    sortedWindow?: TnsSortedDataWindow;
  } | null,

  /**
   * Sends `msg` as JSON over the socket. Silently does nothing when the socket
   * is missing or not in the OPEN state.
   */
  sendMessage(msg: object) {
    if (!this.wsInstance || this.wsInstance.readyState !== WebSocket.OPEN) {
      return;
    }
    this.wsInstance.send(JSON.stringify(msg));
  },

  /**
   * Registers the consumer of incoming rows together with the criteria used to
   * validate each row before it is delivered. Replaces any previous subscriber.
   */
  setSubscriber(
    callback: (data: RawOptionFeedData[]) => void,
    filters: Filter[],
    sortedWindow?: TnsSortedDataWindow,
  ) {
    this.subscriber = { callback, filters, sortedWindow };
  },

  /** Drops the current subscriber; subsequent rows are discarded. */
  removeSubscriber() {
    this.subscriber = null;
  },

  /**
   * Returns whether `newRow` should be delivered to the subscriber.
   *
   * @returns `false` when there is no subscriber, otherwise the result of
   *   `validateTnsRow` against the subscriber's filters and sorted window.
   */
  processNewData(newRow: RawOptionFeedData): boolean {
    if (!this.subscriber) {
      return false;
    }

    return validateTnsRow(
      newRow,
      this.subscriber.filters,
      this.subscriber.sortedWindow,
    );
  },

  /**
   * Records `symbols` as the desired subscription and, if the socket is open,
   * replaces the server-side subscription with it.
   *
   * The symbols are stored even when the socket is not open so that the
   * `onopen` handler subscribes to the latest request instead of a stale list.
   */
  updateSymbols(symbols: string[]) {
    const hadSymbols = this.currentSymbols.length > 0;
    this.currentSymbols = symbols;

    if (!this.wsInstance || this.wsInstance.readyState !== WebSocket.OPEN) {
      return;
    }

    // If we had existing symbols, unsubscribe first
    if (hadSymbols) {
      this.sendMessage({
        action: 'unsubscribe_all',
      });
    }

    this.sendMessage({
      action: 'subscribe',
      underlyings: symbols,
      stream_types: StreamType.FULL_ABSOLUTE_SIGNAL,
    });

    console.log(`TNS WebSocket subscribing to ${symbols.join(', ')}`);
  },

  /**
   * Opens the socket unless one is already OPEN or CONNECTING.
   *
   * On close, a reconnect is scheduled after `RETRY_DELAY` until
   * `MAX_RETRIES` attempts have been made; the counter is reset by a
   * successful open or by `cleanup()`.
   *
   * @param token   Streaming token, sent as the `token` query parameter.
   * @param onError Invoked with a message when the socket reports an error.
   */
  initialize(token: string, onError: (error: string) => void) {
    // Don't return early if connection exists but is CLOSING or CLOSED
    if (
      this.wsInstance?.readyState === WebSocket.OPEN ||
      this.wsInstance?.readyState === WebSocket.CONNECTING
    ) {
      return;
    }

    // Drop any stale socket/timer. This deliberately does NOT go through
    // cleanup(): resetting retryCount here would make every reconnect attempt
    // look like the first one and MAX_RETRIES would never be reached.
    this.closeSocket();

    const ws = new WebSocket(
      `wss://${STREAM_HOST}/stream?token=${encodeURIComponent(token)}`,
    );
    this.wsInstance = ws;

    ws.onopen = () => {
      console.log('TNS WebSocket connection opened');
      this.retryCount = 0;
      if (this.currentSymbols.length > 0) {
        this.updateSymbols(this.currentSymbols);
      }
    };

    ws.onmessage = async (event: MessageEvent) => {
      const { data } = event;
      // Text frames carry no signal payload; only binary frames are decoded.
      if (typeof data === 'string') {
        return;
      }

      try {
        // Accept both binary representations the browser may hand us.
        const buffer =
          data instanceof ArrayBuffer ? data : await data.arrayBuffer();
        const signalTuple: any = decode(buffer, {
          useBigInt64: true,
        });
        // Drop the first member of the tuple. It's the signal type and we know it.
        const [, signal] = signalTuple;
        const newRow = getOptionFeedDataRow(signal);

        // Only process and emit rows that pass our criteria
        if (this.processNewData(newRow) && this.subscriber) {
          this.subscriber.callback([newRow]);
        }
      } catch (error) {
        console.error('Error decoding message:', error);
      }
    };

    ws.onclose = () => {
      console.log('TNS WebSocket connection closed');
      if (this.retryCount >= MAX_RETRIES) {
        return;
      }
      this.retryCount++;
      this.retryTimeout = window.setTimeout(() => {
        // The timer has fired; forget its handle before reconnecting.
        this.retryTimeout = null;
        console.log(
          `TNS Socket attempting to reconnect... (Attempt ${this.retryCount})`,
        );
        this.initialize(token, onError);
      }, RETRY_DELAY);
    };

    ws.onerror = () => {
      onError('An error occurred during the TNS WebSocket connection.');
    };
  },

  /**
   * Detaches all handlers from the current socket, closes it, and cancels any
   * pending reconnect timer. Leaves `retryCount` untouched so it can be used
   * between reconnect attempts.
   */
  closeSocket() {
    if (this.wsInstance) {
      // Detach handlers first so closing does not trigger a reconnect.
      this.wsInstance.onclose = null;
      this.wsInstance.onerror = null;
      this.wsInstance.onmessage = null;
      this.wsInstance.onopen = null;
      this.wsInstance.close();
      this.wsInstance = null;
    }
    if (this.retryTimeout !== null) {
      clearTimeout(this.retryTimeout);
      this.retryTimeout = null;
    }
  },

  /**
   * Fully shuts the connection down: closes the socket, cancels any pending
   * reconnect and resets the retry counter.
   */
  cleanup() {
    this.closeSocket();
    this.retryCount = 0;
  },
};

/** Options accepted by `useTapeWebsocket`. */
interface SocketProps {
  /** Active filters; used to validate rows and to pick the symbols to subscribe to. */
  filters: Filter[];
  /** Receives each throttled batch of rows that passed validation. */
  setRows: (data: RawOptionFeedData[]) => void;
  /** Optional sorted-data window forwarded to row validation. */
  sortedDataWindow?: TnsSortedDataWindow;
  /**
   * Whether the live stream should be connected. Optional because the hook
   * defaults it to `true` when omitted.
   */
  isTnsFlowLive?: boolean;
}

/**
 * React hook that connects to the TNS stream through `WebSocketManager` and
 * delivers validated rows to `setRows` in throttled batches.
 *
 * While `isTnsFlowLive` is true the socket is kept open; when it turns false
 * (or the component unmounts) the socket is closed, the subscriber is removed
 * and any pending batch flush is cancelled.
 *
 * @returns `{ error }` — the last socket error message, or `null`.
 */
const useTapeWebsocket = ({
  filters,
  setRows,
  sortedDataWindow,
  isTnsFlowLive = true,
}: SocketProps) => {
  const [socketError, setSocketError] = useState<string | null>(null);
  const { nonProdDebugLog } = useLog('useTapeWebsocket');
  const bufferedDataRef = useRef<RawOptionFeedData[]>([]);

  // Latest props, read by the long-lived throttled flush below so that it
  // never filters with (or delivers to) values from an older render.
  const latestRef = useRef({ filters, setRows, sortedDataWindow });
  useEffect(() => {
    latestRef.current = { filters, setRows, sortedDataWindow };
  }, [filters, setRows, sortedDataWindow]);

  // Throttled function to update rows with final filter pass. Created once per
  // hook instance (lazy state initializer) so there is exactly one timer to
  // cancel and no orphaned trailing call when filters change.
  const [throttledUpdateRows] = useState(() =>
    throttle(() => {
      const latest = latestRef.current;
      const finalFilteredData = bufferedDataRef.current.filter((row) =>
        validateTnsRow(row, latest.filters, latest.sortedDataWindow),
      );
      latest.setRows(finalFilteredData);
      bufferedDataRef.current = [];
    }, 2_000),
  );

  // Handle new data: buffer it and schedule a flush, but only while live.
  const handleNewData = useCallback(
    (newRows: RawOptionFeedData[]) => {
      if (!isTnsFlowLive) {
        return;
      }
      bufferedDataRef.current.push(...newRows);
      throttledUpdateRows();
    },
    [throttledUpdateRows, isTnsFlowLive],
  );

  // Open / close the WebSocket connection as the live flag changes.
  useEffect(() => {
    if (!isTnsFlowLive) {
      return;
    }

    const token = getOverrideToken() ?? getCachedToken();
    if (token == null) {
      nonProdDebugLog('Invalid or missing streaming token.');
      return;
    }

    // A fresh connection attempt should not keep showing a previous failure.
    setSocketError(null);
    WebSocketManager.initialize(token, setSocketError);

    return () => {
      throttledUpdateRows.cancel();
      WebSocketManager.cleanup();
      WebSocketManager.removeSubscriber();
    };
    // nonProdDebugLog is intentionally not a dependency: its identity must not
    // tear down and reopen the socket.
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [isTnsFlowLive, throttledUpdateRows]);

  // Keep the manager's subscriber and symbol subscription in sync with the
  // current callback, filters and window. Declared after the connection
  // effect so that, on a live toggle, it runs after initialize().
  useEffect(() => {
    if (isTnsFlowLive) {
      WebSocketManager.setSubscriber(handleNewData, filters, sortedDataWindow);
    }

    const symbolFilter = filters.find(
      (f) => f.id === TAPE_DEFAULT_FILTER_ID.Symbols,
    ) as FilterItem | undefined;
    const requested: unknown = symbolFilter?.value;
    // Fall back to the defaults unless the filter really holds a list.
    const syms = Array.isArray(requested)
      ? (requested as string[])
      : DEFAULT_TNS_SYMBOLS;
    WebSocketManager.updateSymbols(syms);
  }, [isTnsFlowLive, handleNewData, filters, sortedDataWindow]);

  return { error: socketError };
};

export default useTapeWebsocket;
