import {
  combineLatest,
  distinctUntilChanged,
  firstValueFrom,
  from,
  map,
  Observable,
  of,
  shareReplay,
  switchMap,
} from "rxjs";
import { collectionData, docData } from "~/utils/rxFireWrappers";
import {
  collectionGroupRef,
  collectionRef,
  docExists,
  docRef,
} from "~/firestore.service";
import {
  ASSERT_CURRENT_USER$,
  ASSERT_CURRENT_USER_ID$,
  catchNoCurrentUserError,
  getAndAssertCurrentUser,
} from "./user.service";
import { useObservable } from "~/utils/useObservable";
import {
  IChannelSubscriptionDoc,
  ISubscriptionDoc,
  IThreadDoc,
  IThreadSubscriptionDoc,
} from "@libs/firestore-models";
import { isEqual } from "@libs/utils/isEqual";
import {
  observeThread,
  removeCurrentUserFromThreadParticipants,
} from "./post.service";
import { validateNewSubscription } from "~/utils/decoders";
import {
  getCountFromServer,
  getDoc,
  limit,
  query,
  serverTimestamp,
  setDoc,
  updateDoc,
  where,
  WithFieldValue,
} from "firebase/firestore";
import { withPendingUpdate } from "./loading.service";
import { pick, throttle } from "lodash-es";
import {
  ALL_OTHER_ACCEPTED_MEMBERS_OF_USERS_ORGANIZATIONS$,
  IAcceptedOrganizationMemberDoc,
} from "./organization.service";
import { toast } from "./toast-service";
import { offlineAwareFirestoreCRUD } from "./network-connection.service";
import { isNonNullable } from "@libs/utils/predicates";
import { observeChannelMembers, USER_CHANNELS$ } from "./channels.service";
import {
  getSubscriptionPreferenceFromPriority,
  getSubscriptionPreferencePriority,
} from "@libs/firestore-models/utils";
import { UnreachableCaseError } from "@libs/utils/errors";
import { stringComparer } from "@libs/utils/comparers";

export const mergeSubscription = withPendingUpdate(
  async (
    args:
      | {
          type: "thread";
          subjectId: IThreadSubscriptionDoc["id"];
          preference: IThreadSubscriptionDoc["preference"];
        }
      | {
          type: "channel";
          subjectId: IChannelSubscriptionDoc["id"];
          preference?: IChannelSubscriptionDoc["preference"];
          isPinned?: IChannelSubscriptionDoc["isPinned"];
        },
  ) => {
    const currentUser = getAndAssertCurrentUser();

    const existingSubscriptionQuery = getDoc(
      docRef("users", currentUser.id, "subscriptions", args.subjectId),
    );

    if (args.type === "channel") {
      if (!(await docExists(docRef("channels", args.subjectId)))) {
        return;
      }
    } else if (args.type === "thread") {
      // Here we use `observeThread()` since it also checks for sent drafts
      if (!(await firstValueFrom(observeThread(args.subjectId)))) {
        return;
      }
    } else {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const msg = `Unexpected subscription type: ${(args as any).type}.`;
      alert(msg);
      console.error(msg, args);
      return;
    }

    const snap = await existingSubscriptionQuery;
    const doc = snap.data();

    if (doc?.type === "channel" && args.type === "channel") {
      const subscriptionDoc: Partial<WithFieldValue<IChannelSubscriptionDoc>> =
        {
          ...pick(args, "preference", "isPinned"),
          updatedAt: serverTimestamp(),
        };

      console.debug("updateSubscription", subscriptionDoc);

      await offlineAwareFirestoreCRUD(updateDoc(snap.ref, subscriptionDoc));
    } else if (doc?.type === "thread" && args.type === "thread") {
      const subscriptionDoc: Partial<WithFieldValue<IThreadSubscriptionDoc>> = {
        ...pick(args, "preference"),
        updatedAt: serverTimestamp(),
      };

      console.debug("updateSubscription", subscriptionDoc);

      await offlineAwareFirestoreCRUD(updateDoc(snap.ref, subscriptionDoc));
    } else if (!doc && args.type === "channel") {
      const subscriptionDoc: WithFieldValue<IChannelSubscriptionDoc> = {
        id: args.subjectId,
        type: args.type,
        userId: currentUser.id,
        preference: args.preference ?? "involved",
        isPinned: args.isPinned ?? false,
        updatedAt: serverTimestamp(),
        createdAt: serverTimestamp(),
      };

      const subscription = validateNewSubscription(subscriptionDoc);

      console.debug("createSubscription", subscription);

      await offlineAwareFirestoreCRUD(
        setDoc(
          docRef("users", currentUser.id, "subscriptions", subscription.id),
          subscription,
        ),
      );
    } else if (!doc && args.type === "thread") {
      const subscriptionDoc: WithFieldValue<ISubscriptionDoc> = {
        id: args.subjectId,
        type: args.type,
        userId: currentUser.id,
        preference: args.preference ?? "involved",
        updatedAt: serverTimestamp(),
        createdAt: serverTimestamp(),
      };

      const subscription = validateNewSubscription(subscriptionDoc);

      console.debug("createSubscription", subscription);

      await offlineAwareFirestoreCRUD(
        setDoc(
          docRef("users", currentUser.id, "subscriptions", subscription.id),
          subscription,
        ),
      );
    } else {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      throw new Error(`Unknown subscription type "${(doc as any).type}"`);
    }
  },
);

export const USER_CHANNEL_SUBSCRIPTIONS$ = ASSERT_CURRENT_USER_ID$.pipe(
  switchMap((userId) =>
    collectionData(
      query(
        collectionRef<IChannelSubscriptionDoc>(
          "users",
          userId,
          "subscriptions",
        ),
        where("type", "==", "channel"),
      ),
    ),
  ),
  catchNoCurrentUserError(() => []),
  distinctUntilChanged(isEqual),
  shareReplay(1),
);

export function observeCurrentUserChannelSubscription(channelId: string) {
  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap((userId) =>
      docData(
        docRef<IChannelSubscriptionDoc>(
          "users",
          userId,
          "subscriptions",
          channelId,
        ),
      ),
    ),
    catchNoCurrentUserError(() => null),
    distinctUntilChanged(isEqual),
    shareReplay(1),
  );
}

export function useCurrentUserChannelSubscription(channelId?: string) {
  return useObservable(
    () => {
      if (!channelId) return of(null);

      return observeCurrentUserChannelSubscription(channelId);
    },
    {
      deps: [channelId],
    },
  );
}

export function observeCurrentUserThreadSubscription(
  threadId: string,
): Observable<ISubscriptionDoc | null> {
  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap((userId) =>
      observeThread(threadId).pipe(
        switchMap((threadDoc) => {
          if (!threadDoc) return of(null);

          if (threadDoc.permittedChannelIds.length === 0) {
            return docData(
              docRef<IThreadSubscriptionDoc>(
                "users",
                userId,
                "subscriptions",
                threadId,
              ),
            );
          }

          return combineLatest([
            docData(
              docRef<IThreadSubscriptionDoc>(
                "users",
                userId,
                "subscriptions",
                threadId,
              ),
            ),
            USER_CHANNEL_SUBSCRIPTIONS$.pipe(
              map((subscriptions) =>
                subscriptions.filter((sub) =>
                  threadDoc.permittedChannelIds.includes(sub.id),
                ),
              ),
            ),
          ]).pipe(
            map(([threadSub, channelSubs]) => {
              if (threadSub) return threadSub;
              if (!channelSubs || channelSubs.length === 0) return null;
              if (channelSubs.length === 1) return channelSubs[0] ?? null;

              const channelSub = channelSubs.reduce((prev, curr) => {
                const prevPriority = getSubscriptionPreferencePriority(
                  prev.preference,
                );
                const currPriority = getSubscriptionPreferencePriority(
                  curr.preference,
                );

                if (prevPriority < currPriority) {
                  return prev;
                } else {
                  return curr;
                }
              });

              return channelSub ?? null;
            }),
          );
        }),
      ),
    ),
    catchNoCurrentUserError(() => null),
  );
}

export function useCurrentUserThreadSubscription(threadId?: string) {
  return useObservable(
    () => {
      if (!threadId) return of(null);

      return observeCurrentUserThreadSubscription(threadId);
    },
    {
      deps: [threadId],
    },
  );
}

export function observeChannelSubscription(
  channelId: string,
  // We might want to subscribe to a user other than the current user
  userId: string,
) {
  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() =>
      collectionData(
        query(
          collectionGroupRef<IChannelSubscriptionDoc>("subscriptions"),
          where("type", "==", "channel"),
          where("id", "==", channelId),
          where("userId", "==", userId),
          limit(1),
        ),
      ),
    ),
    map((subs) => (subs[0] ?? null) as IChannelSubscriptionDoc | null),
    catchNoCurrentUserError(() => null),
    distinctUntilChanged(isEqual),
  );
}

export function observeChannelSubscriptions(channelId: string) {
  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() =>
      collectionData(
        query(
          collectionGroupRef<IChannelSubscriptionDoc>("subscriptions"),
          where("type", "==", "channel"),
          where("id", "==", channelId),
        ),
      ),
    ),
    catchNoCurrentUserError(() => []),
    distinctUntilChanged(isEqual),
  );
}

export function useChannelSubscribers(channelId?: string):
  | Array<
      IAcceptedOrganizationMemberDoc["user"] & {
        id: string;
        preference: IChannelSubscriptionDoc["preference"];
      }
    >
  | undefined {
  return useObservable(
    () => {
      if (!channelId) return of([]);

      return USER_CHANNELS$.pipe(
        switchMap((channels) => {
          const channel = channels.find((c) => c.id === channelId);

          if (!channel) return of([]);

          return combineLatest([
            observeChannelSubscriptions(channelId),
            observeChannelMembers(channelId),
          ]).pipe(
            map(([subscriptions, members]) => {
              return members.map((member) => {
                const subscription = subscriptions.find(
                  (s) => s.userId === member.id,
                );

                return {
                  id: member.id,
                  ...member.user,
                  // "involved" is the current default subscription level
                  // TODO: fix this if default subscription level becomes changable
                  preference: subscription?.preference || "involved",
                };
              });
            }),
          );
        }),
      );
    },
    {
      deps: [channelId],
    },
  );
}

/**
 * Returns a count of the number of channel subscribers
 * with the "all-new" subscription preference. Note that
 * this observable completes immediately after returning
 * it's first value.
 */
export function useChannelSubscribersCount(
  channelId?: string,
): number | undefined {
  return useObservable(
    () => {
      if (!channelId) return of(0);

      return from(
        getCountFromServer(
          query(
            collectionGroupRef("subscriptions"),
            where("type", "==", "channel"),
            where("id", "==", channelId),
            where("preference", "==", "all-new"),
          ),
        ).then((s) => s.data().count),
      );
    },
    {
      deps: [channelId],
    },
  );
}

/**
 * @returns observable of subscriptions
 */
export function observeSubscriptionsToChannels(channelIds: string[]) {
  if (channelIds.length === 0) return of([]);

  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() =>
      combineLatest(
        channelIds.map((channelId) =>
          collectionData(
            query(
              collectionGroupRef<IChannelSubscriptionDoc>("subscriptions"),
              where("type", "==", "channel"),
              where("id", "==", channelId),
            ),
          ),
        ),
      ),
    ),
    map((groupedSubscriptions) => groupedSubscriptions.flat()),
    catchNoCurrentUserError(() => []),
    distinctUntilChanged(isEqual),
  );
}

export function useSubscriptionsToChannels(channelIds?: string[]) {
  return useObservable(
    () => {
      if (!channelIds || channelIds.length === 0) return of([]);
      return observeSubscriptionsToChannels(channelIds);
    },
    {
      deps: [channelIds],
    },
  );
}

export function observeSubscriptionsToThread(threadId: string) {
  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() =>
      collectionData(
        query(
          collectionGroupRef<IThreadSubscriptionDoc>("subscriptions"),
          where("id", "==", threadId),
          where("type", "==", "thread"),
        ),
      ),
    ),
    catchNoCurrentUserError(() => []),
    distinctUntilChanged(isEqual),
  );
}

export function observeUserRecipientsForThread(threadId: string) {
  return combineLatest([
    ASSERT_CURRENT_USER$.pipe(
      map((u) => pick(u, "id", "name", "email", "photoURL")),
      distinctUntilChanged(isEqual),
    ),
    ALL_OTHER_ACCEPTED_MEMBERS_OF_USERS_ORGANIZATIONS$,
    observeThread(threadId),
    observeSubscriptionsToThread(threadId),
  ]).pipe(
    switchMap(
      ([currentUser, organizationMembers, thread, threadSubscriptions]) => {
        if (!thread) return of([]);

        const participatingUserIdsWOThreadSubscription =
          thread.participatingUserIds.filter(
            (userId) =>
              !threadSubscriptions.some((sub) => sub.userId === userId),
          );

        const participatingUserIdsSubscribedToAPermittedChannel$ =
          thread.permittedChannelIds.length === 0 ||
          participatingUserIdsWOThreadSubscription.length === 0
            ? // The default subscription is "involved".
              of(participatingUserIdsWOThreadSubscription)
            : combineLatest(
                participatingUserIdsWOThreadSubscription.map((userId) =>
                  combineLatest(
                    thread.permittedChannelIds.map((channelId) =>
                      observeChannelSubscription(channelId, userId),
                    ),
                  ).pipe(
                    map((nullableSubs) => {
                      const subs = nullableSubs.filter(isNonNullable);

                      // TODO:
                      // Update this when we allow configuring the default subscription.

                      const isSubscribedToAChannel =
                        // The default subscription is "involved".
                        subs.length === 0 ||
                        subs.some(
                          (sub) =>
                            sub.preference === "all-new" ||
                            sub.preference === "involved",
                        );

                      return isSubscribedToAChannel ? userId : null;
                    }),
                  ),
                ),
              ).pipe(map((userIds) => userIds.filter(isNonNullable)));

        return participatingUserIdsSubscribedToAPermittedChannel$.pipe(
          map((participatingUserIdsSubscribedToAPermittedChannel) => {
            const idsOfUsersSubscribedToThread = [
              ...threadSubscriptions
                .filter(
                  (sub) =>
                    sub.preference === "all" || sub.preference === "involved",
                )
                .map((sub) => sub.userId),
              ...participatingUserIdsSubscribedToAPermittedChannel,
            ];

            return idsOfUsersSubscribedToThread.map((userId) => {
              const member =
                (currentUser.id === userId && {
                  name: currentUser.name,
                  email: currentUser.email,
                  photoURL: currentUser.photoURL,
                }) ||
                thread.userPermissions[userId] ||
                organizationMembers.find((member) => member.id === userId)
                  ?.user;

              return {
                userId,
                member,
              };
            });
          }),
        );
      },
    ),
    catchNoCurrentUserError(() => []),
  );
}

export function useUserRecipientsForThread(threadId?: string) {
  return useObservable(
    () => {
      if (!threadId) return of([]);

      return observeUserRecipientsForThread(threadId);
    },
    {
      deps: [threadId],
    },
  );
}

export function observeUsersWhoAreSubscribedToThread(args: {
  thread: Pick<
    IThreadDoc,
    "id" | "permittedChannelIds" | "participatingUserIds"
  >;
  onlyCountTheseSubscriptionPreferences: readonly ISubscriptionDoc["preference"][];
  dontIncludeCurrentUser?: boolean;
}) {
  const {
    thread,
    onlyCountTheseSubscriptionPreferences,
    dontIncludeCurrentUser,
  } = args;

  return combineLatest([
    observeSubscriptionsToChannels(thread.permittedChannelIds),
    observeSubscriptionsToThread(thread.id),
  ]).pipe(
    map(([channelSubscriptions, threadSubscriptions]) => {
      const distinctUserIds = Array.from(
        new Set([
          ...channelSubscriptions.map((s) => s.userId),
          ...threadSubscriptions.map((s) => s.userId),
        ]),
      );

      const groupedUserIds: Record<ISubscriptionDoc["preference"], string[]> = {
        all: [] as string[],
        "all-new": [] as string[],
        involved: [] as string[],
      };

      for (const userId of distinctUserIds) {
        const threadSub = threadSubscriptions.find((s) => s.userId === userId);

        if (threadSub) {
          groupedUserIds[threadSub.preference].push(userId);
          continue;
        }

        const usersChannelSubs = channelSubscriptions.filter(
          (s) => s.userId === userId,
        );

        const channelSubValue = usersChannelSubs.reduce((prev, curr) => {
          const currValue = getSubscriptionPreferencePriority(curr.preference);

          if (prev > currValue) {
            return currValue;
          } else {
            return prev;
          }
        }, 3 as 1 | 2 | 3);

        const preference =
          getSubscriptionPreferenceFromPriority(channelSubValue);

        groupedUserIds[preference].push(userId);
      }

      return groupedUserIds;
    }),
    distinctUntilChanged(isEqual),
    switchMap((groupedUserIds) => {
      return ALL_OTHER_ACCEPTED_MEMBERS_OF_USERS_ORGANIZATIONS$.pipe(
        map((members) => {
          const currentUser = getAndAssertCurrentUser();

          const watchingUserIdsSet = new Set(thread.participatingUserIds);

          for (const preference of onlyCountTheseSubscriptionPreferences) {
            for (const userId of groupedUserIds[preference]) {
              watchingUserIdsSet.add(userId);
            }
          }

          if (dontIncludeCurrentUser) {
            watchingUserIdsSet.delete(currentUser.id);
          }

          const groupedUsers = {
            knownSubscribers: [] as Array<
              (typeof members)[number]["user"] & { id: string }
            >,
            unknownSubscribersCount: 0,
          };

          const getUserDataForId = (userId: string) => {
            if (currentUser.id === userId) {
              return { ...currentUser };
            }

            const member = members.find((m) => m.id === userId);

            if (!member) return;

            return {
              id: member.id,
              ...member.user,
            };
          };

          watchingUserIdsSet.forEach((userId) => {
            const userData = getUserDataForId(userId);

            if (userData) {
              groupedUsers.knownSubscribers.push(userData);
            } else {
              groupedUsers.unknownSubscribersCount++;
            }
          });

          groupedUsers.knownSubscribers.sort(
            (a, b) => stringComparer(a.name, b.name) || (a.id > b.id ? 1 : -1),
          );

          return groupedUsers;
        }),
      );
    }),
    distinctUntilChanged(isEqual),
  );
}

/**
 * Returns the known and unknown users who are subscribed to the
 * provided thread and will receive notifications.
 */
export function useUsersWhoAreSubscribedToThread(args: {
  thread:
    | Pick<IThreadDoc, "id" | "permittedChannelIds" | "participatingUserIds">
    | null
    | undefined;
  /**
   * **IMPORTANT:**
   * The object reference of this array should not change on each re-render. This
   * object is passed as a hook dependency, and if it changes, the hook will
   * re-run.
   */
  onlyCountTheseSubscriptionPreferences: readonly ISubscriptionDoc["preference"][];
  dontIncludeCurrentUser?: boolean;
}) {
  const {
    thread,
    onlyCountTheseSubscriptionPreferences,
    dontIncludeCurrentUser,
  } = args;

  return useObservable(
    () => {
      if (!thread) {
        return of({
          knownSubscribers: [],
          unknownSubscribersCount: 0,
        });
      }

      return observeUsersWhoAreSubscribedToThread({
        thread,
        onlyCountTheseSubscriptionPreferences,
        dontIncludeCurrentUser,
      });
    },
    {
      deps: [
        thread?.permittedChannelIds,
        thread?.participatingUserIds,
        thread?.id,
        onlyCountTheseSubscriptionPreferences,
      ],
    },
  );
}

export const updateThreadSubscription = throttle(
  async (args: { threadId: string; preference: "all" | "involved" }) => {
    const updates: Promise<unknown>[] = [];

    if (args.preference === "involved") {
      updates.push(
        removeCurrentUserFromThreadParticipants(args.threadId).then(
          () => `Thread participants update successful`,
        ),
      );
    }

    toast("vanilla", {
      subject: args.preference === "all" ? `Subscribed.` : `Unsubscribed.`,
      description:
        args.preference === "all"
          ? `You will receive notifications for every reply
             to this thread.`
          : `You won't receive notifications unless a reply is addressed
             to you or @mentions you.`,
    });

    updates.push(
      mergeSubscription({
        type: "thread",
        subjectId: args.threadId,
        ...pick(args, "preference"),
      }).then(() => console.debug("Subscription update successful")),
    );

    await Promise.all(updates);
  },
  500,
  {
    leading: true,
    trailing: false,
  },
);

export const updateChannelSubscription = throttle(
  async (args: {
    channelId: string;
    preference?: IChannelSubscriptionDoc["preference"];
    isPinned?: IChannelSubscriptionDoc["isPinned"];
  }) => {
    if (args.preference) {
      switch (args.preference) {
        case "all": {
          toast("vanilla", {
            subject: `Subscribed to All.`,
            description: `
              You will receive a notification for every message 
              in this channel.
            `,
          });
          break;
        }
        case "all-new": {
          toast("vanilla", {
            subject: `Subscribed.`,
            description: `
              You will receive a notification for every new
              thread created in this channel.
            `,
          });
          break;
        }
        case "involved": {
          toast("vanilla", {
            subject: `Unsubscribed.`,
            description: `
              You will only receive notifications for 
              threads you are participating or @mentioned in.
            `,
          });
          break;
        }
        default: {
          throw new UnreachableCaseError(args.preference);
        }
      }

      await mergeSubscription({
        type: "channel",
        subjectId: args.channelId,
        ...pick(args, "preference", "isPinned"),
      }).then(() => console.debug("Subscription update successful"));
    } else if (args.isPinned !== undefined) {
      toast("vanilla", {
        subject: args.isPinned
          ? `Channel pinned to sidebar.`
          : `Channel removed from sidebar.`,
      });

      await mergeSubscription({
        type: "channel",
        subjectId: args.channelId,
        isPinned: args.isPinned,
      }).then(() => console.debug("Subscription update successful"));
    } else {
      throw new Error(
        `Oops, it doesn't look like you passed an expected 
         argument to updateChannelSubscription`,
      );
    }
  },
  500,
  {
    leading: true,
    trailing: false,
  },
);
