import {
  BehaviorSubject,
  EMPTY,
  identity,
  Observable,
  of,
  pipe,
  ReplaySubject,
  Subject,
  Subscription,
  throwError,
  TimeoutError,
  timer,
} from 'rxjs';
import {
  catchError,
  concatAll,
  delay,
  distinctUntilChanged,
  filter,
  first,
  map,
  mergeMap,
  pairwise,
  pluck,
  retryWhen,
  startWith,
  switchMap,
  takeUntil,
  tap,
  timeout,
} from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';

import {
  convertAbortedRequestToRequest,
  createAppCompatMetadata,
  createAuthenticateTokenRequest,
  createWrappedDataPacket,
  createWrappedDisconnectPacket,
  createWrappedIdentificationPacket,
  createWrappedLanguagePacket,
  createWrappedLogoutPacket,
  createWrappedMetaDataPacket,
  createWrappedMulticastRegistrationPacket,
  createWrappedMulticastUnregistrationPacket,
  dataPacketToRequests,
  delayResponseIfNeeded,
  deserializeDataPacket,
  extractRequests,
  generateUuid,
  isAcceptLegalPopupContent,
  isChangeLanguagePacketWrapper,
  isDataOrRawDataPacket,
  isDataPacketWrapper,
  isFromTheRequestId,
  isInjectedRequest,
  isLogoutSuccessPacketWrapper,
  isMetadataAckPacketWrapper,
  isMulticastPacket,
  isPacketWrapper,
  isPushPacket,
  isShowLegalPopupContent,
  mapPacketWrapperToRequests,
  packetWrapperToDataPacket,
  parseAndDecompressMessage,
  parseMessage,
  parseRequestContent,
  requestToContent,
  sanitizeWsAdapterOptions,
} from './helpers';
import { getLoggerWithPrefix } from './logger';
import {
  DataPacket,
  EAcceptLegalPopupResponse,
  EDataPacketDirection,
  EPacketType,
  EQueueStatus,
  EWebSocketStatus,
  LiteWebSocketSubject,
  MetaData,
  PacketWrapper,
  RawDataPacket,
  Request,
  WebSocketAdapter,
  WebsocketDebugStoreItem,
  WebSocketSubjectFactory,
  WsAdapterOptions,
} from './types';

// The lib can be used in both DOM and non-DOM environments.
// The full `MessageEvent` type is defined in
// node_modules/typescript/lib/lib.dom.d.ts; this minimal local shape only
// declares the `data` payload.
type MessageEvent = {
  data: unknown;
};

/** Default maximum number of reconnection attempts */
const DEFAULT_RECONNECT_ATTEMPTS = 50;
/** Default delay (in ms) between reconnection attempts */
const DEFAULT_RECONNECT_DELAY_IN_MS = 500;
/** Default delay increment (in ms) between reconnection attempts */
const DEFAULT_RECONNECT_INCREMENT_IN_MS = DEFAULT_RECONNECT_DELAY_IN_MS;
/** Default request timeout (in ms) */
const DEFAULT_TIMEOUT_IN_MS = 5 * 1000;
/** Default factor applied to every timeout value */
const DEFAULT_TIMEOUT_MULTIPLICATOR = 1;
/** Default number of items kept in the debug store replay buffer */
const DEFAULT_DEBUG_STORE_BUFFER = 500;

/**
 * Provide the default WebSocket subject factory, i.e. the `webSocket` function
 * imported from "rxjs/webSocket"
 */
const createWebSocketDefaultFactory = <T>(): WebSocketSubjectFactory<T> => {
  return webSocket;
};

/**
 * The goal of this class is to provide a central object that manages
 * the WebSocket communications, with the help of rxjs.
 * It can
 * - init the WebSocket
 * - automatically reconnect a lost connection
 * - receive requests and send their responses back
 * - manage a queue of requests
 *
 * The class is exported only for test purposes; only the exported instance
 * must be used otherwise!
 */
export class WSAdapter implements WebSocketAdapter {
  /**
   * Replayable list of past packets/requests sent in every direction. At most
   * DEFAULT_DEBUG_STORE_BUFFER items are kept in the replay buffer
   */
  protected debugStoreSubject$ = new ReplaySubject<WebsocketDebugStoreItem>(
    DEFAULT_DEBUG_STORE_BUFFER,
  );

  /**
   * Subject containing the mock responses (indexed by request identifier) to
   * use instead of the ones from the backend when the debug mode is enabled
   */
  protected overwrittenResponsesSubject$ = new BehaviorSubject<
    Record<string, unknown>
  >({});

  /**
   * Subject used to notify the connection status changes of the WS. Starts as
   * "disconnected"
   */
  protected socketStatus$ = new BehaviorSubject<EWebSocketStatus>(
    EWebSocketStatus.disconnected,
  );

  /**
   * Delay before an incoming request is emitted (in ms)
   */
  protected responseDelay = 0;

  /**
   * Subject that listens to the WebSocket, but never sends an error or
   * completes (even if the WebSocketSubject does so)
   */
  protected socketListener$: Subject<PacketWrapper>;

  /**
   * Observable that only returns multicast data packets sent by the server
   */
  protected multicastListener$: Observable<DataPacket>;

  /**
   * Function returning a webSocketSubject. Should be "webSocket" imported from
   * "rxjs/webSocket" but can be injected for testing purposes
   */
  protected webSocketSubjectFactory: WebSocketSubjectFactory<PacketWrapper | null> =
    createWebSocketDefaultFactory();

  /**
   * Special subject linked to a WebSocket, created by rxjs. Null when no
   * socket is currently set
   */
  private socket$: LiteWebSocketSubject<PacketWrapper | null> | null;

  /**
   * Observable that only yields individual requests sent by the server
   */
  private requestListener$: Observable<Request>;

  /**
   * Observable that only yields data packets
   */
  private dataPacketListener$: Observable<DataPacket | RawDataPacket>;

  /**
   * Observable that only yields requests sent by the server as push
   */
  private push$: Observable<Request>;

  /**
   * Observable that only yields requests sent by the server as injected
   * packages
   */
  private injected$: Observable<Request>;

  /**
   * Subscription to the WebSocketSubject
   */
  private socketSubscription: Subscription | null = null;

  /**
   * Subscription to the ReconnectionObservable
   */
  private reconnectionSubscription: Subscription | null = null;

  /**
   * Subscription to the "ShowLegalPopup" injected packages
   */
  private showLegalPopupSubscription: Subscription | null = null;

  /**
   * Subscription to the "AcceptLegalPopup" response
   */
  private acceptLegalPopupSubscription: Subscription | null = null;

  /**
   * Subscription to push requests in order to log them
   */
  private pushLogSubscription: Subscription | null = null;

  /**
   * Subscription to injected requests in order to log them
   */
  private injectedLogSubscription: Subscription | null = null;

  /**
   * Subscription to the received non-data packets for debug purposes
   */
  private debugReceivedPacketsSubscription: Subscription | null = null;

  /**
   * Subscription to the received requests for debug purposes
   */
  private debugReceivedRequestsSubscription: Subscription | null = null;

  /**
   * User token used for authentication (one use only)
   */
  private userToken: string | null;

  /**
   * List of PacketWrappers that will be sent to the WebSocket when it is ready
   */
  private queue: PacketWrapper[];

  /**
   * Count of subscriptions to each multicast group, indexed by group name
   */
  private groupSubscriptionsCount: {
    [groupName: string]: BehaviorSubject<number>;
  };

  /**
   * If true, the wsAdapter can be closed using the `close` method
   */
  private isDisposable = false;

  /** When 'paused', no response is expected from the backend */
  private requestsQueueStatus$ = new BehaviorSubject<EQueueStatus>(
    EQueueStatus.active,
  );

  /**
   * List of Requests aborted by the backend because the legal popup must be
   * approved
   */
  private abortedRequests: Request[] = [];

  /** Logger instance with a prefix on all types of messages */
  private logger = getLoggerWithPrefix('[WS]');

  /**
   * Parameters given on the initialisation of the WebSocket by the client.
   * The values below are the defaults used until `init` merges the client
   * options over them
   */
  private options: Required<WsAdapterOptions> = {
    address: '',
    compression: null,
    debugMode: false,
    defaultTimeout: DEFAULT_TIMEOUT_IN_MS,
    locale: 'en',
    loggerPrefix: '[WS]',
    reconnectAttempts: DEFAULT_RECONNECT_ATTEMPTS,
    reconnectDelay: DEFAULT_RECONNECT_DELAY_IN_MS,
    reconnectIncrement: DEFAULT_RECONNECT_INCREMENT_IN_MS,
    room: '',
    shouldSendInitMetadata: true,
    timeoutValueMultiplicator: DEFAULT_TIMEOUT_MULTIPLICATOR,
  };

  /**
   * Observable that will trigger attempts to reconnect to the WebSocket
   */
  private reconnectionObservable: Observable<number> | null = null;

  /**
   * Observable boolean that will yield false when the WebSocket has stopped
   * trying to reconnect, true otherwise
   */
  private isAttemptingToReconnectAutomatically$ = new BehaviorSubject(false);

  /**
   * BehaviorSubject yielding the number of remaining reconnection attempts. On
   * successful reconnection, will reset to this.options.reconnectAttempts
   */
  private remainingReconnectionAttemps$ = new BehaviorSubject(0);

  /**
   * Prepare the queue and the listeners: builds the observables derived from
   * `socketListener$` (data packets, requests, multicast, push, injected) and
   * subscribes to the legal popup related requests.
   *
   * The WebSocket subject factory is not a constructor option; use
   * `setWebSocketSubjectFactory` to replace the default one.
   * @param options
   * @param options.disposable whether the websocket can be closed during the
   *   app lifetime. Default to false
   *
   */
  public constructor(options?: { disposable?: boolean }) {
    this.isDisposable = options?.disposable || false;
    this.userToken = null;
    this.socket$ = null;
    this.socketListener$ = new Subject();

    /** Observable of data packets emitted on the WebSocket */
    this.dataPacketListener$ = this.socketListener$.pipe(
      // Delay requests for a certain amount (only useful in dev)
      // NOTE(review): `this.responseDelay` is read here, once, as a number. If
      // `delayResponseIfNeeded` does not re-read it, later calls to
      // `setResponseDelay` have no effect on this pipeline — confirm.
      delayResponseIfNeeded(this.responseDelay),
      // Extract the data packets from the packet wrapper
      packetWrapperToDataPacket,
    );

    this.requestListener$ = this.dataPacketListener$.pipe(
      // Transform a data packet to one or multiple requests contained within
      dataPacketToRequests,
    );

    this.multicastListener$ = this.dataPacketListener$.pipe(
      // Keep only multicast packets
      filter(isMulticastPacket),
      map(deserializeDataPacket),
    );

    // Requests contained in push packets
    this.push$ = this.dataPacketListener$.pipe(
      filter(isPushPacket),
      dataPacketToRequests,
    );

    this.pushLogSubscription = this.push$.subscribe((request) =>
      this.logger.debug('[Response] [Push]', request),
    );

    // Requests flagged as injected
    this.injected$ = this.requestListener$.pipe(filter(isInjectedRequest));

    this.injectedLogSubscription = this.injected$.subscribe((request) =>
      this.logger.debug('[Response] [Injected]', request),
    );

    this.queue = [];
    this.groupSubscriptionsCount = {};

    /** Listen to the ShowLegalPopup incoming requests */
    this.showLegalPopupSubscription = this.getInjectedRequests$(
      'ShowLegalPopup',
    )
      .pipe(
        filter(isShowLegalPopupContent),
        map(({ RequestsAborted, ShowLegalPopup }) => ({
          shouldPauseQueue: ShowLegalPopup,
          abortedRequests: RequestsAborted?.map(convertAbortedRequestToRequest),
        })),
      )
      .subscribe({
        next: this.handleLegalPopup,
      });

    /** Listen to AcceptLegalPopup incoming requests */
    this.acceptLegalPopupSubscription = this.requestListener$
      .pipe(
        filter((request) => request.Identifier === 'AcceptLegalPopup'),
        pluck('Content'),
        /** Parsing the response */
        map(parseRequestContent),
        filter(isAcceptLegalPopupContent),
      )
      .subscribe({
        next: ({ AcceptLegalPopupStatus }) =>
          this.handleLegalPopup({
            // The queue stays paused unless the popup was accepted
            // successfully or the legal popup is disabled
            shouldPauseQueue:
              AcceptLegalPopupStatus !== EAcceptLegalPopupResponse.Successful &&
              AcceptLegalPopupStatus !==
                EAcceptLegalPopupResponse.LegalPopupDisabled,
          }),
      });
  }

  /**
   * Observable of the WebSocket status. Consecutive identical statuses are
   * emitted only once
   */
  public status$ = this.socketStatus$.pipe(distinctUntilChanged());

  /**
   * Synchronously read the latest WebSocket status
   */
  public getStatus = () => {
    return this.socketStatus$.getValue();
  };

  /**
   * Observable of the request queue status
   */
  public queueStatus$ = this.requestsQueueStatus$.asObservable();

  /**
   * Observable of stored packets sent through the WebSocket
   * in both directions (the debug mode has to be enabled)
   */
  public debugStore$ = this.debugStoreSubject$.asObservable();

  /**
   * Observable of stored mock responses for the request() method (the debug
   * mode has to be enabled)
   */
  public overwrittenResponses$ =
    this.overwrittenResponsesSubject$.asObservable();

  /**
   * Observable which will emit true when the websocket is trying to reconnect
   * automatically. Consecutive identical values are emitted only once
   */
  public isAutomaticallyReconnecting$ =
    this.isAttemptingToReconnectAutomatically$.pipe(distinctUntilChanged());

  /**
   * Synchronously tell whether the socket is currently attempting to
   * (re)connect automatically
   */
  public getIsAutomaticallyReconnecting = () => {
    return this.isAttemptingToReconnectAutomatically$.getValue();
  };

  /**
   * Force the status of the request queue.
   * Warning: should only be used in secondary wsAdapters (i.e. inside Game
   * Launcher)! In the main app, the queue status is set automatically by
   * observing the legal popup related backend responses
   * @param status the new queue status
   */
  public setQueueStatus = (status: EQueueStatus) => {
    this.requestsQueueStatus$.next(status);
  };

  /**
   * Tell whether the init() method was already called: true when a socket
   * subject exists or when the status is anything other than "disconnected"
   */
  public wasInitialzed = () => {
    if (this.socket$) {
      return true;
    }
    return this.socketStatus$.value !== EWebSocketStatus.disconnected;
  };

  /**
   * Store the connection options (merged over the current ones), then open
   * the WebSocket. Does nothing but log a warning when the adapter was already
   * initialized
   * @param options Options of the connection
   * @param options.address Address of the WebSocket
   * @param options.room Room name
   * @param options.locale Locale code of the user
   * @param [options.compression=null] Name of the compression used (if any)
   * @param [options.reconnectAttempts=50] Max number of reconnection attempts
   * @param [options.reconnectDelay=500] Minimum delay in ms between each reconnection attempt
   * @param [options.reconnectIncrement=500] Delay increment in ms between each reconnection attempt
   */
  public init = (options: WsAdapterOptions) => {
    const alreadyInitialized = this.wasInitialzed();
    if (alreadyInitialized) {
      this.logger.warn(
        'an attempt was made to init multiple time the wsAdapter',
      );
      return;
    }

    // Client options take precedence over the current (default) ones
    const mergedOptions = {
      ...this.options,
      ...sanitizeWsAdapterOptions(options),
    };
    this.options = mergedOptions;
    this.remainingReconnectionAttemps$.next(this.options.reconnectAttempts);

    // Only replace the logger when a custom prefix was explicitly given
    const customPrefix = options.loggerPrefix;
    if (customPrefix) {
      this.logger = getLoggerWithPrefix(customPrefix);
    }
    this.logger.info('init with options', this.options);

    if (this.options.debugMode) {
      this.setDebugMode(true);
    }

    this.createSocket();
  };

  /**
   * Change the WebSocket factory used to create a WS Subject. Should be
   * called before init() because init() triggers the private createSocket()
   * method, which uses that factory (createSocket() is also called by
   * reconnect() after the previous WS died)
   * @param wsSubjectFactory the factory that will replace the current one
   */
  public setWebSocketSubjectFactory = (
    wsSubjectFactory: WebSocketSubjectFactory<PacketWrapper | null>,
  ) => {
    this.webSocketSubjectFactory = wsSubjectFactory;
  };

  /**
   * Attempt to reconnect the websocket if it is down. Does nothing when the
   * status is not "disconnected", and only logs an error when no address was
   * set
   */
  public reconnect = () => {
    const isDown =
      this.socketStatus$.getValue() === EWebSocketStatus.disconnected;
    if (!isDown) {
      return;
    }

    if (!this.options.address) {
      this.logger.error(
        "[Reconnection] an attempt to reconnect was made but the WS address isn't set. Did you forget to call the wsAdapter init method?",
      );
      return;
    }

    // NOTE(review): if the remaining attempts count is never negative, this
    // value is always > 0 and the "manual attempt" message is never logged —
    // confirm the intended formula.
    const attemptNumber = this.remainingReconnectionAttemps$.getValue() + 1;
    const message =
      attemptNumber > 0
        ? `[Reconnection] automatic attempt ${attemptNumber}/${this.options.reconnectAttempts}`
        : `[Reconnection] manual attempt`;
    this.logger.info(message);

    // Drop the previous socket subject before creating a new one
    this.socket$ = null;
    this.createSocket();
  };

  /**
   * Queue a disconnect packet to tell the WebSocket that the application is
   * closing
   */
  public prepareDisconnection = () => {
    this.logger.info('disconnect package sent');
    const disconnectPacket = createWrappedDisconnectPacket();
    this.enqueue(disconnectPacket);
  };

  /**
   * Close the websocket and every listener. The wsAdapter won't be able to
   * do anything after this is called, so beware.
   *
   * Only allowed when the adapter was created with `disposable: true`;
   * otherwise an error is logged and nothing happens.
   */
  public close = () => {
    if (!this.isDisposable) {
      this.logger.error('Attempt to close a non disposable wsAdapter');
      return;
    }
    this.logger.warn('Socket was manually closed');
    this.options.reconnectAttempts = 0;
    // Tell the server we are leaving before completing the socket subject
    this.socket$?.next(createWrappedDisconnectPacket());
    this.socket$?.complete();
    // Unsub all subscriptions
    this.socketSubscription?.unsubscribe();
    this.reconnectionSubscription?.unsubscribe();
    this.injectedLogSubscription?.unsubscribe();
    this.pushLogSubscription?.unsubscribe();
    this.showLegalPopupSubscription?.unsubscribe();
    this.acceptLegalPopupSubscription?.unsubscribe();
    // Subscriptions created by setDebugMode(true) must be released as well,
    // otherwise they keep listening after the adapter is closed
    this.debugReceivedRequestsSubscription?.unsubscribe();
    this.debugReceivedPacketsSubscription?.unsubscribe();
  };

  /**
   * Inform the WebSocket that the user has changed their locale.
   * Returns an observable of the first "change language" packet received,
   * which errors when none arrives within the default timeout
   * @param locale the locale code (two characters, e.g. 'fr', 'en')
   */
  public switchLocale = (locale: string) => {
    this.options.locale = locale;
    this.logger.info('locale switching', locale);
    this.enqueue(createWrappedLanguagePacket(locale));

    const { defaultTimeout, timeoutValueMultiplicator } = this.options;
    const acknowledgement$ = this.socketListener$.pipe(
      filter(isChangeLanguagePacketWrapper),
      tap((response) => {
        this.logger.debug(`[WS] Locale switching success`, response);
      }),
      first(),
      timeout(defaultTimeout * timeoutValueMultiplicator),
    );
    return acknowledgement$;
  };

  /**
   * Send an authentication request (with top priority) for the given token.
   * The token is stored on the adapter once a response OR an error has been
   * received
   * @param token auth token
   * @param locale optional locale code, defaults to the current locale option
   */
  public loginUser = (token: string, locale?: string) => {
    const languageCode = locale || this.options.locale;
    const authRequest = createAuthenticateTokenRequest({
      Token: token,
      LanguageCode: languageCode,
    });
    // NOTE(review): request() assigns a new Id to the request, so the ID
    // logged below may differ from the one actually sent — confirm.
    const reqId = authRequest.Id;
    // NOTE(review): the token is written to the logs in clear text.
    this.logger.info(`[Auth] Request with ID ${reqId} queued, token =`, token);

    /** Keep the token once the request has settled, whatever the outcome */
    const rememberToken = () => {
      this.userToken = token;
    };

    const response$ = this.request(authRequest, { topPriority: true });
    return response$.pipe(
      tap({
        error: (err) => {
          this.logger.warn('[Auth] Response error', err);
          rememberToken();
        },
        next: (response) => {
          this.logger.info(`[Auth] Response with ID ${reqId} :`, response);
          rememberToken();
        },
      }),
    );
  };

  /**
   * Send a logout packet (with top priority) and reactivate the request
   * queue. Returns an observable of the first "logout success" packet
   * received, which errors when none arrives within the default timeout
   */
  public logoutUser = () => {
    this.logger.info('[Auth] Logout request');
    const logoutPacket = createWrappedLogoutPacket();
    this.enqueue(logoutPacket, true);
    // While logged out, the queue shouldn't be paused (the legal popup only
    // concerns logged in users), so restore the active status
    this.setQueueStatus(EQueueStatus.active);

    const { defaultTimeout, timeoutValueMultiplicator } = this.options;
    const logoutSuccess$ = this.socketListener$.pipe(
      filter(isLogoutSuccessPacketWrapper),
      tap((response) => {
        this.logger.debug('[Auth] [Response] Logout ', response);
      }),
      first(),
      timeout(defaultTimeout * timeoutValueMultiplicator),
    );
    return logoutSuccess$;
  };

  /**
   * Send the request to the WebSocket and return an observable that will only
   * send back the response (filtered by ID).
   *
   * Note: the `Id` of the given request object is overwritten with a freshly
   * generated one on every call, and again on a retry.
   *
   * In debug mode, when a mock response is registered for the request
   * identifier, the request is still queued but the mock is emitted (after a
   * short delay) instead of the backend response.
   * @param request Request object
   * @param options
   * @param options.keepListening Allows to keep listening to responses after
   *  the first one. This is used
   *  in hybrid implementations of request/push like the connected games WS.
   *
   *  Warning: Using `true` will turn the timeout handling off as the stream
   *  becomes an infinite source of data (push listening).
   * @param options.retryOnTimeout Should the request be sent again (one time)
   *  on timeout? (default true)
   * @param options.timeoutAfterMs Amount of ms after which the request is
   *  considered timed out (defaults to the `defaultTimeout` option)
   * @param options.topPriority Should the packet containing the request be put
   *  on top of the queue? (Should only be used for auth)
   * @returns an observable of the parsed response content. It rethrows any
   *  error (including the timeout) after logging it, unless the request was
   *  aborted because of the legal popup, in which case the request is sent
   *  again once the queue is active
   */
  public request = <T = unknown>(
    request: Request,
    {
      /**
       * Allows to keep listening to responses after the first one. This is used
       * in hybrid implementations of request/push like the connected games WS.
       *
       * Warning: Using `true` will turn the timeout handling off as the stream
       * becomes an infinite source of data (push listening).
       */
      keepListening = false,
      /**
       * Should the request be sent again (one time) on timeout? (default true)
       */
      retryOnTimeout = true,
      /**
       * The backend has its own mechanism where a packet is dropped by default
       * after 30sec (if it is enabled, we can't get that info). But 30sec is way
       * too long for the user
       */
      timeoutAfterMs = this.options.defaultTimeout,
      /**
       * Should the packet containing the request be put on top of the queue?
       * (Should only be used for auth)
       * */
      topPriority = false,
    } = {},
  ): Observable<unknown | T> => {
    /** Is the latest sent request a retry after a timeout ? */
    let isRetrying = false;
    /**
     * Latest sent request id. Tracked locally so that the response filter
     * always matches the id of the attempt that was actually sent
     */
    let latestRequestId = generateUuid();
    request.Id = latestRequestId;
    const mockResponse =
      this.overwrittenResponsesSubject$.getValue()[request.Identifier];
    if (this.options.debugMode && mockResponse) {
      return of(1).pipe(
        tap(() => {
          this.enqueue(createWrappedDataPacket(request), topPriority);
          this.logger.debug(
            `[Request] ${request.Identifier} with ID ${request.Id} queued:`,
            parseRequestContent(request.Content),
          );
        }),
        // delay the response to mimic real world usage
        delay(200),
        tap(() => {
          this.logger.debug(
            `[Response MOCK] ${request.Identifier} with ID ${request.Id} queued:`,
            mockResponse,
          );
        }),
        map(() => mockResponse),
      );
    }
    return of(1).pipe(
      // This tap runs on every (re)subscription, i.e. for the first attempt
      // and for the retry triggered by retryWhen below
      tap(() => {
        /**
         * When retrying after a timeout, send the same request, only with a
         * different id
         * */
        if (isRetrying) {
          const previousReqId = request.Id;
          latestRequestId = generateUuid();
          request.Id = latestRequestId;
          this.logger.warn(
            `[Request] ${request.Identifier} with ID ${previousReqId} timed out. Retrying with ID ${request.Id}...`,
            request,
          );
        } else {
          try {
            this.logger.debug(
              `[Request] ${request.Identifier} with ID ${request.Id} queued:`,
              parseRequestContent(request.Content),
            );
          } catch (e) {
            this.logger.debug(
              `[Request] ${request.Identifier} with ID ${request.Id} queued, but unable to parse content:`,
              request.Content,
            );
          }
        }
        this.enqueue(createWrappedDataPacket(request), topPriority);
      }),
      mergeMap(() =>
        this.socketListener$.pipe(
          mapPacketWrapperToRequests,
          /**
           * Getting only the response from the same id as the request. The
           * filter is built here, per attempt, so that a retry listens for the
           * id it was sent with instead of the id of the first attempt
           */
          filter(isFromTheRequestId(latestRequestId)),
        ),
      ),
      pluck('Content'),
      /** Parsing the response */
      map(parseRequestContent),
      /** Logging the response to the console */
      tap((response) => {
        this.logger.debug(
          `[Response] for ${request.Identifier} request with ID ${request.Id}:`,
          response,
        );
      }),
      /**
       * if keepListening is true, no timeout error should be thrown and the
       * pipe shouldn't complete since it becomes an infinite stream of data.
       */
      keepListening
        ? map(identity)
        : pipe(
            first(),
            timeout(timeoutAfterMs * this.options.timeoutValueMultiplicator),
          ),
      /**
       * Only retry the request if
       * - the error is a timeout
       * - the request has been marked as retryable
       * - there hasn't been already a retry
       * - the request wasn't aborted by the backend, or the queue isn't paused
       */
      retryWhen((err$) =>
        err$.pipe(
          mergeMap((err) => {
            const shouldRetry =
              err instanceof TimeoutError &&
              retryOnTimeout &&
              !isRetrying &&
              (!this.abortedRequests.find(
                (req) =>
                  req.Id === request.Id &&
                  req.Identifier === request.Identifier,
              ) ||
                this.requestsQueueStatus$.getValue() !== EQueueStatus.paused);
            isRetrying = shouldRetry;
            return shouldRetry ? of(err) : throwError(err);
          }),
        ),
      ),
      /** Logging the error and still throwing it */
      catchError((err) => {
        if (
          this.abortedRequests.find(
            (req) =>
              req.Id === request.Id && req.Identifier === request.Identifier,
          )
        ) {
          this.logger.debug(
            `[Legal Popup] request ${request.Identifier} with ID ${request.Id} was aborted, it will be resent when the popup is accepted`,
          );
          // Wait for the queue to be active again, then send the request anew
          return this.requestsQueueStatus$.pipe(
            startWith(this.requestsQueueStatus$.getValue()),
            filter((status) => status === EQueueStatus.active),
            first(),
            switchMap(() =>
              this.request(request, {
                keepListening,
                retryOnTimeout,
                timeoutAfterMs,
                topPriority,
              }),
            ),
          );
        }
        if (err instanceof TimeoutError) {
          this.logger.error(
            `[Request] ${request.Identifier} with ID ${request.Id} timed out`,
          );
        } else {
          this.logger.error(
            `[Request] ${request.Identifier} with ID ${request.Id} error:`,
            err,
          );
        }
        throw err;
      }),
    );
  };

  /**
   * Subscribe to a multicast group: registers to the group when this is its
   * first subscription, increments the group subscription count, and returns
   * an observable of the content of the requests sent through this group's
   * multicast packets
   * @param groupName the name of the multicast group
   * @param options
   * @param options.autoComplete automatically completes the observable when
   *  the subscription count increments
   */
  public subscribeToGroup = (
    groupName: string,
    { autoComplete = true } = {},
  ) => {
    const count$ = this.getGroupSubscriptionCountSubject(groupName);
    const updatedCount = count$.value + 1;
    if (updatedCount > 1) {
      this.logger.debug(
        `[Multicast] Subscribing to the already registered multicast group "${groupName}". ${updatedCount} subscriptions`,
      );
    } else {
      this.registerToGroup(groupName);
    }
    count$.next(updatedCount);

    // Emits when there is no active subscription anymore, or when there is a
    // new subscription and autoComplete is true
    const stop$ = count$.pipe(
      pairwise(),
      filter(([previous, current]) => {
        const hasNoSubscriptionLeft = !current;
        return hasNoSubscriptionLeft || (autoComplete && previous < current);
      }),
    );

    // No logging of the received multicast content here on purpose: it would
    // flood the console on betting live pages
    return this.multicastListener$.pipe(
      // Only keep packets tagged with the desired multicast group name
      filter((dataPacket) => dataPacket.Groups.includes(groupName)),
      // Data packet -> requests -> content of the requests
      dataPacketToRequests,
      requestToContent,
      takeUntil(stop$),
    );
  };

  /**
   * Inform the adapter that one subscription to the group can safely be
   * removed. Unregisters from the group when the last subscription is removed.
   * Returns the number of subscriptions left
   * @param groupName the multicast group name
   */
  public unsubscribeToGroup = (groupName: string) => {
    const count$ = this.getGroupSubscriptionCountSubject(groupName);
    const currentCount = count$.value;

    // Nothing to unsubscribe from
    if (!currentCount) {
      this.logger.warn(
        `[Multicast] There was an attempt to unsubscribe to the multicast group "${groupName}" but no registration to that group was active`,
      );
      return currentCount;
    }

    if (currentCount > 1) {
      // Other subscribers remain: only decrement the count
      const remaining = currentCount - 1;
      this.logger.debug(
        `[Multicast] Unsubscription to the multicast group "${groupName}". ${remaining} subscriptions left`,
      );
      count$.next(remaining);
      return remaining;
    } else {
      // That was the last subscriber: leave the group
      this.unRegisterGroup(groupName);
      count$.next(0);
      return 0;
    }
  };

  /**
   * Expose, as a plain observable, the number of subscriptions to a multicast
   * group
   * @param groupName the multicast group name
   */
  public getGroupSubscriptionCount$ = (groupName: string) => {
    const count$ = this.getGroupSubscriptionCountSubject(groupName);
    return count$.asObservable();
  };

  /**
   * Get an observable of the parsed content of every request received via
   * push with the given identifier. Completes silently if an error occurs
   * @param requestIdentifier the identifier of the requests to listen to
   */
  public getPushRequests$ = (requestIdentifier: string) => {
    const hasIdentifier = (request: Request) =>
      request.Identifier === requestIdentifier;
    return this.push$.pipe(
      filter(hasIdentifier),
      pluck('Content'),
      map(parseRequestContent),
      // Swallow errors: the stream just ends
      catchError(() => EMPTY),
    );
  };

  /**
   * Get an observable of the parsed content of every request received via
   * package injection with the given identifier. Completes silently if an
   * error occurs
   * @param requestIdentifier the identifier of the requests to listen to
   */
  public getInjectedRequests$ = (requestIdentifier: string) => {
    const hasIdentifier = (request: Request) =>
      request.Identifier === requestIdentifier;
    return this.injected$.pipe(
      filter(hasIdentifier),
      pluck('Content'),
      map(parseRequestContent),
      // Swallow errors: the stream just ends
      catchError(() => EMPTY),
    );
  };

  /**
   * Send some metadata to the WebSocket. Returns an observable of the first
   * metadata acknowledgement packet received, which errors when none arrives
   * within the default timeout
   * @param metadata An object of strings
   * @param topPriority Should the packet be put on top of the queue?
   *  (default false)
   */
  public sendMetadata = (metadata: MetaData, topPriority = false) => {
    this.enqueue(createWrappedMetaDataPacket(metadata), topPriority);
    this.logger.debug('[Metadata] Sent', metadata);

    const { defaultTimeout, timeoutValueMultiplicator } = this.options;
    const acknowledgement$ = this.socketListener$.pipe(
      filter(isMetadataAckPacketWrapper),
      tap((response) => {
        this.logger.debug('[Metadata] Acknowledged', response);
      }),
      first(),
      timeout(defaultTimeout * timeoutValueMultiplicator),
    );
    return acknowledgement$;
  };

  /**
   * Delay any incoming response emissions
   * ⚠️ Should only be used for dev/debug purposes!
   *
   * NOTE(review): the constructor passes `this.responseDelay` by value to
   * `delayResponseIfNeeded` when the pipeline is built — confirm that updating
   * the field here actually changes the delay applied to incoming responses.
   * @param [delayInMs] delay value (in ms). Default is 0
   */
  public setResponseDelay = (delayInMs = 0) => {
    this.logger.warn(`[Dev] Response delay manually set to ${delayInMs}`);
    this.responseDelay = delayInMs;
  };

  /**
   * Induce a WebSocket disconnection: the status is set to "disconnected",
   * then the socket subject (if any) is put in error with code 1000
   * ⚠️ Should only be used for dev/debug purposes or interop with another WS
   */
  public triggerDisconnection = () => {
    this.logger.warn('Client disconnection triggered manually');
    this.socketStatus$.next(EWebSocketStatus.disconnected);
    const closingReason = {
      code: 1000,
      reason: 'Manual client disconnection',
    };
    this.socket$?.error(closingReason);
  };

  /**
   * Set the wsAdapter to debug mode. This means that:
   * - every packet/request will be pushed to the debug store (keep-alive
   *   packets excepted)
   * - request responses can be overwritten with arbitrary content
   *
   * Any previous debug subscription is released first, so the method can be
   * called several times. Disabling the debug mode also clears the
   * overwritten responses.
   * @param enabled whether the debug mode should be enabled. Default is false
   */
  public setDebugMode = (enabled = false) => {
    this.options.debugMode = enabled;
    if (this.debugReceivedRequestsSubscription) {
      this.debugReceivedRequestsSubscription.unsubscribe();
    }
    if (this.debugReceivedPacketsSubscription) {
      this.debugReceivedPacketsSubscription.unsubscribe();
    }

    /* Push received packets to the debug store */
    if (enabled) {
      /* Data packets - requests */

      const requestWithDirection$ = this.socketListener$
        // Transform a data packet to one or multiple requests contained within
        .pipe(
          filter(isDataPacketWrapper),
          packetWrapperToDataPacket,
          // Extract requests and add the direction property from their dataPacket
          map((dataPacket) =>
            deserializeDataPacket(dataPacket).Requests.map((request) => ({
              ...request,
              direction: dataPacket.Direction,
            })),
          ),
          // Flatten the arrays of requests into individual emissions
          concatAll(),
        );

      this.debugReceivedRequestsSubscription = requestWithDirection$.subscribe({
        next: (request) => {
          const parsedContent = parseRequestContent(request.Content);

          this.debugStoreSubject$.next({
            id: request.Id,
            type: `Request (${request.Type})`,
            direction: request.direction,
            content: parsedContent,
            name: request.Identifier,
            injected: !!request.Injected,
            time: new Date().getTime(),
          });
        },
      });

      /* Other packets */
      this.debugReceivedPacketsSubscription = this.socketListener$
        .pipe(
          // Requests from data packets are already handled above
          filter((message) => message.MessageType !== EPacketType.DataPacket),
          // Keep-alive packets are dispatched every x seconds and would
          // clutter the debugging, so they are left out of the store
          filter(
            (message) => message.MessageType !== EPacketType.KeepAlivePacket,
          ),
        )
        .subscribe({
          next: (incomingMessage) => {
            this.debugStoreSubject$.next({
              id: incomingMessage.Id,
              type: EPacketType[incomingMessage.MessageType],
              direction: EDataPacketDirection.Response,
              content: parseMessage(incomingMessage),
              time: new Date().getTime(),
            });
          },
        });
    } else {
      this.overwrittenResponsesSubject$.next({});
    }
  };

  /**
   * Force a future request() call to use a given response instead of what the
   * server responded with. Will only work in debug mode
   * @param requestName the name of the request to overwrite
   * @param requestContent the content to be used instead of the server
   * response. If empty, will remove the previous stored value (if any) for that
   * request name
   */
  public setOverwrittenResponse = (
    requestName: string,
    requestContent?: unknown,
  ) => {
    // Overwriting responses is only allowed while in debug mode
    if (!this.options.debugMode) {
      this.logger.warn(
        '[Debugging] unable to force a response while not in debug mode',
      );
      return;
    }

    const storedResponses = this.overwrittenResponsesSubject$.getValue();

    if (!requestContent) {
      // Falsy content: publish the stored responses without this request name
      const { [requestName]: _omitted, ...remaining } = storedResponses;
      this.overwrittenResponsesSubject$.next({ ...remaining });
      return;
    }

    // Add (or replace) the forced response for this request name
    this.overwrittenResponsesSubject$.next({
      ...storedResponses,
      [requestName]: requestContent,
    });
  };

  /**
   * Returns true if the group name passed as param has at least one subscription
   * @param groupName the multicast group name
   */
  public hasGroupSubscriptions = (groupName: string): boolean => {
    // Read the current value of the counter kept for this group
    const subscriptionCount$ = this.getGroupSubscriptionCountSubject(groupName);
    return subscriptionCount$.value > 0;
  };

  /**
   * Create the WebSocket and listen to its notifications
   */
  protected createSocket = () => {
    // A socket reference is already set: do not create another one
    if (this.socket$) {
      return;
    }

    // Drop the subscription left by a previous socket, if any
    if (this.socketSubscription) {
      this.socketSubscription.unsubscribe();
    }

    this.socketStatus$.next(EWebSocketStatus.connecting);

    // Build the WebSocket subject and wire its lifecycle observers
    this.socket$ = this.webSocketSubjectFactory({
      url: this.options.address,
      openObserver: {
        next: this.onSocketOpen,
      },
      closingObserver: {
        next: (reason) => this.logger.info('[Status] Closing', reason),
      },
      closeObserver: {
        next: (error) => this.logger.info('[Status] Closed', error),
      },
      deserializer: (messageEvent: MessageEvent) => {
        const payload = messageEvent.data;
        // Anything that is not a string is assumed to be a correct PacketWrapper
        if (typeof payload !== 'string') {
          return payload;
        }
        // A string payload must be decompressed and parsed to get a
        // PacketWrapper; null is returned when that fails or yields nothing
        try {
          return parseAndDecompressMessage(payload) || null;
        } catch (e) {
          this.logger.error('[Response] Could not parse WS data', payload);
          return null;
        }
      },
    });

    // Only well-formed, non-empty packet wrappers go through (a failed parse
    // of the WS data produces null, which is filtered out here)
    const wellFormedPackets$ = this.socket$.pipe(filter(isPacketWrapper));

    // Relaying through this subscription ensures that errors and complete
    // won't stop our socketListener$ subject
    this.socketSubscription = wellFormedPackets$.subscribe({
      next: (message) => this.socketListener$.next(message),
      error: this.onSocketClose,
      complete: this.onSocketClose,
    });
  };

  /**
   * Get the internal behaviorSubject of the total of subscriptions to a
   * multicast group
   * @param groupName the multicast group name
   */
  protected getGroupSubscriptionCountSubject = (groupName: string) => {
    const counters = this.groupSubscriptionsCount;
    // Lazily create a counter starting at 0 the first time a group is requested
    if (!counters[groupName]) {
      counters[groupName] = new BehaviorSubject(0);
    }
    return counters[groupName];
  };

  /**
   * Add a packet wrapper to the queue or send it immediately if online
   * @param packetWrapper the PacketWrapper object
   * @param [topPriority=false] The PacketWrapper needs to be on the top of the queue
   */
  private enqueue = (packetWrapper: PacketWrapper, topPriority = false) => {
    const canSendNow =
      this.socketStatus$.value === EWebSocketStatus.ready && !!this.socket$;

    if (canSendNow) {
      this.sendPacket(packetWrapper);
      return;
    }

    // Not ready: store the packet at the head or at the tail of the queue
    this.queue = topPriority
      ? [packetWrapper, ...this.queue]
      : [...this.queue, packetWrapper];
  };

  private sendPacket = (packetWrapper: PacketWrapper) => {
    // Without a socket nothing can be sent: report the failure to the caller
    if (!this.socket$) {
      return false;
    }
    this.socket$.next(packetWrapper);

    // Outside of debug mode the sent packet is not recorded
    if (!this.options.debugMode) {
      return true;
    }

    /* Push sent packets to the debug store */
    const time = new Date().getTime();
    const content = parseMessage(packetWrapper);

    /* Data packets - requests */
    if (isDataPacketWrapper(packetWrapper) && isDataOrRawDataPacket(content)) {
      const { Direction: direction } = content;
      // One debug entry per request contained within the data packet
      extractRequests(content).forEach((request) => {
        this.debugStoreSubject$.next({
          direction,
          type: `Request (${request.Type})`,
          content: parseRequestContent(request.Content),
          name: request.Identifier,
          id: request.Id,
          time,
        });
      });
      return true;
    }

    /* Other packets */
    this.debugStoreSubject$.next({
      direction: EDataPacketDirection.Request,
      type: EPacketType[packetWrapper.MessageType],
      content,
      id: packetWrapper.Id,
      time,
    });
    return true;
  };

  /**
   * Attempt to send each item in the queue
   */
  protected startQueue = () => {
    this.logger.info('[Queue] started');

    // Sending is only possible while a socket exists and is flagged as ready
    const isSocketReady = () =>
      this.socketStatus$.value === EWebSocketStatus.ready && !!this.socket$;

    // Process the queue head first, and stop as soon as the socket is not ready
    while (this.queue.length > 0 && isSocketReady()) {
      const [nextPacket] = this.queue;
      this.enqueue(nextPacket);
      this.queue.shift();
    }
  };

  /**
   * Send the first packets and start the queue
   */
  protected onSocketOpen = () => {
    // Nothing is done if the socket reference is not set
    if (this.socket$) {
      this.logger.info('[Status] Open');
      // Status update: the socket is "connected" here, and only becomes
      // "ready" in the delayed check at the end of this method
      this.socketStatus$.next(EWebSocketStatus.connected);
      if (this.options.shouldSendInitMetadata) {
        // First request to send: the identification packet
        const identificationOptions = {
          compression: this.options.compression || undefined,
          locale: this.options.locale,
          room: this.options.room,
        };
        // Sent directly through sendPacket, so it does not go through the queue
        this.sendPacket(
          createWrappedIdentificationPacket(identificationOptions),
        );
        this.logger.info(
          '[Init] Identification packet sent with options',
          identificationOptions,
        );

        // Second request to send: metadata announcing the frontend version
        this.sendMetadata(createAppCompatMetadata(), true);
      }

      // Re-authenticate the user if a token is stored
      if (this.userToken) {
        this.loginUser(this.userToken).subscribe({
          next: () => this.logger.info('[Auth] User has been re-authenticated'),
          error: (err) =>
            this.logger.error('[Auth] User could not be authenticated', err),
        });
      }
      // Register to every group whose subscription count is not zero
      Object.entries(this.groupSubscriptionsCount).forEach(
        ([groupName, count$]) => {
          if (count$.value) {
            this.registerToGroup(groupName);
          }
        },
      );

      // The delay is there to ensure the socket wasn't closed as soon as it opened
      timer(10).subscribe(() => {
        // Only proceed if the status is still "connected" after the delay
        if (this.socketStatus$.getValue() === EWebSocketStatus.connected) {
          // Remove reference to a potential previous reconnection observable
          // and reset the reconnection state (flag and remaining attempts)
          if (this.reconnectionSubscription) {
            this.reconnectionSubscription.unsubscribe();
            this.reconnectionObservable = null;
            this.reconnectionSubscription = null;
            this.isAttemptingToReconnectAutomatically$.next(false);
            this.remainingReconnectionAttemps$.next(
              this.options.reconnectAttempts,
            );
          }
          // Status update
          this.socketStatus$.next(EWebSocketStatus.ready);
          // Start to process the request queue
          this.startQueue();
        }
      });
    }
  };

  /**
   * Attempt to recreate a WebSocket when it closes
   * @param [error] an error object
   */
  protected onSocketClose = (error?: unknown) => {
    this.socketStatus$.next(EWebSocketStatus.disconnected);
    // Fewer remaining attempts than the configured maximum means that at least
    // one reconnection attempt was already scheduled before this close
    if (
      this.remainingReconnectionAttemps$.getValue() <
      this.options.reconnectAttempts
    ) {
      this.logger.warn('[Reconnection] attempt failed');
    } else {
      this.logger.error('[Reconnection] Socket failed to open');
    }
    this.logger.error(
      `[Status] closed ${error ? 'with' : 'without'} error`,
      error,
    );
    this.logger.warn('[Queue] stopped');
    // Unsubscribe from the dead socket$ before dropping the reference, so a
    // subscription that is still live cannot keep feeding socketListener$.
    // Unsubscribing an already closed subscription is a no-op.
    if (this.socketSubscription) {
      this.socketSubscription.unsubscribe();
      this.socketSubscription = null;
    }
    if (
      this.options.reconnectAttempts &&
      this.remainingReconnectionAttemps$.getValue()
    ) {
      this.isAttemptingToReconnectAutomatically$.next(true);
      // Number of attempts already consumed, computed before the decrement
      const attemptNumber =
        this.options.reconnectAttempts -
        this.remainingReconnectionAttemps$.getValue();
      // The delay grows by reconnectIncrement for each consumed attempt
      const initialDelayInMs =
        this.options.reconnectDelay +
        attemptNumber * this.options.reconnectIncrement;
      this.remainingReconnectionAttemps$.next(
        this.remainingReconnectionAttemps$.getValue() - 1,
      );
      // Next after a delay, increasing on each attempt
      this.reconnectionObservable = timer(initialDelayInMs).pipe(
        // Stop the stream if the websocket is reconnected
        takeUntil(
          this.socketStatus$.pipe(
            filter((status) => status === EWebSocketStatus.ready),
          ),
        ),
      );
      // Attempt to reconnect when the observable next
      this.reconnectionSubscription = this.reconnectionObservable.subscribe({
        next: this.reconnect,
      });
    } else {
      this.logger.warn(
        '[Reconnection] will no longer attempt to automatically reconnect as max attempts number has been reached',
      );
      // -1 is published as the remaining attempts once reconnection is given up
      this.remainingReconnectionAttemps$.next(-1);
      this.isAttemptingToReconnectAutomatically$.next(false);
    }
  };

  /**
   * Register to a multicast group
   * @param groupName the multicast group name
   */
  private registerToGroup = (groupName: string) => {
    this.logger.debug(
      `[Multicast] Registering to the multicast group "${groupName}".`,
    );
    // Build the registration packet and hand it over to enqueue
    this.enqueue(createWrappedMulticastRegistrationPacket(groupName));
  };

  /**
   * Unregister from a multicast group
   * @param groupName the multicast group name
   */
  private unRegisterGroup = (groupName: string) => {
    this.logger.debug(
      `[Multicast] Unregistration to the multicast group "${groupName}"`,
    );
    // Build the unregistration packet and hand it over to enqueue
    this.enqueue(createWrappedMulticastUnregistrationPacket(groupName));
  };

  /**
   * This method is called when the adapter receives a packet from the backend
   * stating whether the backend is "paused" or not.
   * The backend can be "paused" (= not answering to any requests sent to it)
   * when the "legal popup" (a blocking modal) must be approved by the user.
   * During that time, any aborted request sent by the client and ignored for
   * this reason will be provided by the backend. In this function, it's the
   * "abortedRequests" property
   */
  private handleLegalPopup = ({
    shouldPauseQueue,
    abortedRequests,
  }: {
    shouldPauseQueue: boolean;
    abortedRequests?: Request[];
  }) => {
    const targetStatus = shouldPauseQueue
      ? EQueueStatus.paused
      : EQueueStatus.active;
    const statusHasChanged =
      this.requestsQueueStatus$.getValue() !== targetStatus;

    if (statusHasChanged) {
      this.logger.info(`[Legal Popup] Backend status set to ${targetStatus}`);
      this.setQueueStatus(targetStatus);
      // Entering the paused state starts from an empty aborted requests list
      if (shouldPauseQueue) {
        this.abortedRequests = [];
      }
    }

    // Append the aborted requests provided with this notification, if any
    if (abortedRequests) {
      this.abortedRequests = [...this.abortedRequests, ...abortedRequests];
    }
  };
}

/**
 * This factory should **ONLY** be used in external apps for external websockets
 *
 * For any other use throughout the app, please use the default instance which
 * is exported by `@g1-network` and which is named `wsAdapter`
 */
export const createWSAdapter = ({
  disposable = false,
}: {
  disposable?: boolean;
} = {}) => {
  // Forward the (defaulted) option to a brand new adapter instance
  const adapter = new WSAdapter({ disposable });
  return adapter;
};

/**
 * The singleton WSAdapter instance that must be used through all the app to
 * communicate with the backend.
 *
 * It is created without constructor arguments when this module is evaluated.
 */
export const wsAdapter = new WSAdapter();
