import {
  InfiniteData,
  QueryObserverOptions,
  useInfiniteQuery,
  useMutation,
  useQuery,
  useQueryClient,
} from "@tanstack/react-query";
import { AxiosError } from "axios";
import { produce } from "immer";
import { keyBy } from "lodash";
import { useCallback } from "react";

import { loadVersion } from "src/api";
import { jobsApi, jobsApiV2 } from "src/api/exporterApi";
import { FlowT, FlowVersionT, GenericObjectT } from "src/api/flowTypes";
import { flowVersionKeys } from "src/api/flowVersionQueries";
import { DecisionEnvironment } from "src/api/types";
import { DELETED_VERSION_NAME } from "src/jobs/common/constants";
import {
  CreateJobDestination,
  Job,
  JobDestination,
  JobRun,
  JobRunStatusType,
  JobSource,
  JobStatus,
  JobTrafficPolicy,
  SQLJobSourceConfiguration,
} from "src/jobs/types";
import { queryClient } from "src/queryClient";
import { logger } from "src/utils/logger";

/**
 * Query hook that loads the jobs of a flow through `GET /jobs`.
 *
 * `flow_id` and `status` are sent as query parameters. The query stays
 * disabled until both `baseURL` and `flowId` are truthy.
 *
 * @param baseURL - Base URL used for the request; also part of the query key.
 * @param flowId - Id of the flow whose jobs are requested.
 * @param status - Status filter sent with the request; `null` by default.
 * @param options - Optional `select` transform applied to the fetched jobs.
 * @returns The `useQuery` result for the job list.
 */
export const useJobs = (
  baseURL: string,
  flowId?: string,
  status: Nullable<JobStatus> = null,
  options?: Pick<QueryObserverOptions<Job[]>, "select">,
) => {
  const fetchJobs = async () => {
    const response = await jobsApi.get<Job[]>("/jobs", {
      baseURL,
      params: { flow_id: flowId, status },
    });
    return response.data;
  };

  return useQuery({
    queryKey: ["jobs", baseURL, flowId, status],
    queryFn: fetchJobs,
    enabled: Boolean(baseURL) && Boolean(flowId),
    select: options?.select,
  });
};

/** Response body type used for the v2 `GET /jobs` request in `getJobsByRunId`. */
type JobsV2Page = {
  jobs: Job[];
  // NOTE(review): presumably a cursor for the following page, `null` when
  // there is none — confirm against the API contract.
  next_page_token: string | null;
};
/**
 * Requests the v2 `GET /jobs` endpoint filtered by `run_id`.
 *
 * @param baseURL - Base URL used for the request.
 * @param runId - Run id sent as the `run_id` query parameter.
 * @param signal - Optional abort signal forwarded to the request.
 * @returns The response body (`JobsV2Page`).
 */
export const getJobsByRunId = async (
  baseURL: string,
  runId: string,
  signal?: AbortSignal,
) => {
  const { data } = await jobsApiV2.get<JobsV2Page>(`/jobs`, {
    baseURL,
    params: { run_id: runId },
    signal,
  });
  return data;
};

/**
 * Same as `useJobs`, but every returned job additionally carries a
 * `versionsUsed` list of `{ versionName, percentage }` entries.
 *
 * - With an active traffic policy, one entry is produced per flow version in
 *   the policy, using the policy's percentage.
 * - Otherwise, when the job has a `flow_version_id`, a single 100% entry is
 *   produced.
 * - Otherwise the list is empty.
 *
 * Version ids that cannot be found in `flow.versions` are labelled with
 * `DELETED_VERSION_NAME`.
 *
 * @param baseURL - Base URL used for the request.
 * @param flow - Flow whose jobs are listed; the query is disabled without it.
 * @param status - Status filter forwarded to `useJobs`; `null` by default.
 */
export const useJobsWithVersionsUsed = (
  baseURL: string,
  flow?: FlowT,
  status: Nullable<JobStatus> = null,
) => {
  const extendWithVersion = useCallback(
    (jobs: Job[]) => {
      // `flow` is optional: avoid a non-null assertion so the selector cannot
      // throw if it runs without a flow. `keyBy` yields `{}` for `undefined`,
      // in which case every version falls back to DELETED_VERSION_NAME.
      const versionsById = keyBy(flow?.versions, "id");
      return jobs.map((job) => ({
        ...job,
        versionsUsed: job.active_traffic_policy
          ? job.active_traffic_policy.policy.flow_versions.map(
              ({ flow_version_id, percentage }) => ({
                versionName:
                  versionsById[flow_version_id]?.name ?? DELETED_VERSION_NAME,
                percentage,
              }),
            )
          : job.flow_version_id
            ? [
                {
                  versionName:
                    versionsById[job.flow_version_id]?.name ??
                    DELETED_VERSION_NAME,
                  percentage: 100,
                },
              ]
            : [],
      }));
    },
    [flow],
  );
  return useJobs(baseURL, flow?.id, status, { select: extendWithVersion });
};

/**
 * Query hook that loads a single job through `GET /jobs/{jobId}`.
 *
 * The query is disabled until both `baseURL` and `jobId` are truthy.
 *
 * @param baseURL - Base URL used for the request; also part of the query key.
 * @param jobId - Id of the job to load.
 */
export const useJob = (baseURL: string, jobId: string) => {
  const fetchJob = async () => {
    const { data } = await jobsApi.get<Job>(`/jobs/${jobId}`, { baseURL });
    return data;
  };

  return useQuery({
    queryKey: ["job", baseURL, jobId],
    queryFn: fetchJob,
    enabled: Boolean(baseURL && jobId),
  });
};

/**
 * Mutation hook that duplicates a job through
 * `POST /jobs/{jobId}/duplicate`.
 *
 * On success the job list of the source job's flow is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param job - Job to duplicate. The mutation rejects when it is undefined.
 * @returns A mutation whose variables are the `name` and `description` of the
 *   copy and whose result is the raw response of the request.
 */
export const useDuplicateJob = (baseURL: string, job?: Job) => {
  const queryClient = useQueryClient();
  const jobId = job?.id;
  return useMutation({
    mutationFn: async (details: Pick<Job, "name" | "description">) => {
      // `job` is optional; without this guard the request would be sent to
      // `/jobs/undefined/duplicate`.
      if (!jobId) throw new Error("job must be defined");

      return jobsApi.post<Job>(`/jobs/${jobId}/duplicate`, details, {
        baseURL,
      });
    },
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: ["jobs", baseURL, job?.flow_id],
      });
    },
  });
};

/**
 * Mutation hook that patches a job through the v2 `PATCH /jobs/{jobId}`
 * endpoint.
 *
 * The `etag` variable is sent as the `If-Match` header and is not part of the
 * request body; every other variable is sent as the patch payload. On success
 * both the single-job query and the job list of the job's flow are
 * invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param job - Job to update. The mutation rejects when it is undefined.
 */
export const useUpdateJob = (baseURL: string, job?: Job) => {
  const queryClient = useQueryClient();
  const jobId = job?.id;
  const flowId = job?.flow_id;
  return useMutation({
    mutationFn: async ({
      etag,
      ...patch
    }: Partial<
      Pick<
        Job,
        | "name"
        | "description"
        | "flow_version_id"
        | "schedule"
        | "schedule_timezone"
        | "configuration"
      >
    > & {
      etag: string;
      active_source?: string;
      active_destination?: string | null;
    }) => {
      // `job` is optional; without this guard the request would be sent to
      // `/jobs/undefined`.
      if (!jobId) throw new Error("job must be defined");

      return jobsApiV2.patch<Job>(`/jobs/${jobId}`, patch, {
        baseURL,
        headers: { "If-Match": etag },
      });
    },
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: ["job", baseURL, jobId],
      });
      queryClient.invalidateQueries({
        queryKey: ["jobs", baseURL, flowId],
      });
    },
  });
};

/**
 * Mutation hook that activates a job through `POST /jobs/{id}/activate`.
 *
 * The mutation variable is the whole job: its `id` selects the endpoint and
 * its `etag` is sent as the `If-Match` header. The request body is empty.
 * On success the single-job query and the job list of the job's flow are
 * invalidated.
 *
 * @param baseURL - Base URL used for the request.
 */
export const useActivateJob = (baseURL: string) => {
  const queryClient = useQueryClient();

  const activate = (job: Job) => {
    const requestConfig = { baseURL, headers: { "If-Match": job.etag } };
    return jobsApi.post<Job>(`/jobs/${job.id}/activate`, {}, requestConfig);
  };

  return useMutation({
    mutationFn: activate,
    onSuccess: (_response, activatedJob) => {
      const staleKeys = [
        ["job", baseURL, activatedJob.id],
        ["jobs", baseURL, activatedJob.flow_id],
      ];
      for (const queryKey of staleKeys) {
        queryClient.invalidateQueries({ queryKey });
      }
    },
  });
};

/**
 * Mutation hook that deactivates a job through
 * `POST /jobs/{jobId}/deactivate` with an empty body.
 *
 * The mutation variable is the job id. On success the single-job query and
 * the job list of `flowId` are invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param flowId - Flow whose job list is invalidated after success.
 */
export const useDeactivateJob = (baseURL: string, flowId: string) => {
  const queryClient = useQueryClient();

  const deactivate = (jobId: string) =>
    jobsApi.post<Job>(`/jobs/${jobId}/deactivate`, {}, { baseURL });

  return useMutation({
    mutationFn: deactivate,
    onSuccess: (_response, deactivatedJobId) => {
      queryClient.invalidateQueries({
        queryKey: ["job", baseURL, deactivatedJobId],
      });
      queryClient.invalidateQueries({ queryKey: ["jobs", baseURL, flowId] });
    },
  });
};

/**
 * Mutation hook that deletes a job through `DELETE /jobs/{jobId}`.
 *
 * The mutation variable is the job id. On success the job list of `flowId`
 * is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param flowId - Flow whose job list is invalidated after success.
 */
export const useDeleteJob = (baseURL: string, flowId: string) => {
  const queryClient = useQueryClient();

  const deleteJob = (jobId: string) =>
    jobsApi.delete<Job>(`/jobs/${jobId}`, { baseURL });

  const refreshJobList = () => {
    queryClient.invalidateQueries({ queryKey: ["jobs", baseURL, flowId] });
  };

  return useMutation({
    mutationFn: deleteJob,
    onSuccess: refreshJobList,
  });
};

/**
 * Mutation variables of `useCreateJob`. They are spread into the `POST /jobs`
 * body, to which the hook adds `flow_id`.
 */
type CreateJob = {
  name: string;
  description?: string;
};
/**
 * Mutation hook that creates a job for a flow through `POST /jobs`.
 *
 * The request body is the mutation variables plus `flow_id`. On success the
 * job list of `flowId` is invalidated.
 *
 * @param baseURL - Base URL used for the request; the mutation rejects when
 *   it is falsy.
 * @param flowId - Flow the new job belongs to.
 * @returns A mutation resolving to the created job (the response body).
 */
export const useCreateJob = (baseURL: string, flowId: string) => {
  const queryClient = useQueryClient();

  const createJob = async (args: CreateJob) => {
    if (!baseURL) throw new Error("baseUrl must be defined");

    const body = { ...args, flow_id: flowId };
    const response = await jobsApi.post<Job>("/jobs", body, { baseURL });
    return response.data;
  };

  return useMutation({
    mutationFn: createJob,
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ["jobs", baseURL, flowId] });
    },
  });
};

/**
 * Query hook that loads the sources of a job through
 * `GET /jobs/{jobId}/sources`.
 *
 * The query is disabled until both `baseURL` and `jobId` are truthy.
 *
 * @param baseURL - Base URL used for the request; also part of the query key.
 * @param jobId - Job whose sources are requested.
 */
export const useJobSources = (baseURL: string, jobId: string) => {
  const fetchSources = async () => {
    const { data } = await jobsApi.get<JobSource[]>(`/jobs/${jobId}/sources`, {
      baseURL,
    });
    return data;
  };

  return useQuery({
    queryKey: ["job_sources", baseURL, jobId],
    queryFn: fetchSources,
    enabled: Boolean(baseURL && jobId),
  });
};

/** Mutation variables of `useCreateJobSource`; sent as the request body. */
type CreateJobSource = Pick<JobSource, "job_id" | "name" | "configuration">;

/**
 * Mutation hook that creates a source for a job through
 * `POST /jobs/{jobId}/sources`.
 *
 * On success the job's source list is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job the source is created for (taken from this argument,
 *   not from the `job_id` in the mutation variables).
 * @returns A mutation resolving to the response body.
 */
export const useCreateJobSource = (baseURL: string, jobId: string) => {
  const queryClient = useQueryClient();

  const createSource = async (args: CreateJobSource) => {
    const response = await jobsApi.post<JobSource>(
      `/jobs/${jobId}/sources`,
      args,
      { baseURL },
    );
    return response.data;
  };

  return useMutation({
    mutationFn: createSource,
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: ["job_sources", baseURL, jobId],
      });
    },
  });
};

/** Response body type used by `useJobDestinations` for the v2 destinations list. */
type JobDestinations = {
  destinations: JobDestination[];
  // NOTE(review): presumably a cursor for the following page, `null` when
  // there is none — confirm against the API contract. Nothing in this file
  // reads it.
  next_page_token: string | null;
};

/**
 * Query hook that loads the destinations of a job through the v2
 * `GET /jobs/{jobId}/destinations` endpoint.
 *
 * The query is disabled until both `baseURL` and `jobId` are truthy.
 *
 * @param baseURL - Base URL used for the request; also part of the query key.
 * @param jobId - Job whose destinations are requested.
 * @returns The `useQuery` result holding the whole response body
 *   (`destinations` plus `next_page_token`).
 */
export const useJobDestinations = (baseURL: string, jobId: string) => {
  const fetchDestinations = async () => {
    const { data } = await jobsApiV2.get<JobDestinations>(
      `/jobs/${jobId}/destinations`,
      { baseURL },
    );
    return data;
  };

  return useQuery({
    queryKey: ["job_destinations", baseURL, jobId],
    queryFn: fetchDestinations,
    enabled: Boolean(baseURL && jobId),
  });
};

/**
 * Mutation hook that creates a destination for a job through the v2
 * `POST /jobs/{jobId}/destinations` endpoint.
 *
 * On success the job's destination list is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job the destination is created for.
 * @returns A mutation resolving to the response body.
 */
export const useCreateJobDestination = (baseURL: string, jobId: string) => {
  const queryClient = useQueryClient();

  const createDestination = async (args: CreateJobDestination) => {
    // NOTE(review): the response is typed as `JobSource` although this is a
    // destinations endpoint — presumably it should be `JobDestination`;
    // confirm before relying on the returned data's type.
    const response = await jobsApiV2.post<JobSource>(
      `/jobs/${jobId}/destinations`,
      args,
      { baseURL },
    );
    return response.data;
  };

  return useMutation({
    mutationFn: createDestination,
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: ["job_destinations", baseURL, jobId],
      });
    },
  });
};

/**
 * Mutation variables of `useUpdateJobDestination`.
 *
 * `id` selects the destination in the request URL and `etag` is sent as the
 * `If-Match` header, so both are required. Only the patchable fields (`name`,
 * `configuration`) are optional. Previously `Partial` wrapped `id` and `etag`
 * as well, which allowed a request to `/destinations/undefined` without an
 * `If-Match` value to type-check.
 */
type UpdateJobDestination = Partial<
  Pick<JobDestination, "name" | "configuration">
> & {
  id: string;
  etag: string;
};

/**
 * Mutation hook that patches a destination through the v2
 * `PATCH /jobs/{jobId}/destinations/{id}` endpoint.
 *
 * From the mutation variables, `id` goes into the URL, `etag` is sent as the
 * `If-Match` header, and the remaining fields form the request body. On
 * success the job's destination list is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job owning the destination.
 * @returns A mutation resolving to the response body.
 */
export const useUpdateJobDestination = (baseURL: string, jobId: string) => {
  const queryClient = useQueryClient();

  const patchDestination = async (variables: UpdateJobDestination) => {
    const { id, etag, ...args } = variables;
    // NOTE(review): the response is typed as `JobSource` although this is a
    // destinations endpoint — presumably it should be `JobDestination`;
    // confirm before relying on the returned data's type.
    const response = await jobsApiV2.patch<JobSource>(
      `/jobs/${jobId}/destinations/${id}`,
      args,
      { baseURL, headers: { "If-Match": etag } },
    );
    return response.data;
  };

  return useMutation({
    mutationFn: patchDestination,
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: ["job_destinations", baseURL, jobId],
      });
    },
  });
};

/**
 * Mutation hook that deletes a destination through the v2
 * `DELETE /jobs/{jobId}/destinations/{destinationId}` endpoint.
 *
 * The mutation variable is the destination id. On success the job's
 * destination list is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job owning the destination.
 * @returns A mutation resolving to the response body.
 */
export const useDeleteJobDestination = (baseURL: string, jobId: string) => {
  const queryClient = useQueryClient();

  const deleteDestination = async (destinationId: string) => {
    const response = await jobsApiV2.delete<JobDestination>(
      `/jobs/${jobId}/destinations/${destinationId}`,
      { baseURL },
    );
    return response.data;
  };

  return useMutation({
    mutationFn: deleteDestination,
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: ["job_destinations", baseURL, jobId],
      });
    },
  });
};

/**
 * Mutation hook that deletes a source through
 * `DELETE /jobs/{jobId}/sources/{sourceId}`.
 *
 * The mutation variable is the source id, which is logged before the request
 * is sent. On success the job's source list is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job owning the source.
 * @returns A mutation resolving to the response body.
 */
export const useDeleteJobSource = (baseURL: string, jobId: string) => {
  const queryClient = useQueryClient();

  const deleteSource = async (sourceId: string) => {
    logger.log("deleting source", sourceId);
    const response = await jobsApi.delete<JobSource>(
      `/jobs/${jobId}/sources/${sourceId}`,
      { baseURL },
    );
    return response.data;
  };

  return useMutation({
    mutationFn: deleteSource,
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: ["job_sources", baseURL, jobId],
      });
    },
  });
};

/**
 * Mutation hook that patches a source through
 * `PATCH /jobs/{jobId}/sources/{id}`.
 *
 * From the mutation variables, `id` goes into the URL, `etag` is sent as the
 * `If-Match` header, and the remaining fields form the request body.
 *
 * On success the caches are updated in place instead of being refetched:
 * - the cached source list has the matching entry replaced by the response;
 * - the cached job has `active_source` replaced when it is the patched source.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job owning the source.
 * @returns A mutation resolving to the updated source (the response body).
 */
export const usePatchJobSource = (baseURL: string, jobId: string) => {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: async ({
      id,
      etag,
      ...data
    }: Pick<JobSource, "id" | "etag" | "name" | "configuration">) => {
      return (
        await jobsApi.patch<JobSource>(`/jobs/${jobId}/sources/${id}`, data, {
          baseURL,
          headers: { "If-Match": etag },
        })
      ).data;
    },
    onSuccess: (source) => {
      queryClient.setQueryData(
        ["job_sources", baseURL, jobId],
        produce<JobSource[]>((draft) => {
          // Nothing cached yet: leave the cache untouched.
          if (draft) {
            const sourceIndex = draft.findIndex(
              (s: JobSource) => s.id === source.id,
            );
            // `findIndex` returns -1 when the source is not in the cached
            // list; writing to `draft[-1]` would not replace any element.
            if (sourceIndex !== -1) {
              draft[sourceIndex] = source;
            }
          }
        }),
      );
      queryClient.setQueryData(
        ["job", baseURL, jobId],
        produce<Job>((draft) => {
          if (draft?.active_source?.id === source.id) {
            draft.active_source = source;
          }
        }),
      );
    },
  });
};

/** Result type of the `usePreviewJobSource` mutation. */
export type JobPreviewResult = {
  data: {
    // NOTE(review): presumably the rows returned by the previewed query —
    // confirm against the API contract.
    result: GenericObjectT[];
  };
  insights: GenericObjectT;
  meta: GenericObjectT;
};

/**
 * Mutation variables of `usePreviewJobSource`: the `query` and
 * `connection_id` fields of a SQL job source configuration.
 */
export type JobPreviewArgs = Pick<
  SQLJobSourceConfiguration,
  "query" | "connection_id"
>;

/**
 * Error response body type used for the `AxiosError` of the
 * `usePreviewJobSource` mutation.
 */
export type PreviewErrorResponse = {
  detail: {
    title: string;
    details: string;
    // Category of the failure, one of the three literals below.
    type: "sql_validation" | "connect_api" | "unknown";
  };
};

/**
 * Mutation hook that requests a preview of a job source through
 * `POST /jobs/{jobId}/source_preview`.
 *
 * The request body is the mutation variables plus a fixed `limit` of 100.
 * No query cache is touched.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job the preview is requested for.
 * @returns A mutation resolving to the response body, typed as
 *   `JobPreviewResult`; errors are typed as
 *   `AxiosError<PreviewErrorResponse>`.
 */
export const usePreviewJobSource = (baseURL: string, jobId: string) => {
  return useMutation<
    JobPreviewResult,
    AxiosError<PreviewErrorResponse>,
    JobPreviewArgs
  >({
    mutationFn: async (args) => {
      const payload = { ...args, limit: 100 };
      const response = await jobsApi.post(
        `/jobs/${jobId}/source_preview`,
        payload,
        { baseURL },
      );
      return response.data;
    },
  });
};

/**
 * Optional filters for listing job runs. `useRuns` sends both fields as query
 * parameters, and `jobRunsKeys.filtered` includes them in the query key.
 */
type RunFilters = {
  environment?: DecisionEnvironment;
  status?: JobRunStatusType;
};

/**
 * Query-key factory for job-run queries.
 *
 * Keys are hierarchical, so invalidating a shorter key also matches the
 * longer keys built on top of it:
 * - `all`: `["job_runs"]`
 * - `job`: `[...all, baseURL, jobId]`
 * - `filtered`: `[...job, "filtered", <serialized filters>]`
 * - `inProgress`: `[...all, baseURL, "in_progress"]`
 * - `single`: `[...job, runId]`
 */
export const jobRunsKeys = {
  all: ["job_runs"] as const,
  job: (baseURL: string, jobId: string) =>
    [...jobRunsKeys.all, baseURL, jobId] as const,
  filtered: (baseURL: string, jobId: string, filters: RunFilters) => {
    // Serialize the known filter fields in a fixed order so the key does not
    // depend on the property insertion order of the caller's object.
    // Otherwise equal filters could end up in different cache entries.
    const { environment, status } = filters;
    return [
      ...jobRunsKeys.job(baseURL, jobId),
      "filtered",
      JSON.stringify({ environment, status }),
    ] as const;
  },
  inProgress: (baseURL: string) =>
    [...jobRunsKeys.all, baseURL, "in_progress"] as const,
  single: (baseURL: string, jobId: string, runId: string) =>
    [...jobRunsKeys.job(baseURL, jobId), runId] as const,
};

/** Page size requested from the runs endpoint. */
const RUNS_LIMIT = 100;

/**
 * Infinite query hook that pages through the runs of a job via
 * `GET /jobs/{jobId}/runs`, using `limit`/`offset` pagination.
 *
 * The next offset is the number of runs loaded so far. The query is disabled
 * until both `baseURL` and `jobId` are truthy. Data is always considered
 * stale, and is polled every 5 seconds while any loaded run has status type
 * `"running"`, otherwise every 60 seconds.
 *
 * @param baseURL - Base URL used for the request; also part of the query key.
 * @param jobId - Job whose runs are listed.
 * @param filters - Optional environment/status filters sent as query
 *   parameters and included in the query key.
 */
export const useRuns = (baseURL: string, jobId: string, filters: RunFilters) =>
  useInfiniteQuery<JobRun[], Error>({
    queryKey: jobRunsKeys.filtered(baseURL, jobId, filters),
    getNextPageParam: (lastPage, allPages) => {
      // `>=` rather than `===`: `useRunJob` prepends a newly started run to
      // the first cached page, which can push a full page to RUNS_LIMIT + 1.
      // A strict comparison would then wrongly report no further pages.
      return lastPage.length >= RUNS_LIMIT
        ? allPages.reduce((sum, page) => sum + page.length, 0)
        : undefined;
    },
    queryFn: async ({ pageParam }) =>
      (
        await jobsApi.get<JobRun[]>(`/jobs/${jobId}/runs`, {
          baseURL,
          params: {
            limit: RUNS_LIMIT,
            offset: pageParam,
            environment: filters.environment,
            status: filters.status,
          },
        })
      ).data,
    initialPageParam: 0,
    enabled: !!baseURL && !!jobId,
    staleTime: 0,
    // If we don't refetch on mount, changing the filters may show out-of-date data
    refetchOnMount: true,
    refetchInterval: (query) => {
      return query.state.data &&
        query.state.data.pages.some((page) =>
          page.some((run) => run.status.type === "running"),
        )
        ? 5000
        : 60 * 1000;
    },
  });

/**
 * Mutation hook that starts a run of a job through
 * `POST /jobs/{jobId}/runs`.
 *
 * The mutation variable is the environment to run in. On success:
 * - the new run is prepended to the first page of the first active
 *   "filtered" runs query of this job, if one with data exists;
 * - the job list of `flowId` and the single-job query are invalidated.
 *
 * @param baseURL - Base URL used for the request; the mutation rejects when
 *   it is falsy.
 * @param jobId - Job to run.
 * @param flowId - Flow whose job list is invalidated after success.
 * @returns A mutation resolving to the created run (the response body).
 */
export const useRunJob = (baseURL: string, jobId: string, flowId: string) => {
  const queryClient = useQueryClient();

  const startRun = async (environment: DecisionEnvironment) => {
    if (!baseURL) throw new Error("baseUrl must be defined");

    const response = await jobsApi.post<JobRun>(
      `/jobs/${jobId}/runs`,
      { environment },
      { baseURL },
    );
    return response.data;
  };

  // Inserts the run at the top of the cached run list so it shows up without
  // waiting for the next refetch.
  const prependToCachedRuns = (newRun: JobRun) => {
    const activeRunsEntry = queryClient
      .getQueriesData<InfiniteData<JobRun[]>>({
        queryKey: [...jobRunsKeys.job(baseURL, jobId), "filtered"],
        type: "active",
      })
      .at(0);
    if (!activeRunsEntry) return;

    const [cachedKey, cachedRuns] = activeRunsEntry;
    if (!cachedRuns) return;

    const withNewRun = produce(cachedRuns, (draft) => {
      draft.pages[0].unshift(newRun);
    });
    queryClient.setQueryData(cachedKey, withNewRun);
  };

  return useMutation({
    mutationFn: startRun,
    onSuccess: (newRun) => {
      prependToCachedRuns(newRun);

      queryClient.invalidateQueries({ queryKey: ["jobs", baseURL, flowId] });
      queryClient.invalidateQueries({ queryKey: ["job", baseURL, jobId] });
    },
  });
};

/**
 * Mutation hook that stops a run through
 * `POST /jobs/{jobId}/runs/{runId}/stop` with an empty body.
 *
 * The mutation variables carry the job and run ids. On success every
 * job-run query of that job is invalidated.
 *
 * @param baseURL - Base URL used for the request.
 */
export const useStopRun = (baseURL: string) => {
  const queryClient = useQueryClient();

  const stopRun = ({ jobId, runId }: { jobId: string; runId: string }) =>
    jobsApi.post<void>(`/jobs/${jobId}/runs/${runId}/stop`, {}, { baseURL });

  return useMutation({
    mutationFn: stopRun,
    onSuccess: (_response, { jobId }) => {
      queryClient.invalidateQueries({
        queryKey: jobRunsKeys.job(baseURL, jobId),
      });
    },
  });
};

/**
 * Query hook that polls every in-progress run among `runs`.
 *
 * A run counts as in progress when its status type is `Running` or
 * `Writing`. Each such run is requested from
 * `GET /jobs/{job_id}/runs/{id}/poll`, all in parallel. The query refetches
 * every 5 seconds and is disabled when `baseURL` is falsy or no run is in
 * progress.
 *
 * @param baseURL - Base URL used for the requests; also part of the query key.
 * @param runs - Runs to inspect; only the in-progress ones are polled.
 * @returns The `useQuery` result holding the polled runs.
 */
export const usePollRunsInProgress = (baseURL: string, runs: JobRun[]) => {
  const isInProgress = (run: JobRun) =>
    run.status.type === JobRunStatusType.Running ||
    run.status.type === JobRunStatusType.Writing;
  const runsInProgress = runs.filter(isInProgress);

  const pollRun = async (run: JobRun) => {
    const response = await jobsApi.get<JobRun>(
      `/jobs/${run.job_id}/runs/${run.id}/poll`,
      { baseURL },
    );
    return response.data;
  };

  return useQuery<JobRun[], Error>({
    queryKey: jobRunsKeys.inProgress(baseURL),
    queryFn: () => Promise.all(runsInProgress.map((run) => pollRun(run))),
    enabled: Boolean(baseURL) && runsInProgress.length !== 0,
    staleTime: 0,
    refetchOnMount: true,
    refetchInterval: 5000,
  });
};

/**
 * Query hook that loads a single run through
 * `GET /jobs/{jobId}/runs/{runId}/poll`.
 *
 * The query only runs when `baseURL`, `jobId` and `runId` are all truthy and
 * `enabled` is not `false`. The id checks match the other hooks in this file
 * and prevent requests to a URL with an empty id segment.
 *
 * @param baseURL - Base URL used for the request; also part of the query key.
 * @param jobId - Job owning the run.
 * @param runId - Run to load.
 * @param enabled - Optional switch to disable the query; defaults to `true`.
 */
export const useJobRun = ({
  baseURL,
  enabled = true,
  jobId,
  runId,
}: {
  baseURL: string;
  jobId: string;
  runId: string;
  // Optional: the destructuring default above only applies when it is omitted.
  enabled?: boolean;
}) =>
  useQuery({
    queryKey: jobRunsKeys.single(baseURL, jobId, runId),
    queryFn: async () =>
      (
        await jobsApi.get<JobRun>(`/jobs/${jobId}/runs/${runId}/poll`, {
          baseURL,
        })
      ).data,
    enabled: !!baseURL && !!jobId && !!runId && enabled,
  });

/**
 * Mutation hook that creates a traffic policy for a job through the v2
 * `POST /jobs/{jobId}/traffic_policies` endpoint.
 *
 * The optional mutation variable (`policy` and `stable_routing`) is sent as
 * the request body. On success the single-job query and the job list of
 * `flowId` are invalidated.
 *
 * @param baseURL - Base URL used for the request.
 * @param jobId - Job the policy is created for.
 * @param flowId - Flow whose job list is invalidated after success.
 */
export const useCreateTrafficPolicy = (
  baseURL: string,
  jobId: string,
  flowId: string,
) => {
  const queryClient = useQueryClient();

  const createPolicy = (
    newTrafficPolicy?: Pick<JobTrafficPolicy, "policy" | "stable_routing">,
  ) => {
    const url = `/jobs/${jobId}/traffic_policies`;
    return jobsApiV2.post<Job>(url, newTrafficPolicy, { baseURL });
  };

  return useMutation({
    mutationFn: createPolicy,
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ["job", baseURL, jobId] });
      queryClient.invalidateQueries({ queryKey: ["jobs", baseURL, flowId] });
    },
  });
};

/**
 * Query hook that resolves a list of flow versions by id.
 *
 * For each id, a version already present in the query cache under
 * `flowVersionKeys.detail(id)` is reused; otherwise it is loaded with
 * `loadVersion`. The query is disabled for an empty id list.
 *
 * NOTE(review): this hook reads the cache through the module-level
 * `queryClient` import, while the other hooks in this file use
 * `useQueryClient()` — confirm both refer to the same client.
 *
 * @param flowVersionIds - Ids of the versions to resolve; part of the query key.
 * @returns The `useQuery` result holding the versions in the order of the ids.
 */
export const useFlowVersions = (flowVersionIds: string[]) => {
  const resolveVersion = (flowVersionId: string) => {
    const cachedVersion: FlowVersionT | undefined = queryClient.getQueryData(
      flowVersionKeys.detail(flowVersionId),
    );
    if (!cachedVersion) {
      return loadVersion(flowVersionId);
    }
    return cachedVersion;
  };

  return useQuery<FlowVersionT[], AxiosError>({
    queryKey: ["jobs_flow_versions", flowVersionIds],
    queryFn: async () => {
      const pending = flowVersionIds.map((id) => resolveVersion(id));
      return Promise.all(pending);
    },
    enabled: flowVersionIds.length > 0,
  });
};
