import {
  InfiniteData,
  QueryClient,
  useMutation,
  useQueries,
  useQuery,
  useQueryClient,
} from "@tanstack/react-query";
import { AxiosError } from "axios";
import { differenceInMinutes, differenceInSeconds } from "date-fns";
import { produce } from "immer";
import { maxBy } from "lodash";

import { ResourceSamplesEndpoint } from "src/api/connectApi/endpoints";
import { ResourceSample } from "src/api/connectApi/types";
import { DecisionsOutcomeFilter } from "src/api/decisionHistoryV2/decisionHistoryQueries";
import {
  ExporterDatasetJobsEndpoint,
  ExporterDatasetsEndpoint,
  ExporterHistoryEndpoint,
} from "src/api/endpoints";
import {
  CreateDatasetFromHistory,
  CreateDatasetFromScratch,
  FlowVersionT,
} from "src/api/flowTypes";
import * as s3Api from "src/api/s3";
import {
  Dataset,
  DatasetColumn,
  DatasetColumnGroupToRowDataGroupMap,
  DatasetColumnGroups,
  DatasetFileUpload,
  DatasetPage,
  DatasetRow,
  DesiredType,
  CreateDuplicateDatasetJob,
  DatasetJob,
  CreateDownloadDatasetJob,
  DownloadDatasetJob,
  DatasetPurpose,
} from "src/api/types";
import { JSONValue } from "src/datasets/DatasetTable/types";
import { SUB_COLUMN_SEPARATOR } from "src/datasets/DatasetTable/utils";
import { HistoryDataResponse } from "src/datasets/api/types";
import {
  DATASET_JOB_TIMEOUT,
  DatasetIntegrationNode,
  useInputColumnsFromSchema,
  useOutcomeColumnsFromOutcomeTypes,
} from "src/datasets/utils";
import { toastActions } from "src/design-system/Toast/utils";
import { queryClient } from "src/queryClient";
import { useCurrentUserId } from "src/store/AuthStore";
import { logger } from "src/utils/logger";

export const datasetJobsKey = {
  all: ["datasetJobs"] as const,
  detail: (baseUrl: string, flowId: string) =>
    [...datasetJobsKey.all, baseUrl, flowId] as const,
};

const updateDatasetInCache = (
  baseUrl: string,
  flowId: string,
  dataset: Dataset,
  queryClient: QueryClient,
) => {
  queryClient.setQueryData<Dataset[]>(
    ["datasets", baseUrl, flowId],
    (datasets) => {
      if (datasets) {
        const datasetIndex = datasets.findIndex((d) => d.id === dataset.id);
        return produce(datasets, (draft) => {
          draft[datasetIndex] = dataset;
          draft.sort(
            (a, b) =>
              new Date(b.updated_at).valueOf() -
              new Date(a.updated_at).valueOf(),
          );
        });
      }
    },
  );
};

const optimisticallyBumpUpdatedAt = (
  baseUrl: string,
  flowId: string,
  datasetId: string,
  queryClient: QueryClient,
  additionalUpdates?: (draft: Dataset) => void,
) => {
  queryClient.setQueryData<Dataset[]>(
    ["datasets", baseUrl, flowId],
    (datasets) => {
      if (datasets) {
        const datasetIndex = datasets.findIndex((d) => d.id === datasetId);
        return produce(datasets, (draft) => {
          draft[datasetIndex].updated_at = new Date().toISOString();
          additionalUpdates?.(draft[datasetIndex]);
          draft.sort(
            (a, b) =>
              new Date(b.updated_at).valueOf() -
              new Date(a.updated_at).valueOf(),
          );
        });
      }
    },
  );
};

export const useHistoryData = ({
  baseUrl,
  flowVersionIds,
  timeWindow,
  statusCodes,
  flowSlug,
  trafficPolicyId,
  subSamplingSize,
  outcomeFilters,
}: {
  baseUrl: string;
  flowVersionIds: string[];
  timeWindow: [string, string];
  statusCodes: string[];
  flowSlug: string;
  trafficPolicyId?: string;
  subSamplingSize?: number;
  outcomeFilters?: DecisionsOutcomeFilter[];
}) => {
  const flowVersionIdsKey = flowVersionIds.join(",");
  const timeWindowKey = timeWindow.join("-");
  const statusesKey = statusCodes.join(",");

  return useQuery<HistoryDataResponse, Error>({
    queryKey: [
      "historyData",
      flowVersionIdsKey,
      timeWindowKey,
      statusesKey,
      baseUrl,
      flowSlug,
      trafficPolicyId,
      subSamplingSize,
      outcomeFilters,
    ],
    queryFn: async () =>
      (
        await ExporterHistoryEndpoint.getDecisionHistoryPreview({
          flowVersionIds,
          timeWindow,
          statusCodes,
          flowSlug,
          baseUrl,
          trafficPolicyId,
          subSamplingSize,
          outcomeFilters,
        })
      ).data,
    enabled: flowVersionIds.length !== 0 && !!baseUrl,
    retry: (errorCount, error) => {
      return (
        errorCount < 3 &&
        !(error instanceof AxiosError && error.response?.status === 408)
      );
    },
  });
};

export type DatasetRowPatch = Partial<Omit<DatasetRow, "id">>;
export type DatasetRowPatchWithId = DatasetRowPatch & { id: string };

export const useCreateDatasetFromScratchMutation = (
  baseUrl: string | undefined,
) => {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: (args: CreateDatasetFromScratch) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      return ExporterDatasetsEndpoint.createDataset(baseUrl, args);
    },
    onSuccess: (dataset) => {
      queryClient.invalidateQueries({
        queryKey: ["datasets", baseUrl, dataset.flow_id],
      });
    },
  });
};

export const useDatasets = ({
  flowId,
  baseUrl,
}: {
  flowId: string;
  baseUrl?: string;
}) => {
  return useQuery<Dataset[], Error>({
    queryKey: ["datasets", baseUrl, flowId],
    queryFn: () => {
      if (baseUrl) {
        return ExporterDatasetsEndpoint.getDatasets(baseUrl, flowId);
      } else {
        return Promise.resolve([]);
      }
    },
  });
};

export const usePatchRow = (
  datasetId: string,
  flowId: string,
  baseUrl?: string,
) => {
  const queryClient = useQueryClient();

  const updateRowInPlace = (
    infiniteData: InfiniteData<DatasetPage>,
    { id, ...patch }: DatasetRowPatchWithId,
  ) => {
    for (let i = 0; i < infiniteData.pages.length; i++) {
      const currentPage = infiniteData.pages[i];
      for (let j = 0; j < currentPage.rows.length; j++) {
        const currentRow = currentPage.rows[j];
        if (currentRow.id === id) {
          currentPage.rows[j] = { ...currentRow, ...patch };
          return;
        }
      }
    }
    throw new Error("Couldn't find the row to update");
  };

  return useMutation({
    mutationFn: ({ id, ...patch }: DatasetRowPatchWithId) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      return ExporterDatasetsEndpoint.patchRow(baseUrl, datasetId, id, patch);
    },
    onMutate: async (rowPatch: DatasetRowPatchWithId) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasetRows", datasetId],
      });

      // Snapshot the previous value
      const oldDatasetInifiniteData = queryClient.getQueryData<
        InfiniteData<DatasetPage>
      >(["datasetRows", datasetId]);

      if (oldDatasetInifiniteData) {
        const updatedInifiniteData = produce(
          oldDatasetInifiniteData,
          (draft) => {
            // Optimistically update to the new value
            updateRowInPlace(draft, rowPatch);
          },
        );

        queryClient.setQueryData(
          ["datasetRows", datasetId],
          updatedInifiniteData,
        );
        optimisticallyBumpUpdatedAt(baseUrl!, flowId, datasetId, queryClient);
        return { oldDatasetInifiniteData };
      }
    },
    onError: (_, __, context) => {
      if (context) {
        queryClient.setQueryData(
          ["datasetRows", datasetId],
          context.oldDatasetInifiniteData,
        );
      }
    },
  });
};

export const useDeleteDataset = (baseUrl?: string) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: (datasetId: string) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      return ExporterDatasetsEndpoint.deleteDataset(baseUrl, datasetId);
    },
    onSettled: () => {
      queryClient.invalidateQueries({
        queryKey: ["datasets", baseUrl],
      });
    },
  });
};

export type DeleteColumnPayload = {
  name: string;
  group: DatasetColumnGroups;
};

export const useDeleteColumn = (
  datasetId: string,
  flowId: string,
  baseUrl?: string,
) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: async ({ name, group }: DeleteColumnPayload) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      const result = await ExporterDatasetsEndpoint.deleteColumn(
        baseUrl,
        datasetId,
        group,
        name,
      );
      updateDatasetInCache(baseUrl, flowId, result, queryClient);

      return result;
    },
    onMutate: async ({ name, group }: DeleteColumnPayload) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasetRows", datasetId],
      });

      const oldDatasets = queryClient.getQueryData<Dataset[]>([
        "datasets",
        baseUrl,
        flowId,
      ])!;

      const updatedDatasets = produce(oldDatasets, (draft) => {
        const dataset = draft.find((d) => d.id === datasetId);
        if (dataset) {
          const updatedDatasetColumnGroup = dataset[group].filter(
            (c) => c.name !== name,
          );
          dataset[group] = updatedDatasetColumnGroup;
        }
      });
      queryClient.setQueryData<Dataset[]>(
        ["datasets", baseUrl, flowId],
        updatedDatasets,
      );
      optimisticallyBumpUpdatedAt(baseUrl!, flowId, datasetId, queryClient);
      return { oldDatasets };
    },
    onError: (_, __, context) => {
      if (context) {
        // Rollback to the previous value
        queryClient.setQueryData<Dataset[]>(
          ["datasets", baseUrl, flowId],
          context.oldDatasets,
        );
      }
    },
  });
};

export const useUploadDataset = (
  baseUrl: string,
  flowId: string,
  inputColumnsToOverwrite: DatasetColumn[],
) => {
  return useMutation({
    mutationFn: async ({
      file,
      purpose,
    }: {
      file: File;
      purpose: DatasetPurpose;
    }) => {
      const dataFileUpload =
        await ExporterDatasetsEndpoint.createDatasetFileUpload(
          baseUrl,
          flowId,
          file.name,
          inputColumnsToOverwrite,
          purpose,
        );

      s3Api.uploadFile(
        {
          url: dataFileUpload.s3_presigned_url,
          fields: dataFileUpload.s3_presigned_fields,
        },
        file,
      );

      return dataFileUpload;
    },
  });
};

export const useDatasetFileUploads = (
  baseUrl: string,
  uploads: DatasetFileUpload[],
  onSettled?: (data: DatasetFileUpload) => void,
) => {
  return useQueries({
    queries: uploads.map(({ id: uploadId }) => ({
      queryKey: ["datasetFileUpload", uploadId],
      queryFn: () =>
        ExporterDatasetsEndpoint.getDatasetFileUpload(baseUrl, uploadId),
      enabled: !!uploadId && !!baseUrl,
      onSuccess: (data: any) => {
        if ((data as DatasetFileUpload).status === "COMPLETED") {
          queryClient.invalidateQueries({
            queryKey: ["datasets", baseUrl],
          });
        }
      },
      onSettled,
      refetchInterval: (datasetFileUpload: DatasetFileUpload) => {
        if (datasetFileUpload?.status === "PENDING") {
          return 5 * 1_000;
        }

        return false;
      },
    })),
  });
};

export type RenameColumnPayload = {
  oldName: string;
  newName: string;
  desiredType: DesiredType;
  group: DatasetColumnGroups;
  hasSubflowMocks: boolean;
};
export const useRenameColumn = (
  datasetId: string,
  flowId: string,
  baseUrl?: string,
) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: async ({
      oldName,
      newName,
      group,
      desiredType,
      hasSubflowMocks,
    }: RenameColumnPayload) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      const result = await ExporterDatasetsEndpoint.putColumn(
        baseUrl,
        datasetId,
        group,
        newName,
        desiredType,
        oldName,
        hasSubflowMocks,
      );
      updateDatasetInCache(baseUrl, flowId, result, queryClient);

      return result;
    },
    onMutate: async ({ oldName, newName, group }: RenameColumnPayload) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasetRows", datasetId],
      });

      const oldDatasetInifiniteData = queryClient.getQueryData<
        InfiniteData<DatasetPage>
      >(["datasetRows", datasetId]);

      const oldDatasets = queryClient.getQueryData<Dataset[]>([
        "datasets",
        baseUrl,
        flowId,
      ])!;

      const datasetIsFound = oldDatasets.find((d) => d.id === datasetId);

      if (!datasetIsFound || !oldDatasetInifiniteData) return;

      const updatedDatasets = produce(oldDatasets, (draft) => {
        const dataset = draft.find((d) => d.id === datasetId)!;
        const columnToRename = dataset[group].find((c) => c.name === oldName);
        const oldParentName = `${oldName}${SUB_COLUMN_SEPARATOR}`;
        const childColumnsToRename = dataset[group].filter((c) =>
          c.name.startsWith(oldParentName),
        );

        if (!columnToRename) {
          logger.error(
            `Could not find column with name ${oldName} in group ${group} to update optimistically`,
          );
          return;
        }

        columnToRename.name = newName;

        for (const childColumn of childColumnsToRename) {
          const newChildName = childColumn.name.replace(
            oldParentName,
            `${newName}${SUB_COLUMN_SEPARATOR}`,
          );
          childColumn.name = newChildName;
        }
      });

      const updatedInfiniteData = produce(oldDatasetInifiniteData, (draft) => {
        const rowDataGroupToCheck = DatasetColumnGroupToRowDataGroupMap[group];

        for (const page of draft.pages) {
          for (const row of page.rows) {
            const data = row[rowDataGroupToCheck];

            if (data && oldName in data) {
              data[newName] = data[oldName];
              delete data[oldName];

              const oldParentName = `${oldName}${SUB_COLUMN_SEPARATOR}`;
              const newParentName = `${newName}${SUB_COLUMN_SEPARATOR}`;

              for (const key in data) {
                if (key.startsWith(oldParentName)) {
                  const newKey = key.replace(oldParentName, newParentName);
                  data[newKey] = data[key];
                  delete data[key];
                }
              }
            }
          }
        }
      });

      // Optimistically update to the new values
      queryClient.setQueryData<InfiniteData<DatasetPage>>(
        ["datasetRows", datasetId],
        updatedInfiniteData,
      );
      queryClient.setQueryData<Dataset[]>(
        ["datasets", baseUrl, flowId],
        updatedDatasets,
      );

      optimisticallyBumpUpdatedAt(baseUrl!, flowId, datasetId, queryClient);
      // Return a context object with the snapshotted value
      return { oldDatasets, oldDatasetInifiniteData };
    },
    onError: (_, __, context) => {
      if (context) {
        // Rollback to the previous value
        queryClient.setQueryData<InfiniteData<DatasetPage>>(
          ["datasetRows", datasetId],
          context.oldDatasetInifiniteData,
        );
        queryClient.setQueryData<Dataset[]>(
          ["datasets", baseUrl, flowId],
          context.oldDatasets,
        );
      }
    },
  });
};

export type PutColumnPayload = {
  name: string;
  desiredType: DesiredType;
  group: DatasetColumnGroups;
  hasSubflowMocks: boolean;
};

export const usePutColumns = (
  datasetId: string,
  flowId: string,
  baseUrl?: string,
) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: async (columns: PutColumnPayload[]) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");
      const responses = [];

      for (const { name, group, desiredType, hasSubflowMocks } of columns) {
        responses.push(
          await ExporterDatasetsEndpoint.putColumn(
            baseUrl,
            datasetId,
            group,
            name,
            desiredType,
            undefined,
            hasSubflowMocks,
          ),
        );
      }

      const lastUpdate = responses.at(-1);
      if (lastUpdate) {
        updateDatasetInCache(baseUrl, flowId, lastUpdate, queryClient);
      }

      return responses;
    },
    onMutate: async (columns: PutColumnPayload[]) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasetRows", datasetId],
      });

      const oldDatasets = queryClient.getQueryData<Dataset[]>([
        "datasets",
        baseUrl,
        flowId,
      ])!;

      const updatedDatasets = produce(oldDatasets, (draft) => {
        const dataset = draft.find((d) => d.id === datasetId);

        if (dataset) {
          for (const { name, group, desiredType, hasSubflowMocks } of columns) {
            const columns = dataset[group];

            const columnToReplaceIndex = columns.findIndex(
              (c) => c.name === name,
            );

            const columnToPut = {
              desired_type: desiredType,
              name,
              use_subflow_mocks: hasSubflowMocks,
            };

            if (columnToReplaceIndex === -1) {
              columns.push(columnToPut);
            } else {
              columns[columnToReplaceIndex] = columnToPut;
            }
          }
        }
      });

      // Optimistically update to the new value
      queryClient.setQueryData<Dataset[]>(
        ["datasets", baseUrl, flowId],
        updatedDatasets,
      );

      optimisticallyBumpUpdatedAt(baseUrl!, flowId, datasetId, queryClient);
      // Return a context object with the snapshotted value
      return { oldDatasets };
    },
    onError: (_, __, context) => {
      if (context) {
        // Rollback to the previous value
        queryClient.setQueryData<Dataset[]>(
          ["datasets", baseUrl, flowId],
          context.oldDatasets,
        );
      }
    },
  });
};

export const useResourceSample = ({
  baseUrl,
  integrationNode,
  enabled = true,
}: {
  baseUrl: string;
  integrationNode: DatasetIntegrationNode;
  enabled?: boolean;
}) => {
  return useQuery<ResourceSample[]>({
    queryKey: [
      "resourceSample",
      baseUrl,
      integrationNode.provider,
      integrationNode.resource,
      ...(integrationNode.localSampleReports ? [integrationNode.id] : []),
    ],
    queryFn: async () => {
      try {
        if (integrationNode.localSampleReports || !integrationNode.resource) {
          return integrationNode.localSampleReports || [];
        }
        return await ResourceSamplesEndpoint.get(
          baseUrl,
          integrationNode.provider,
          integrationNode.resource,
        );
      } catch {
        logger.warn(
          "Request to fetch sample reports failed, falling back to base report",
        );
        return [
          {
            name: "Base",
            sample: {
              insights: {},
              data: {},
            },
          },
        ];
      }
    },
    enabled,
    cacheTime: integrationNode.localSampleReports ? 0 : undefined,
  });
};

export type DatasetRowsPost =
  | { source: "blank"; new_row_ids: string[] }
  | { source: "row_id"; row: DatasetRow; new_row_ids: string[] }
  | {
      source: "decision_id";
      decision_id: string;
      new_row_ids: string[];
    };

export type DatasetPatch = Partial<
  Pick<Dataset, "name" | "input_columns" | "mock_columns" | "output_columns">
>;

export type DatasetPatchPayload = {
  id: string;
  patch: DatasetPatch;
  etag?: string;
};

export const usePatchDataset = (flowId: string, baseUrl?: string) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: async ({ id, patch, etag }: DatasetPatchPayload) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      const result = await ExporterDatasetsEndpoint.patchDataset(
        baseUrl,
        id,
        patch,
        etag,
      );
      updateDatasetInCache(baseUrl, flowId, result, queryClient);

      return result;
    },
    onMutate: async ({ id, patch }: DatasetPatchPayload) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasets", baseUrl, flowId],
      });

      // Snapshot the previous value
      const oldDatasets = queryClient.getQueryData<Dataset[]>([
        "datasets",
        baseUrl,
        flowId,
      ]);

      queryClient.setQueryData<Dataset[]>(
        ["datasets", baseUrl, flowId],
        (datasets) => {
          if (datasets) {
            // Optimistically update to the new value
            return produce(datasets, (draft) => {
              const dataset = draft.find((d) => d.id === id);

              if (dataset) {
                Object.assign(dataset, patch);
              }
            });
          }
        },
      );

      optimisticallyBumpUpdatedAt(baseUrl!, flowId, id, queryClient);
      return { oldDatasets };
    },
    onError: (_, __, context) => {
      if (context) {
        queryClient.setQueryData<Dataset[]>(
          ["datasets", baseUrl, flowId],
          context.oldDatasets,
        );
      }
    },
  });
};

export const usePostRows = (
  datasetId: string,
  flowId: string,
  baseUrl?: string,
) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: (payload: DatasetRowsPost) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");
      return ExporterDatasetsEndpoint.postRows(
        baseUrl,
        datasetId,
        payload.source === "row_id"
          ? { ...payload, row_id: payload.row.id }
          : payload,
      );
    },
    onMutate: async (payload: DatasetRowsPost) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasetRows", datasetId],
      });

      // Snapshot the previous value
      const oldDatasetInifiniteData = queryClient.getQueryData<
        InfiniteData<DatasetPage>
      >(["datasetRows", datasetId]);

      if (oldDatasetInifiniteData && payload.source !== "decision_id") {
        const updatedInifiniteData = produce(
          oldDatasetInifiniteData,
          (draft) => {
            // Optimistically update to the new value
            let lastPage = draft.pages.at(-1);

            if (lastPage) {
              // Only if the existing last page is really the last page of dataset
              if (lastPage.size + lastPage.from_ >= lastPage.total) {
                const input_data =
                  payload.source === "row_id" ? payload.row.input_data : {};
                const output_data =
                  payload.source === "row_id"
                    ? payload.row.output_data
                    : undefined;
                const mock_data =
                  payload.source === "row_id"
                    ? payload.row.mock_data
                    : undefined;
                const outcome_data =
                  payload.source === "row_id"
                    ? payload.row.outcome_data
                    : undefined;

                for (const row_id of payload.new_row_ids) {
                  const lastPage = draft.pages.at(-1) as DatasetPage;
                  const newRow = {
                    id: row_id,
                    input_data,
                    output_data,
                    mock_data,
                    outcome_data,
                  };

                  if (lastPage && lastPage.rows.length < lastPage.size) {
                    lastPage.rows.push(newRow);
                  } else {
                    draft.pages.push({
                      from_: draft.pages.length * lastPage.size,
                      rows: [newRow],
                      size: lastPage.size,
                      total: lastPage.total + 1,
                      fetchedAt: Date.now(),
                    });
                  }
                }
              }

              lastPage.fetchedAt = Date.now();
              lastPage.total = lastPage.total + payload.new_row_ids.length;
            }
          },
        );

        queryClient.setQueryData(
          ["datasetRows", datasetId],
          updatedInifiniteData,
        );
        optimisticallyBumpUpdatedAt(
          baseUrl!,
          flowId,
          datasetId,
          queryClient,
          (draft) => {
            if (typeof draft.row_count === "number")
              draft.row_count = draft.row_count + payload.new_row_ids.length;
          },
        );
        return { oldDatasetInifiniteData };
      }
    },
    onError: (_, __, context) => {
      if (context) {
        queryClient.setQueryData(
          ["datasetRows", datasetId],
          context.oldDatasetInifiniteData,
        );
      }
    },
  });
};

export const useDeleteRowMutation = (
  datasetId: string,
  flowId: string,
  baseUrl?: string,
) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: (rowId: string) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");
      return ExporterDatasetsEndpoint.deleteRow(baseUrl, datasetId, rowId);
    },
    onMutate: async (rowId: string) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasetRows", datasetId],
      });

      // Snapshot the previous value
      const oldDatasetInifiniteData = queryClient.getQueryData<
        InfiniteData<DatasetPage>
      >(["datasetRows", datasetId]);
      if (oldDatasetInifiniteData) {
        const updatedInifiniteData = produce(
          oldDatasetInifiniteData,
          (draft) => {
            if (draft.pages.length > 0) {
              // Optimistically update to the new value
              let rows = draft.pages.flatMap((p) => p.rows);
              const pageSize = draft.pages[0].size;
              const index = rows.findIndex((r) => r.id === rowId);
              rows.splice(index, 1);
              const updatePagesFromIndex = Math.floor(index / pageSize);

              for (let i = updatePagesFromIndex; i < draft.pages.length; i++) {
                const page = draft.pages[i];
                const from = i * pageSize;
                const to = from + pageSize;
                page.rows = rows.slice(from, to);
              }

              // Last updated page with the most actual data
              const lastUpdatedPage = maxBy(draft.pages, "fetchedAt");

              // Remove last page if it's empty now
              if (draft.pages.at(-1)?.rows.length === 0) {
                draft.pages.pop();
              }

              // Update last page with the most actual data
              const theLastPage = draft.pages.at(-1);
              if (theLastPage) {
                theLastPage.fetchedAt = Date.now();
                theLastPage.total = lastUpdatedPage
                  ? lastUpdatedPage.total - 1
                  : 0;
              }
            }

            return draft;
          },
        );

        queryClient.setQueryData(
          ["datasetRows", datasetId],
          updatedInifiniteData,
        );
        optimisticallyBumpUpdatedAt(
          baseUrl!,
          flowId,
          datasetId,
          queryClient,
          (draft) => {
            if (typeof draft.row_count === "number") draft.row_count--;
          },
        );
        return { oldDatasetInifiniteData };
      }
    },
    onError: (_, __, context) => {
      if (context) {
        queryClient.setQueryData(
          ["datasetRows", datasetId],
          context.oldDatasetInifiniteData,
        );
      }
    },
  });
};

export type FillColumnPayload = {
  name: string;
  group: DatasetColumnGroups;
  fillValue: JSONValue;
};

export const useFillColumn = (
  datasetId: string,
  flowId: string,
  baseUrl?: string,
) => {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: (payload: FillColumnPayload) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");
      return ExporterDatasetsEndpoint.fillColumn(
        baseUrl,
        datasetId,
        payload.group,
        payload.name,
        payload.fillValue,
      );
    },
    onMutate: async ({ fillValue, group, name }: FillColumnPayload) => {
      // Cancel any outgoing refetches (so they don't overwrite our optimistic update)
      await queryClient.cancelQueries({
        queryKey: ["datasetRows", datasetId],
      });

      const oldDatasetInifiniteData = queryClient.getQueryData<
        InfiniteData<DatasetPage>
      >(["datasetRows", datasetId]);

      if (!oldDatasetInifiniteData) return;

      const updatedInfiniteData = produce(oldDatasetInifiniteData, (draft) => {
        const rowDataGroupToCheck = DatasetColumnGroupToRowDataGroupMap[group];

        for (const page of draft.pages) {
          for (const row of page.rows) {
            const currentData = row[rowDataGroupToCheck];
            if (!currentData) {
              row[rowDataGroupToCheck] = { [name]: fillValue };
            } else {
              currentData[name] = fillValue;
            }
          }
        }
      });

      // Optimistically update to the new values
      queryClient.setQueryData<InfiniteData<DatasetPage>>(
        ["datasetRows", datasetId],
        updatedInfiniteData,
      );

      optimisticallyBumpUpdatedAt(baseUrl!, flowId, datasetId, queryClient);
      // Return a context object with the snapshotted value
      return { oldDatasetInifiniteData };
    },
    onError: (_, __, context) => {
      if (context) {
        // Rollback to the previous value
        queryClient.setQueryData<InfiniteData<DatasetPage>>(
          ["datasetRows", datasetId],
          context.oldDatasetInifiniteData,
        );
      }
    },
  });
};

const insertNewJobInCache = (
  baseUrl: string,
  newJob: DatasetJob,
  queryClient: QueryClient,
) => {
  const datasetJobsQueryKey = datasetJobsKey.detail(baseUrl, newJob.flow_id);
  const existingDatasetJobs =
    queryClient.getQueryData<DatasetJob[]>(datasetJobsQueryKey);
  if (!existingDatasetJobs) return;

  const newJobIsAlreadyPresent = existingDatasetJobs.find(
    (job) => job.id === newJob.id,
  );

  if (!newJobIsAlreadyPresent) {
    queryClient.setQueryData(datasetJobsQueryKey, [
      newJob,
      ...existingDatasetJobs,
    ]);
  }
};

export const useCreateAssembleDatasetJob = (
  baseUrl: string | undefined,
  version: FlowVersionT | undefined,
) => {
  const inputColumnsToOverwrite = useInputColumnsFromSchema(version);
  const outcomeDatasetColumns = useOutcomeColumnsFromOutcomeTypes();

  return useMutation({
    mutationFn: (
      args: Omit<
        CreateDatasetFromHistory,
        "input_columns_to_overwrite" | "outcome_columns"
      >,
    ) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      return ExporterDatasetJobsEndpoint.createAssembleDatasetJob(baseUrl, {
        flow_id: args.flow_id,
        request: {
          ...args,
          input_columns_to_overwrite: inputColumnsToOverwrite,
          outcome_columns: outcomeDatasetColumns,
        },
      });
    },
    onSuccess: (newJob) => {
      insertNewJobInCache(baseUrl!, newJob, queryClient);
    },
  });
};

export const useCreateDuplicateDatasetJob = (baseUrl: string | undefined) => {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: (args: CreateDuplicateDatasetJob) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      return ExporterDatasetJobsEndpoint.createDuplicateDatasetJob(
        baseUrl,
        args,
      );
    },

    onSuccess: (newJob) => {
      insertNewJobInCache(baseUrl!, newJob, queryClient);
    },
  });
};

export const useCreateDownloadDatasetJob = (baseUrl: string | undefined) => {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: (args: CreateDownloadDatasetJob) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      return ExporterDatasetJobsEndpoint.createDownloadDatasetJob(
        baseUrl,
        args,
      );
    },

    onSuccess: (newJob) => {
      insertNewJobInCache(baseUrl!, newJob, queryClient);
    },
  });
};

export const useDatasetJobs = (baseUrl: string, flowId: string) => {
  const loggedUserId = useCurrentUserId();
  return useQuery({
    queryKey: datasetJobsKey.detail(baseUrl, flowId),
    queryFn: async () => {
      const jobs = await ExporterDatasetJobsEndpoint.getDatasetJobs({
        baseURL: baseUrl,
        flowId,
        limit: 100,
        offset: 0,
        statuses: ["FAILED", "COMPLETED", "PENDING"],
      });

      // We declare this function so we can react only once
      // even if the hook is used in multiple places,
      // React query's onSuccess gets called once per observer.
      // We react if any job has finished recently
      // by invalidating the datasets list (creation job)
      // or using the download link (download job)
      const onSuccess = async (data: DatasetJob[]) => {
        const recentlyFinishedJobs = data.filter(
          (job) =>
            job.status !== "PENDING" &&
            differenceInSeconds(new Date(), new Date(job.updated_at)) < 60,
        );
        const recentlyCompletedJobs = recentlyFinishedJobs.filter(
          (job) => job.status === "COMPLETED",
        );
        const someCreationJobsJustCompleted = recentlyCompletedJobs.some(
          (job) => job.type === "duplicate" || job.type === "assemble",
        );
        if (someCreationJobsJustCompleted) {
          queryClient.invalidateQueries({
            queryKey: ["datasets", baseUrl],
          });
        }

        const downloadJob = recentlyFinishedJobs.find(
          (job): job is DownloadDatasetJob =>
            job.type === "download" && job.created_by === loggedUserId,
        );
        if (downloadJob) {
          // Hidden dataset jobs don't get
          // included in the get endpoint
          await ExporterDatasetJobsEndpoint.hideDatasetJob(
            baseUrl,
            downloadJob.id,
          );
          const s3Url = downloadJob.response?.s3_url;

          if (s3Url) {
            toastActions.success({
              id: downloadJob.id,
              duration: Infinity,
              title: "Download ready",
              description: `Your ${downloadJob.request.format.toUpperCase()} file is ready for download.`,
              actionText: "Click here to download",
              onActionClick: () => window.open(s3Url, "_blank"),
            });
          } else if (downloadJob.status === "FAILED") {
            toastActions.failure({
              id: downloadJob.id,
              duration: Infinity,
              title: "Download failed",
              description: downloadJob.error?.message ?? DATASET_JOB_TIMEOUT,
            });
          }
        }
      };
      // We don't block the query function's
      // return on purpose
      onSuccess(jobs);
      return jobs;
    },
    // If there are pending jobs created less than 15 minutes
    // ago we keep polling to update the progress
    refetchInterval: (data) => {
      if (data) {
        return data.some(
          (job) =>
            job.status === "PENDING" &&
            differenceInMinutes(new Date(), new Date(job.created_at)) < 15,
        )
          ? 5 * 1000
          : false;
      }
      return false;
    },
  });
};

export const useHideDatasetJob = (baseUrl: string, flowId: string) => {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: (id: string) => {
      if (!baseUrl) throw new Error("baseUrl must be defined");

      return ExporterDatasetJobsEndpoint.hideDatasetJob(baseUrl, id);
    },

    onMutate: (id) => {
      const existingDatasetJobs = queryClient.getQueryData<DatasetJob[]>(
        datasetJobsKey.detail(baseUrl, flowId),
      );
      if (!existingDatasetJobs) return;

      const filteredJobs = existingDatasetJobs.filter((job) => job.id !== id);

      queryClient.setQueryData(
        datasetJobsKey.detail(baseUrl, flowId),
        filteredJobs,
      );
    },
  });
};
