import { IDatasource, IGetRowsParams } from '@ag-grid-community/core';
import { CreateDataStreamResponse, FetchDataStreamInput, GqlService } from '@services/gql.service';
import { BehaviorSubject, firstValueFrom, Observable, Subject, Subscription } from 'rxjs';
import { MainQuery } from '../../layouts/main-layout/state/main.query';
import { batchPromises } from './batch-promises';
import { ApiService } from '@services/api.service';
import { AbstractControl, FormGroup } from '@angular/forms';
import { map } from 'rxjs/operators';

export enum ServerSideColumnFilterType {
  IsEqualTo = 'IsEqualTo',
  IsGreaterThan = 'IsGreaterThan',
  IsGreaterThanOrEqualTo = 'IsGreaterThanOrEqualTo',
  IsLessThan = 'IsLessThan',
  IsLessThanOrEqualTo = 'IsLessThanOrEqualTo',
  Contains = 'Contains',
  StartsWith = 'StartsWith',
  EndsWith = 'EndsWith',
}

const serverSideOperatorLookup: { [ft in ServerSideColumnFilterType]: string } = {
  [ServerSideColumnFilterType.IsEqualTo]: 'EQ',
  [ServerSideColumnFilterType.IsGreaterThan]: 'GT',
  [ServerSideColumnFilterType.IsGreaterThanOrEqualTo]: 'GTE',
  [ServerSideColumnFilterType.IsLessThan]: 'LT',
  [ServerSideColumnFilterType.IsLessThanOrEqualTo]: 'LTE',
  [ServerSideColumnFilterType.Contains]: 'CONTAINS',
  [ServerSideColumnFilterType.StartsWith]: 'STARTSWITH',
  [ServerSideColumnFilterType.EndsWith]: 'ENDSWITH',
};

export type ServerSideColumnFilterInfo<TData> = {
  column: keyof TData;
  type: ServerSideColumnFilterType;
  inputPropertyName: string;
  transformFunction?: (inputValue: unknown) => unknown;
};

export type ServerSideCustomFilterInfo = {
  targetPropertyName: string;
  inputPropertyName: string;
  transformFunction?: (inputValue: unknown) => unknown;
};

export type ServerSideFilterInfo<TData> =
  | ServerSideColumnFilterInfo<TData>
  | ServerSideCustomFilterInfo;

export type ServerSideSortOrder<TData> = {
  column: keyof TData;
  descending?: boolean;
};

export interface DatasourceOptions<TData> {
  untilDestroyedPipeOperator: (prev: Observable<unknown>) => Observable<unknown>;
  filters: ServerSideFilterInfo<TData>[];
  filterValues$: BehaviorSubject<Record<string, unknown>>;
  sortModel$: BehaviorSubject<Array<ServerSideSortOrder<TData>>>;
  refreshGridDebounceMilliseconds?: number; // defaults to 500
  disableAutoRefresh?: boolean; // true will prevent the datasource from triggering a refresh$ when trial or filters change
}

export interface FetchDataStreamResponse<TData> {
  total_rows: number;
  aggregation: TData;
  offset: number;
  items: Array<TData>;
}

export type FetchDataStreamFunction<TData> = (
  input: FetchDataStreamInput
) => Promise<GraphqlResponse<FetchDataStreamResponse<TData>>>;

export interface ProxyOptions<TData> {
  endpoint: string;
  fetchFunction: FetchDataStreamFunction<TData>;
  mainQuery: MainQuery;
  apiService: ApiService;
  gqlService: GqlService;
}

export interface ServerSideDatasourceOptions<TData>
  extends DatasourceOptions<TData>,
    ProxyOptions<TData> {}

// transformFunction
export function convertLocalDateToIsoTimestamp(v: unknown): unknown {
  if (v && typeof v === 'string') {
    try {
      const dt = new Date(v);
      if (Number.isFinite(dt.getTime())) {
        dt.setMinutes(dt.getMinutes() + new Date().getTimezoneOffset());
        return dt.toISOString();
      }
    } catch (err) {
      console.warn(`Failed to convert local date to iso: ${v}`, err);
    }
  }
  return v;
}

// transformFunction
export function incrementDateValueByOneDay(v: unknown): unknown {
  if (v && typeof v === 'string') {
    try {
      const dt = new Date(v);
      if (Number.isFinite(dt.getTime())) {
        dt.setDate(dt.getDate() + 1);
        return dt.toISOString().split('T')[0];
      }
    } catch (err) {
      console.warn(`Failed to increment date: ${v}`, err);
    }
  }
  return v;
}

// form group initialization helper
export function applyFilterTransformationsToFormGroup<T>(
  formGroup: FormGroup | unknown,
  filters: Array<ServerSideFilterInfo<T>>
) {
  if (!(formGroup instanceof FormGroup)) {
    throw new Error('formGroup is required');
  }

  const currentValue: Record<string, unknown> = formGroup.value || {};
  let valueChangesObservable = formGroup.valueChanges;
  const transformations: Array<{ propName: string; transformFunction: (v: unknown) => unknown }> =
    [];

  if (Array.isArray(filters)) {
    filters.forEach((filterInfo) => {
      if (filterInfo && typeof filterInfo === 'object' && filterInfo.inputPropertyName) {
        if (typeof filterInfo.transformFunction === 'function') {
          const ctl = formGroup.get(filterInfo.inputPropertyName);
          if (ctl instanceof AbstractControl) {
            currentValue[filterInfo.inputPropertyName] = filterInfo.transformFunction(ctl.value);
          }
          transformations.push({
            propName: filterInfo.inputPropertyName,
            transformFunction: filterInfo.transformFunction,
          });
        }
      }
    });
  }

  if (transformations.length > 0) {
    valueChangesObservable = valueChangesObservable.pipe(
      map((formValue) => {
        const result = formValue || {};
        for (const transformation of transformations) {
          result[transformation.propName] = transformation.transformFunction(
            result[transformation.propName]
          );
        }
        return result;
      })
    );
  }

  return {
    currentValue,
    valueChangesObservable,
  };
}

// datasource implementation
class ServerSideDatasourceInner<TData> {
  private subscriptions?: {
    trialChangeSubscription?: Subscription;
    filterValuesSubscription?: Subscription;
    sortModelSubscription?: Subscription;
  } = undefined;

  private options = {} as ServerSideDatasourceOptions<TData>;

  private destroyed = false;

  private isGetRowsRunning = false;

  private pendingRefreshGrid = false;

  private waitUntilTimestampBeforeInitiatingRefresh = 0;

  private currentStreamId = '';

  private currentStreamRowCount = -1;

  private currentStreamAggregationPublished = false;

  private requestTrialId = '';

  private requestModel: Record<string, unknown> = {};

  private requestModelSignature = '';

  private sortModel: Array<unknown> = [];

  private sortModelSignature = '';

  loading$ = new BehaviorSubject<boolean>(false);

  totalRows$ = new BehaviorSubject<number>(0);

  aggregation$ = new BehaviorSubject<TData>({} as TData);

  refresh$ = new Subject<void>();

  private ensureInitialized(): void {
    if (!this.subscriptions) {
      throw new Error('Datasource not initialized');
    }
    if (this.destroyed) {
      throw new Error('Datasource has been destroyed');
    }
  }

  initialize(options: ServerSideDatasourceOptions<TData>) {
    if (this.destroyed) {
      throw new Error('Datasource has been destroyed');
    }
    if (this.subscriptions) {
      throw new Error('Datasource already initialized');
    }

    if (!options || typeof options !== 'object') {
      throw new Error('options is required');
    }

    this.options = options;

    // note that sort model changes do not trigger a refresh (because ag-grid auto refreshes itself when sort order changes)
    // filters and trial will trigger a refresh automatically (unless options.disableAutoRefresh is set to true)
    const trialChangeSubscription = options.mainQuery
      .select('trialKey')
      .subscribe(() => this.applyTrialChange());

    const filterValuesSubscription = options.filterValues$.subscribe(() =>
      this.applyFilterValues()
    );

    const sortModelSubscription = options.sortModel$.subscribe(() => this.applySortChanges());

    this.applyTrialChange();
    this.applyFilterValues();
    this.applySortChanges();

    this.subscriptions = {
      trialChangeSubscription,
      filterValuesSubscription,
      sortModelSubscription,
    };
  }

  private async disposeRemoteResources(): Promise<void> {
    const maxWaitTimestamp = new Date().getTime() + 60 * 1000;
    while (this.isGetRowsRunning && new Date().getTime() <= maxWaitTimestamp) {
      // eslint-disable-next-line @typescript-eslint/no-loop-func
      await new Promise<void>((res) => setTimeout(res, 10));
    }
    this.removeCurrentStream(true);
  }

  destroy(): void {
    if (this.destroyed) {
      return;
    }
    this.destroyed = true;
    if (this.subscriptions?.trialChangeSubscription) {
      this.subscriptions.trialChangeSubscription.unsubscribe();
    }
    if (this.subscriptions?.filterValuesSubscription) {
      this.subscriptions.filterValuesSubscription.unsubscribe();
    }
    if (this.subscriptions?.sortModelSubscription) {
      this.subscriptions.sortModelSubscription.unsubscribe();
    }
    this.loading$.complete();
    this.totalRows$.complete();
    this.aggregation$.complete();
    this.refresh$.complete();
    this.disposeRemoteResources().catch((err) =>
      console.error('Failed to dispose remote resources', err)
    );
  }

  get rowCount(): number | undefined {
    return this.currentStreamId ? this.currentStreamRowCount : undefined;
  }

  getRows(params: IGetRowsParams): void {
    if (!this.subscriptions || this.destroyed || this.isGetRowsRunning) {
      params.failCallback();
      return;
    }

    this.isGetRowsRunning = true;
    this.loading$.next(true);

    // eslint-disable-next-line no-async-promise-executor
    new Promise<FetchDataStreamResponse<TData>>(async (resolve, reject) => {
      try {
        // create the stream
        if (!this.currentStreamId) {
          const createDataStreamResponse = await firstValueFrom(
            this.options.gqlService.createDataStream$(
              this.options.endpoint,
              false,
              JSON.stringify({
                ...this.requestModel,
                sort_model: this.sortModel,
              })
            )
          );
          if (createDataStreamResponse.success && createDataStreamResponse.data) {
            const sid = this.currentStreamId;
            if (sid) {
              firstValueFrom(this.options.gqlService.removeDataStream$(sid)).catch((err) => {
                console.error(`Failed to delete stream ${sid}: ${err}`);
              });
            }
            this.currentStreamId = createDataStreamResponse.data.stream_id;
            this.currentStreamRowCount = createDataStreamResponse.data.total_rows;
            this.totalRows$.next(this.currentStreamRowCount);
            this.currentStreamAggregationPublished = false;
          } else {
            reject(
              new Error(
                `Failed to create data stream: ${JSON.stringify(createDataStreamResponse.errors)}`
              )
            );
            return;
          }
        }

        let returnEmptyResult = false;
        if (
          this.currentStreamAggregationPublished &&
          params.startRow >= this.currentStreamRowCount
        ) {
          // aggregation already processed so at this time we don't need that field from the server
          // and, we know request is beyond eof so no rows will be returned (ag-grid does this when you click last page in pagination panel)
          returnEmptyResult = true;
        } else if (this.destroyed) {
          // this datasource has been destroyed while the remote stream was being created
          // so, the result we will return no longer matters
          returnEmptyResult = true;
        }

        // fetch the data
        if (returnEmptyResult) {
          resolve({
            total_rows: this.currentStreamRowCount,
            aggregation: {} as TData,
            offset: params.startRow,
            items: [],
          });
        } else {
          // perform fetch
          const input = {
            id: this.currentStreamId,
            limit: params.endRow - params.startRow,
            offset: params.startRow,
          };
          const fetchFunctionResponse = await this.options.fetchFunction({ ...input });
          if (fetchFunctionResponse.success && fetchFunctionResponse.data) {
            if (!this.currentStreamAggregationPublished && this.currentStreamId === input.id) {
              this.aggregation$.next((fetchFunctionResponse.data.aggregation || {}) as TData);
              this.currentStreamAggregationPublished = true;
            }
            resolve(fetchFunctionResponse.data);
          } else {
            reject(
              new Error(`Failed to fetch data: ${JSON.stringify(fetchFunctionResponse.errors)}`)
            );
          }
        }
      } catch (err) {
        reject(err);
      }
    })
      .then((data) => {
        try {
          params.successCallback(data.items, data.total_rows);
        } finally {
          this.isGetRowsRunning = false;
          this.loading$.next(false);
        }
      })
      .catch((err) => {
        try {
          console.error(`getRows failed: ${err}`);
          params.failCallback();
        } finally {
          this.isGetRowsRunning = false;
          this.loading$.next(false);
        }
      });
  }

  get currentServerInput(): Record<string, unknown> {
    return JSON.parse(
      JSON.stringify({
        ...this.requestModel,
        sort_model: this.sortModel,
      })
    );
  }

  forceRefresh(): void {
    this.ensureInitialized();
    this.removeCurrentStream();
  }

  async fetchAll(pageSize?: number): Promise<FetchDataStreamResponse<TData>> {
    this.ensureInitialized();

    let result: FetchDataStreamResponse<TData> | null = null;

    await this.withTempDataStream(false, async (streamInfo) => {
      const numWorkers = 8;
      const effectivePageSize =
        Number.isInteger(pageSize) && (pageSize || 0) > 0 ? pageSize || 0 : 250;
      const lastFrameRecordCount = streamInfo.total_rows % effectivePageSize;
      let frameCount = (streamInfo.total_rows - lastFrameRecordCount) / effectivePageSize;
      if (lastFrameRecordCount) {
        frameCount += 1;
      }
      if (!frameCount) {
        frameCount = 1; // for the sake of getting the aggregation for the empty stream
      }
      const inputs: Array<{ id: string; offset: number; limit: number }> = [];
      for (let frameIdx = 0; frameIdx < frameCount; frameIdx += 1) {
        inputs.push({
          id: streamInfo.stream_id,
          offset: frameIdx * effectivePageSize,
          limit: effectivePageSize,
        });
      }
      const fetchResponses = await batchPromises(
        inputs,
        (req) => this.options.fetchFunction(req),
        numWorkers
      );
      let items: Array<TData> = [];
      let aggregation: TData | undefined = undefined;
      fetchResponses.forEach((response) => {
        if (response instanceof Error) {
          throw response;
        }
        if (response.success && response.data) {
          if (response.data.total_rows !== streamInfo.total_rows) {
            throw new Error("Assertion failed - total row counts don't match");
          }
          if (!aggregation) {
            aggregation = response.data.aggregation;
          }
          items = items.concat(response.data.items);
        } else {
          throw new Error(`fetchAll failed to download a page: ${JSON.stringify(response.errors)}`);
        }
      });
      if (!aggregation) {
        throw new Error('Assertion failed - aggregation is missing');
      }
      if (items.length !== streamInfo.total_rows) {
        throw new Error(
          "Assertion failed - downloaded record count doesn't match the reported record count"
        );
      }
      result = {
        total_rows: items.length,
        aggregation,
        offset: 0,
        items,
      };
    });

    if (!result) {
      throw new Error('Assertion failed - fetchAll empty result');
    }

    return result;
  }

  async downloadAll(): Promise<FetchDataStreamResponse<TData>> {
    this.ensureInitialized();

    let result: FetchDataStreamResponse<TData> | null = null;

    await this.withTempDataStream(true, async (streamInfo) => {
      const contents = await this.options.apiService.getFileAsJson<FetchDataStreamResponse<TData>>(
        streamInfo.bucket_key || ''
      );
      if (!contents) {
        throw new Error('Failed to download data file');
      }
      if (
        contents.total_rows !== streamInfo.total_rows ||
        !Array.isArray(contents.items) ||
        contents.items.length !== streamInfo.total_rows ||
        !contents.aggregation
      ) {
        throw new Error('Assertion failed - downloaded data is inconsistent');
      }
      result = contents;
    });

    if (!result) {
      throw new Error('Assertion failed - downloadAll empty result');
    }

    return result;
  }

  private async withTempDataStream(
    zip: boolean,
    action: (response: CreateDataStreamResponse, input: Record<string, unknown>) => Promise<void>
  ): Promise<void> {
    const input_json = JSON.stringify({
      ...this.requestModel,
      sort_model: this.sortModel,
    });
    let stream_id = '';
    try {
      const response = await firstValueFrom(
        this.options.gqlService.createDataStream$(this.options.endpoint, zip, input_json)
      );
      if (response.success && response.data) {
        stream_id = response.data.stream_id;
        await action(response.data, JSON.parse(input_json));
      } else {
        throw new Error(`Failed to create a stream: ${JSON.stringify(response.errors)}`);
      }
    } finally {
      if (stream_id) {
        firstValueFrom(this.options.gqlService.removeDataStream$(stream_id)).catch((err) =>
          console.error(`Failed to delete stream ${stream_id}: ${err}`)
        );
      }
    }
  }

  private applyTrialChange(): void {
    const currentTrialId = this.options.mainQuery.getValue().trialKey || '';
    if (currentTrialId !== this.requestTrialId) {
      this.requestTrialId = currentTrialId;
      this.removeCurrentStream(this.options.disableAutoRefresh);
    }
  }

  private applyFilterValues(): void {
    let filterValues = this.options.filterValues$.getValue();
    if (!filterValues || typeof filterValues !== 'object') {
      filterValues = {};
    }

    function getFilterArgs(propName: string): Array<unknown> {
      const filterValue = filterValues[propName];
      const args = Array.isArray(filterValue) ? filterValue : [filterValue];
      return args.filter((v) => v !== undefined && v !== null && v !== '');
    }

    const activeFilters: Array<[string, string, Array<unknown>]> = [];

    if (Array.isArray(this.options.filters)) {
      for (const filterInfo of this.options.filters) {
        if (filterInfo && typeof filterInfo === 'object') {
          let column = '';
          let op = '';
          let args: Array<unknown> = [];

          const columnFilterInfo = filterInfo as ServerSideColumnFilterInfo<TData>;
          if (columnFilterInfo.column) {
            if (serverSideOperatorLookup[columnFilterInfo.type]) {
              column = String(columnFilterInfo.column);
              op = serverSideOperatorLookup[columnFilterInfo.type];
              args = getFilterArgs(columnFilterInfo.inputPropertyName);
            }
          } else {
            const customFilterInfo = filterInfo as ServerSideCustomFilterInfo;
            if (customFilterInfo.targetPropertyName) {
              column = customFilterInfo.targetPropertyName;
              args = getFilterArgs(customFilterInfo.inputPropertyName);
            }
          }

          if (column && (!op || args.length)) {
            if (
              op === serverSideOperatorLookup[ServerSideColumnFilterType.IsEqualTo] &&
              args.length > 1
            ) {
              op = 'IN';
            }
            activeFilters.push([column, op, args]);
          }
        }
      }
    }

    activeFilters.sort((a, b) => {
      const cmp = (a[0] || '').localeCompare(b[0] || '');
      if (cmp) {
        return cmp;
      }
      return (a[1] || '').localeCompare(b[1] || '');
    });

    const signature = JSON.stringify(activeFilters);
    if (signature !== this.requestModelSignature) {
      const model = {} as Record<string, unknown>;
      const serverFilters: Array<unknown> = [];
      activeFilters.forEach(([column, op, values]) => {
        if (op) {
          // column filter
          serverFilters.push({
            column,
            op,
            values,
          });
        } else {
          // custom filter
          model[column] = values[0] !== undefined ? values[0] : null;
        }
      });
      model.filter_model = serverFilters;
      this.requestModel = model;
      this.requestModelSignature = signature;
      this.removeCurrentStream(this.options.disableAutoRefresh);
    }
  }

  private applySortChanges(): void {
    let model = this.options.sortModel$.getValue();
    if (!Array.isArray(model)) {
      model = [];
    }

    const signature = JSON.stringify(model);
    if (signature !== this.sortModelSignature) {
      this.sortModel = model;
      this.sortModelSignature = signature;
      // do a soft/fake refresh here because ag-grid is automatically calling getRows (i.e. refreshing itself)
      this.removeCurrentStream(true);
    }
  }

  private removeCurrentStream(softRefreshGrid = false): void {
    const sid = this.currentStreamId;
    if (sid) {
      firstValueFrom(this.options.gqlService.removeDataStream$(sid)).catch((err) => {
        console.error(`Failed to remove stream '${sid}': ${err}`);
      });
    }
    this.currentStreamId = '';
    this.currentStreamRowCount = 0;
    this.totalRows$.next(this.currentStreamRowCount);
    this.refreshGrid(softRefreshGrid).catch((err) => {
      console.error('Failed to refresh grid', err);
    });
  }

  private refreshGrid(soft: boolean): Promise<void> {
    let debounceMilliseconds = 500;
    if (
      Number.isFinite(this.options.refreshGridDebounceMilliseconds) &&
      (this.options.refreshGridDebounceMilliseconds || 0) >= 0
    ) {
      debounceMilliseconds = this.options.refreshGridDebounceMilliseconds || 0;
    }
    this.waitUntilTimestampBeforeInitiatingRefresh = new Date().getTime() + debounceMilliseconds;

    if (soft || this.pendingRefreshGrid) {
      return Promise.resolve();
    }

    this.pendingRefreshGrid = true;

    // eslint-disable-next-line no-async-promise-executor
    return new Promise<void>(async (resolve, reject) => {
      let err: unknown = undefined;
      try {
        // wait a bit for things to settle down (i.e. during form resets)
        while (new Date().getTime() <= this.waitUntilTimestampBeforeInitiatingRefresh) {
          // eslint-disable-next-line @typescript-eslint/no-loop-func
          await new Promise<void>((res) => setTimeout(res, 10));
        }
        // wait for the completion of the data request currently in progress
        const stopAfterTimestamp = new Date().getTime() + 45 * 1000;
        while (this.isGetRowsRunning && new Date().getTime() <= stopAfterTimestamp) {
          // eslint-disable-next-line @typescript-eslint/no-loop-func
          await new Promise<void>((res) => setTimeout(res, 10));
        }
      } catch (ex) {
        err = ex;
      } finally {
        try {
          this.pendingRefreshGrid = false;
          if (err) {
            reject(err);
          } else if (this.isGetRowsRunning) {
            reject('Wait getRows timed out');
          } else {
            resolve();
            this.refresh$.next();
          }
        } catch (e) {
          console.error(e);
        }
      }
    });
  }
}

//proxy class (for making the datasource implementation injectable as a property of a service)
export class ServerSideDatasource<TData> implements IDatasource {
  private proxyOptions = {} as ProxyOptions<TData>;

  private innerDatasource?: ServerSideDatasourceInner<TData> = undefined;

  private currentSubscriptions?: {
    untilDestroyedSubscription: Subscription;
    loadingSubscription: Subscription;
    totalRowsSubscription: Subscription;
    aggregationSubscription: Subscription;
    refreshSubscription: Subscription;
  } = undefined;

  private _empty$ = new Subject<void>();

  private _loading$ = new BehaviorSubject<boolean>(false);

  private _totalRows$ = new BehaviorSubject<number>(0);

  private _aggregation$ = new BehaviorSubject<TData>({} as TData);

  private _refresh$ = new Subject<void>();

  /**
   * Signals the state of the datasource. false means the datasource has become idle. true means it has started a backend request.
   */
  readonly loading$ = this._loading$.asObservable();

  /**
   * Gets the current state of the datasource. false means the datasource is idle. true means it is in the middle of a backend request.
   * undefined means datasource hasn't been initialized yet.
   */
  get loading(): boolean | undefined {
    return this.innerDatasource ? this._loading$.getValue() : undefined;
  }

  /**
   * Signals the number of records.
   */
  readonly totalRows$ = this._totalRows$.asObservable();

  /**
   * Gets the current number of records. undefined means datasource hasn't been initialized yet.
   */
  get totalRows(): number | undefined {
    return this.innerDatasource ? this._totalRows$.getValue() : undefined;
  }

  /**
   * Signals the value of the aggregation.
   */
  readonly aggregation$ = this._aggregation$.asObservable();

  /**
   * Gets the current value of the aggregation. undefined means datasource hasn't been initialized yet.
   */
  get aggregation(): TData | undefined {
    return this.innerDatasource ? this._aggregation$.getValue() : undefined;
  }

  /**
   * Signals that parent grid must refresh its data.
   */
  readonly refresh$ = this._refresh$.asObservable();

  constructor(options: ProxyOptions<TData>) {
    if (!options || typeof options !== 'object') {
      throw new Error('options is required');
    }
    this.proxyOptions = options;
  }

  private ensureInnerDatasource() {
    if (!this.innerDatasource) {
      throw new Error('Not initialized');
    }
    return this.innerDatasource;
  }

  /**
   * Gets the current value of the "input" argument that must be sent to the backend to retrieve data specified by
   * the current state of the applied filters, sort order etc.
   */
  get currentServerInput(): Record<string, unknown> | undefined {
    return this.innerDatasource?.currentServerInput;
  }

  /**
   * Initializes the datasource for use of the calling component. Note that this will immediately trigger a refresh. Hence,
   * the best place to call this method is from within your GridReady event handler (so that your grid can successfully handle
   * the refresh signal).
   */
  initialize(options: DatasourceOptions<TData>): void {
    if (this.currentSubscriptions) {
      this.currentSubscriptions.untilDestroyedSubscription.unsubscribe();
      this.currentSubscriptions.loadingSubscription.unsubscribe();
      this.currentSubscriptions.totalRowsSubscription.unsubscribe();
      this.currentSubscriptions.aggregationSubscription.unsubscribe();
      this.currentSubscriptions.refreshSubscription.unsubscribe();
      this.currentSubscriptions = undefined;
    }
    if (this.innerDatasource) {
      this.innerDatasource.destroy();
      this.innerDatasource = undefined;
    }
    if (
      options &&
      typeof options === 'object' &&
      typeof options.untilDestroyedPipeOperator === 'function'
    ) {
      this.innerDatasource = new ServerSideDatasourceInner<TData>();
      this.innerDatasource.initialize({
        ...options,
        ...this.proxyOptions,
        untilDestroyedPipeOperator: (prev) => {
          if (prev) {
            throw new Error('Assertion failed: inner datasource tried to use the pipe operator');
          }
          return prev;
        },
      });
      this.currentSubscriptions = {
        untilDestroyedSubscription: this._empty$
          .pipe(options.untilDestroyedPipeOperator)
          .subscribe({
            complete: () => {
              // deferred operation in case pipe operator is already in 'parent component destroyed' mode
              setTimeout(() => {
                this.initialize({} as ServerSideDatasourceOptions<TData>);
              }, 0);
            },
          }),
        loadingSubscription: this.innerDatasource.loading$.subscribe((v) => {
          this._loading$.next(v);
        }),
        totalRowsSubscription: this.innerDatasource.totalRows$.subscribe((v) => {
          this._totalRows$.next(v);
        }),
        aggregationSubscription: this.innerDatasource.aggregation$.subscribe((v) => {
          this._aggregation$.next(v);
        }),
        refreshSubscription: this.innerDatasource.refresh$.subscribe(() => {
          this._refresh$.next();
        }),
      };
    }
  }

  /**
   * Triggers a refresh.
   */
  forceRefresh(): void {
    this.innerDatasource?.forceRefresh();
  }

  /**
   * Retrieves the whole stream by downloading a single gzip-encoded json file.
   * Does NOT work with 'forAppSyncOnly' endpoints!
   * Prefer this method over 'fetchAll' if your endpoint allows it.
   */
  downloadAll(): Promise<FetchDataStreamResponse<TData>> {
    return this.ensureInnerDatasource().downloadAll();
  }

  /**
   * Retrieves the whole stream by fetching it page by page (the default page size is 250).
   * Therefore, the 'fetchAll' method is inefficient compared to the 'downloadAll' method.
   * Yet, 'fetchAll' is the only way to get all the data for 'forAppSyncOnly' endpoints.
   * This is because results from a 'forAppSyncOnly' endpoint has to pass through the backend graphql resolver pipeline.
   */
  fetchAll(pageSize?: number): Promise<FetchDataStreamResponse<TData>> {
    return this.ensureInnerDatasource().fetchAll(pageSize);
  }

  /**
   * ag-grid IDatasource implementation.
   */
  get rowCount(): number | undefined {
    return this.innerDatasource?.rowCount;
  }

  /**
   * ag-grid IDatasource implementation.
   */
  getRows(params: IGetRowsParams): void {
    if (this.innerDatasource) {
      this.innerDatasource.getRows(params);
    } else {
      params.failCallback();
    }
  }
}
