import {
  of,
  map,
  distinctUntilChanged,
  combineLatest,
  switchMap,
  Observable,
  firstValueFrom,
  shareReplay,
  filter,
  BehaviorSubject,
} from "rxjs";
import { collectionData, docData, fromRef } from "~/utils/rxFireWrappers";
import { collectionGroupRef, collectionRef, docRef } from "~/firestore.service";
import {
  IChannelDoc,
  ICommsPostDoc,
  ICommsThreadDoc,
  IEmailPostDoc,
  IEmailThreadDoc,
  IPostDoc,
  ISecretEmailPostDoc,
  ISecretEmailThreadDoc,
  IThreadDoc,
  IThreadReadStatusDoc,
  IUserDoc,
  WithLocalData,
} from "@libs/firestore-models";
import {
  query,
  where,
  orderBy,
  updateDoc,
  serverTimestamp,
  arrayRemove,
  deleteField,
  setDoc,
  limit,
  endAt,
} from "firebase/firestore";
import { useObservable } from "~/utils/useObservable";
import { isEqual } from "@libs/utils/isEqual";
import {
  postComparer,
  stringComparer,
  timestampComparer,
} from "~/utils/comparers";
import {
  ICommsThreadDocFromDraft,
  IEmailThreadDocFromDraft,
  IPostDocFromDraft,
  IThreadDocFromDraft,
  SENT_DRAFTS_AS_POSTS$,
  SENT_DRAFTS_AS_THREADS$,
} from "./draft.service";
import {
  ASSERT_CURRENT_USER$,
  ASSERT_CURRENT_USER_ID$,
  catchNoCurrentUserError,
  getAndAssertCurrentUser,
} from "./user.service";
import { withPendingUpdate } from "./loading.service";
import { debounce, pick, uniq } from "lodash-es";
import { offlineAwareFirestoreCRUD } from "./network-connection.service";
import { getTypedCallableFn } from "~/firebase";
import {
  ALL_OTHER_ACCEPTED_MEMBERS_OF_USERS_ORGANIZATIONS$,
  IAcceptedOrganizationMemberDoc,
} from "./organization.service";
import {
  IChannelDocWithCurrentUserData,
  USER_CHANNELS$,
} from "./channels.service";
import { isNonNullable } from "@libs/utils/predicates";
import { Merge } from "type-fest";
import { RefObject, useMemo } from "react";
import { getPagedQuery, useListPaging } from "~/utils/useListPaging";
import type { Timestamp } from "@firebase/firestore-types";
import { parsePostHTML } from "@libs/utils/parseHTMLProse";
import {
  IEmailThreadDocFromSecretEmailThread,
  mapSecretEmailPostToEmailPost,
  mapSecretEmailThreadToEmailThread,
} from "@libs/firestore-models/utils";
import { UnreachableCaseError } from "@libs/utils/errors";
import { cacheReplayForTime } from "@libs/utils/rxjs-operators";

/**
 * Typed callable created by `getTypedCallableFn` for the "postupdate" function.
 * NOTE(review): presumably a Firebase callable cloud function — confirm in `~/firebase`.
 */
export const updatePost = getTypedCallableFn("postupdate");
/** Typed callable created by `getTypedCallableFn` for the "threadupdate" function. */
export const updateThread = getTypedCallableFn("threadupdate");
/** Typed callable created by `getTypedCallableFn` for the "linkgmailemail" function. */
export const linkGmailEmailAccount = getTypedCallableFn("linkgmailemail");

/**
 * An `IPostDoc` carrying local data with `knownRecipientChannels`: a list of
 * `IChannelDoc`s.
 */
export type IPostDocWithChannelRecipients = WithLocalData<
  IPostDoc,
  "IPostDoc",
  { knownRecipientChannels: IChannelDoc[] }
>;

/**
 * The post shape emitted by `observePost()` and `observeThreadPosts()`: a
 * comms or email post whose local data always has `knownRecipientChannels`,
 * may have `fromSecretPost`, and may have any of the local fields of a post
 * built from a draft (`IPostDocFromDraft["__local"]`).
 *
 * NOTE(review): `fromSecretPost` is presumably populated by
 * `mapSecretEmailPostToEmailPost` — confirm in "@libs/firestore-models/utils".
 */
export type IObservePost = WithLocalData<
  ICommsPostDoc | IEmailPostDoc,
  "IPostDoc",
  Merge<
    {
      knownRecipientChannels: IChannelDoc[];
      fromSecretPost?: ISecretEmailPostDoc;
    },
    Partial<IPostDocFromDraft["__local"]>
  >
>;

/**
 * An `IThreadDoc` carrying local data with `knownPermittedChannels`: a list
 * of `IChannelDoc`s.
 */
export type IThreadDocWithPermittedChannels = WithLocalData<
  IThreadDoc,
  "IThreadDoc",
  { knownPermittedChannels: IChannelDoc[] }
>;

/**
 * The thread shape emitted by `observeThread()`: a comms or email thread
 * whose local data always has `knownPermittedChannels`, may have
 * `fromSecretThread`, and may have any of the local fields of a thread built
 * from a draft (`IThreadDocFromDraft["__local"]`).
 *
 * NOTE(review): `fromSecretThread` is presumably populated by
 * `mapSecretEmailThreadToEmailThread` — confirm in "@libs/firestore-models/utils".
 */
export type IObserveThread = WithLocalData<
  ICommsThreadDoc | IEmailThreadDoc,
  "IThreadDoc",
  Merge<
    {
      knownPermittedChannels: IChannelDoc[];
      fromSecretThread?: ISecretEmailThreadDoc;
    },
    Partial<IThreadDocFromDraft["__local"]>
  >
>;

/**
 * Observes the post with the given ID.
 *
 * Resolution order:
 * 1. a matching entry in `SENT_DRAFTS_AS_POSTS$`;
 * 2. otherwise the `posts/{postId}` document;
 * 3. otherwise (the document read yielded nothing) a "secret copy" of the
 *    post addressed to the current user, mapped to a regular email post.
 *
 * Whatever is found is decorated with `__local.knownRecipientChannels`, built
 * from the entries of `USER_CHANNELS$` whose IDs appear in the post's
 * `recipientChannelIds`.
 *
 * The provided postId cannot be for a secret post: if the document itself is
 * of type "EMAIL_SECRET" an error is logged and `null` is emitted. This is
 * just a safety mechanism since, in general, we don't want to directly
 * observe secret posts.
 *
 * @param postId ID of the (non-secret) post to observe.
 * @returns An observable of the post, or `null` when nothing was found or
 *   when `catchNoCurrentUserError` handles a missing current user.
 */
export function observePost(postId: string): Observable<IObservePost | null> {
  // Looks for a secret copy of the post that was created for the current
  // user and, when one exists, maps it to a regular email post.
  const observeSecretCopyOfPost = () =>
    ASSERT_CURRENT_USER_ID$.pipe(
      switchMap((currentUserId) =>
        collectionData(
          query(
            collectionRef<ISecretEmailPostDoc>("posts"),
            where("type", "==", "EMAIL_SECRET"),
            where("forUserId", "==", currentUserId),
            where("forPostId", "==", postId),
            limit(1),
          ),
        ),
      ),
      map((secretPosts) => {
        const [secretPost] = secretPosts;
        return secretPost ? mapSecretEmailPostToEmailPost(secretPost) : null;
      }),
    );

  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() => SENT_DRAFTS_AS_POSTS$),
    map((sentDrafts) => sentDrafts.find((draft) => draft.id === postId)),
    switchMap((sentDraft) => {
      if (sentDraft) return of(sentDraft);

      return docData(docRef("posts", postId)).pipe(
        switchMap((postDoc) => {
          if (!postDoc) {
            // A missing value indicates that we hit a Firebase error. At the
            // time of writing it is unclear whether Firebase throws a
            // security rules error for a post that doesn't exist (the rules
            // try to prevent that) or simply returns undefined. A post that
            // exists but isn't visible to the current user definitely
            // produces a security rules error. Either way, when the direct
            // read fails we check whether the current user has access to a
            // "secret copy" of the post.
            //
            // The secret copy could be queried in parallel (which would be
            // faster) but, since secret posts are expected to be uncommon,
            // we deliberately trade some latency for fewer database reads
            // (read counts are already higher than acceptable each month).
            return observeSecretCopyOfPost();
          }

          if (postDoc.type !== "EMAIL_SECRET") return of(postDoc);

          console.error(
            "observePost: provided postId is for a secret post",
            postId,
          );

          return of(null);
        }),
      );
    }),
    switchMap((post) => {
      if (!post) return of(post);

      return USER_CHANNELS$.pipe(
        map((channels) => {
          const knownRecipientChannels = post.recipientChannelIds
            .map((channelId) =>
              channels.find((channel) => channel.id === channelId),
            )
            .filter(isNonNullable);

          return {
            ...post,
            __docType: "IPostDoc" as const,
            __local: {
              ...("__local" in post ? (post.__local as {}) : {}),
              knownRecipientChannels,
            },
          } as IObservePost;
        }),
      );
    }),
    catchNoCurrentUserError(() => null),
    distinctUntilChanged(isEqual),
  );
}

/**
 * One-shot variant of `observePost()`.
 *
 * @param postId ID of the (non-secret) post to fetch.
 * @returns A promise for the first value emitted by `observePost(postId)`.
 */
export function getPost(postId: string): Promise<IPostDoc | null> {
  const post$ = observePost(postId);

  return firstValueFrom(post$);
}

/**
 * React hook wrapping `observePost()`.
 *
 * @param postId ID of the post to observe. When falsy, the hook observes a
 *   constant `null` instead.
 * @returns Whatever `useObservable` yields for the selected observable.
 */
export function usePost(postId?: string): IObservePost | null | undefined {
  return useObservable(() => (postId ? observePost(postId) : of(null)), {
    deps: [postId],
  });
}

/**
 * Returns the thread doc associated with the provided ID. The returned thread
 * doc will include information from unsent drafts as appropriate.
 *
 * Provided threadId cannot be for a secret thread. This is just
 * a safety mechanism since, in general, we don't want to directly
 * observe secret threads.
 *
 * Pipeline overview:
 * 1. Resolve the thread from `SENT_DRAFTS_AS_THREADS$`, else from the
 *    `threads/{threadId}` document, else from a secret thread the current
 *    user is permitted to see (mapped to a regular email thread).
 * 2. Merge in recipients and the last post of any entries of
 *    `SENT_DRAFTS_AS_POSTS$` that belong to this thread.
 * 3. Parse the HTML of the first and last post and attach
 *    `__local.knownPermittedChannels` from `USER_CHANNELS$`.
 *
 * The resulting observable is stored in `observeThreadCache` while
 * `cacheReplayForTime` considers it active, so repeated calls with the same
 * `threadId` return the same observable instance.
 *
 * @param threadId ID of the (non-secret) thread to observe.
 * @returns An observable of the thread, or `null` when nothing was found or
 *   when `catchNoCurrentUserError` handles a missing current user.
 */
export function observeThread(
  threadId: string,
): Observable<IObserveThread | null> {
  let obs = observeThreadCache.get(threadId);

  if (obs) return obs;

  obs = ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() => SENT_DRAFTS_AS_THREADS$),
    map((threads) => threads.find((thread) => thread.id === threadId)),
    switchMap<
      IThreadDocFromDraft | undefined,
      Observable<
        | IThreadDoc
        | IEmailThreadDocFromSecretEmailThread
        | IThreadDocFromDraft
        | null
      >
    >((thread) =>
      thread
        ? of(thread)
        : docData(docRef("threads", threadId)).pipe(
            switchMap((thread) => {
              if (thread) {
                if (thread.type === "EMAIL_SECRET") {
                  console.error(
                    "observeThread: provided threadId is for a secret thread",
                    threadId,
                  );

                  return of(null);
                }

                return of(thread);
              }

              // If null, that indicates we encountered a Firebase error. At
              // time of writing, I'm not sure if Firebase will throw a security
              // rules error if the thread doesn't exist (we've tried to prevent
              // this in the security rules, but I'm not sure if we're successful)
              // or if undefined will be returned.
              // If the thread does exist but the current user doesn't have
              // permission to view it, then it will definitely throw a security
              // rules error. In either case, if we fail to get the thread then we
              // check to see if the current user has access to a "secret copy" of
              // this thread.
              // We could check for a secret copy in parallel (which would be
              // faster), but since the existence of a secret thread is expected to
              // be uncommon we're choosing to reduce unnecessary database reads at
              // the cost of some performance (I don't consider this a premature
              // optimization since we're already suffering from
              // higher-than-acceptable database read counts each month).
              return ASSERT_CURRENT_USER_ID$.pipe(
                switchMap((currentUserId) =>
                  collectionData(
                    query(
                      collectionRef<ISecretEmailThreadDoc>("threads"),
                      where("type", "==", "EMAIL_SECRET"),
                      where(
                        "permittedUserIds",
                        "array-contains",
                        currentUserId,
                      ),
                      where("forThreadId", "==", threadId),
                      limit(1),
                    ),
                  ),
                ),
                map((secretThreads) => {
                  const secretThread = secretThreads[0];
                  if (!secretThread) return null;
                  return mapSecretEmailThreadToEmailThread(secretThread);
                }),
              );
            }),
          ),
    ),
  ).pipe(
    distinctUntilChanged(isEqual),
    switchMap((thread) => {
      if (!thread) return of(null);

      const isThreadFromSecretThread =
        thread.type === "EMAIL" &&
        "__local" in thread &&
        "fromSecretThread" in thread.__local &&
        !!thread.__local.fromSecretThread;

      // Users can't draft "secret posts" (instead the server
      // automatically generates them in some situations), so a thread that
      // was derived from a secret thread has no pending drafts to merge in.
      if (isThreadFromSecretThread) return of(thread);
      // The same reasoning applies to a thread that is itself secret. The
      // document lookup above replaces secret threads with `null`, so this
      // branch mainly narrows `thread` to the non-secret variants handled by
      // the `switch` below.
      else if (thread.type === "EMAIL_SECRET") return of(thread);

      return SENT_DRAFTS_AS_POSTS$.pipe(
        map((allUnsentPosts) => {
          const unsentPostsForThread = allUnsentPosts
            .filter((unsentPost) => unsentPost.threadId === threadId)
            .sort(
              (a, b) =>
                timestampComparer(a.sentAt, b.sentAt) ||
                timestampComparer(
                  a.scheduledToBeSentAt,
                  b.scheduledToBeSentAt,
                ) ||
                stringComparer(a.id, b.id),
            );

          if (unsentPostsForThread.length === 0) {
            return thread;
          }

          const initialValue = {
            participatingUserIds: thread.participatingUserIds,
            participatingUsers: thread.participatingUsers,
            permittedChannelIds: thread.permittedChannelIds,
            permittedUserIds: thread.permittedUserIds,
            userPermissions: thread.userPermissions,
          };

          // Fold the recipients and creator of every pending post into the
          // thread's participant / permission data.
          const threadRecipientDetails = unsentPostsForThread.reduce(
            (store, post) => {
              const participatingUsers = {
                ...store.participatingUsers,
                ...post.recipientUsers,
              };

              const userPermissions = {
                ...store.userPermissions,
                ...post.recipientUsers,
              };

              if (post.creatorId && post.creatorName && post.creatorEmail) {
                const creatorData = {
                  name: post.creatorName,
                  email: post.creatorEmail,
                  photoURL: post.creatorPhotoURL,
                };

                participatingUsers[post.creatorId] = creatorData;
                userPermissions[post.creatorId] = creatorData;
              }

              return {
                participatingUserIds: Object.keys(participatingUsers),
                participatingUsers,
                permittedChannelIds: uniq([
                  ...store.permittedChannelIds,
                  ...post.recipientChannelIds,
                ]),
                permittedUserIds: Object.keys(userPermissions),
                userPermissions,
              };
            },
            initialValue,
          );

          // The pending posts are sorted ascending, so the last one is the
          // most recent pending post.
          const lastUnsentPost = unsentPostsForThread.at(-1);

          const isLastUnsentPostMoreRecent =
            !!lastUnsentPost &&
            (timestampComparer(thread.lastPost.sentAt, lastUnsentPost.sentAt) ||
              timestampComparer(
                thread.lastPost.scheduledToBeSentAt,
                lastUnsentPost.scheduledToBeSentAt,
              )) < 0;

          const lastPost = isLastUnsentPostMoreRecent
            ? lastUnsentPost
            : thread.lastPost;

          switch (thread.type) {
            case "COMMS": {
              return {
                ...thread,
                lastPost: lastPost as ICommsPostDoc,
                ...threadRecipientDetails,
                participatingUserIds:
                  threadRecipientDetails.participatingUserIds,
                permittedChannelIds: threadRecipientDetails.permittedChannelIds,
                permittedUserIds: threadRecipientDetails.permittedUserIds,
              } satisfies ICommsThreadDoc | ICommsThreadDocFromDraft;
            }
            case "EMAIL": {
              return {
                ...thread,
                lastPost: lastPost as IEmailPostDoc,
                ...threadRecipientDetails,
                participatingUserIds:
                  threadRecipientDetails.participatingUserIds,
                permittedChannelIds: threadRecipientDetails.permittedChannelIds,
                permittedUserIds: threadRecipientDetails.permittedUserIds,
              } satisfies IEmailThreadDoc | IEmailThreadDocFromDraft;
            }
            default: {
              throw new UnreachableCaseError(thread);
            }
          }
        }),
      );
    }),
    switchMap(
      (
        thread:
          | IThreadDoc
          | IEmailThreadDocFromSecretEmailThread
          | IThreadDocFromDraft
          | null,
      ) =>
        !thread
          ? of(thread)
          : USER_CHANNELS$.pipe(
              switchMap(async (channels) => {
                // `parsePostHTML` is the module-level static import that
                // `observeThreadPosts()` also uses; a dynamic import of a
                // module that is already imported statically gains nothing.
                return {
                  ...thread,
                  firstPost: {
                    ...thread.firstPost,
                    bodyHTML: await parsePostHTML(thread.firstPost.bodyHTML),
                  },
                  lastPost: {
                    ...thread.lastPost,
                    bodyHTML: await parsePostHTML(thread.lastPost.bodyHTML),
                  },
                  __docType: "IThreadDoc" as const,
                  __local: {
                    ...("__local" in thread ? thread.__local : {}),
                    knownPermittedChannels: thread.permittedChannelIds
                      .map((channelId) =>
                        channels.find((c) => c.id === channelId),
                      )
                      .filter(isNonNullable),
                  },
                } as IObserveThread;
              }),
            ),
    ),
    catchNoCurrentUserError(() => null),
    distinctUntilChanged(isEqual),
    // NOTE(review): presumably shares/replays the latest value and keeps the
    // subscription alive for `timeMs` after it is no longer used — confirm in
    // "@libs/utils/rxjs-operators". The callbacks keep `observeThreadCache`
    // in sync with that lifetime.
    cacheReplayForTime({
      timeMs: 5000,
      onInit() {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        observeThreadCache.set(threadId, obs!);
      },
      onCleanup() {
        observeThreadCache.delete(threadId);
      },
    }),
  );

  return obs;
}

/**
 * Cache of the observables created by `observeThread()`, keyed by thread ID.
 * Entries are added in the `onInit` callback and removed in the `onCleanup`
 * callback that `observeThread()` passes to `cacheReplayForTime`.
 */
const observeThreadCache = new Map<string, Observable<IObserveThread | null>>();

/**
 * One-shot variant of `observeThread()`.
 *
 * @param threadId ID of the (non-secret) thread to fetch.
 * @returns A promise for the first value emitted by `observeThread(threadId)`.
 */
export function getThread(threadId: string): Promise<IThreadDoc | null> {
  const thread$ = observeThread(threadId);

  return firstValueFrom(thread$);
}

/**
 * Observes every thread that was branched from the given post.
 *
 * Thread IDs come from two sources: the post's own `branchedThreadIds` and
 * the entries of `SENT_DRAFTS_AS_THREADS$` whose `branchedFrom.postId`
 * matches. Each ID is then observed with `observeThread()`.
 *
 * @param postId ID of the post the threads were branched from.
 * @returns An observable of the non-null branched threads, sorted ascending
 *   by the last post's `sentAt`, then `scheduledToBeSentAt`, then thread ID.
 */
function observeThreadsBranchedFromPost(postId: string) {
  const idsOfThreadsBranchedFromPost$ = observePost(postId).pipe(
    map((post) => post?.branchedThreadIds || []),
  );

  const threadIdsOfSendDraftsBranchedFromPost$ = SENT_DRAFTS_AS_THREADS$.pipe(
    map((threads) =>
      threads
        .filter((thread) => thread.branchedFrom?.postId === postId)
        .map((thread) => thread.id),
    ),
  );

  const combinedThreadIdsBranchedFromPost$ = combineLatest([
    idsOfThreadsBranchedFromPost$,
    threadIdsOfSendDraftsBranchedFromPost$,
  ]).pipe(
    map(
      ([
        idsOfThreadsBranchedFromPost,
        threadIdsOfSendDraftsBranchedFromPost,
      ]) =>
        // After a pending thread is sent for "real", the real thread can sync
        // to the client before the old draft's deletion does. In that window
        // the same ID may show up in both lists, so de-duplicate to avoid
        // observing (and returning) the same thread twice.
        uniq([
          ...idsOfThreadsBranchedFromPost,
          ...threadIdsOfSendDraftsBranchedFromPost,
        ]),
    ),
    distinctUntilChanged(isEqual),
  );

  return combinedThreadIdsBranchedFromPost$.pipe(
    switchMap((threadIds) => {
      // `combineLatest([])` would complete without emitting, so emit an
      // empty list explicitly.
      return threadIds.length === 0
        ? of([])
        : combineLatest(threadIds.map((threadId) => observeThread(threadId)));
    }),
    map((threads) => {
      return threads.filter(isNonNullable).sort((a, b) => {
        return (
          timestampComparer(a.lastPost.sentAt, b.lastPost.sentAt) ||
          timestampComparer(
            a.lastPost.scheduledToBeSentAt,
            b.lastPost.scheduledToBeSentAt,
          ) ||
          stringComparer(a.id, b.id)
        );
      });
    }),
  );
}

/**
 * React hook wrapping `observeThreadsBranchedFromPost()`.
 *
 * @param postId ID of the post the threads were branched from. When falsy,
 *   the hook observes a constant empty list instead.
 * @returns Whatever `useObservable` yields for the selected observable.
 */
export function useThreadsBranchedFromPost(postId?: string) {
  return useObservable(
    () => (postId ? observeThreadsBranchedFromPost(postId) : of([])),
    { deps: [postId] },
  );
}

/**
 * React hook wrapping `observeThread()`.
 *
 * @param threadId ID of the thread to observe. When falsy, the hook observes
 *   a constant `null` instead.
 * @returns Whatever `useObservable` yields for the selected observable.
 */
export function useThread(threadId: string | undefined) {
  return useObservable(
    () => (threadId ? observeThread(threadId) : of(null)),
    { deps: [threadId] },
  );
}

/**
 * Observes the posts of a thread, sorted with `postComparer`.
 *
 * The result merges three sources:
 * - entries of `SENT_DRAFTS_AS_POSTS$` that belong to the thread,
 * - posts in the "posts" collection whose `threadId` matches, and
 * - "EMAIL_SECRET" posts addressed to the current user whose `forThreadId`
 *   matches (mapped to regular email posts).
 *
 * Every post gets `__local.knownRecipientChannels` from `USER_CHANNELS$` and
 * has its `bodyHTML` run through `parsePostHTML`.
 *
 * Provided threadId cannot be for a secret thread. This is just
 * a safety mechanism since, in general, we don't want to directly
 * observe secret threads.
 *
 * @param threadId ID of the (non-secret) thread whose posts are observed.
 * @param options.endAt Optional inclusive upper bound expressed as
 *   `[sentAt, scheduledToBeSentAt]`. It is applied as a query cursor to the
 *   Firestore queries and with the same ordering semantics to pending posts.
 * @param options.noFromCache See the inline documentation on the option.
 * @returns An observable of the merged, sorted posts. An empty list is
 *   emitted when `catchNoCurrentUserError` handles a missing current user.
 */
export function observeThreadPosts(
  threadId: string,
  options: {
    endAt?: [postSentAt: Timestamp, postScheduledToBeSentAt: Timestamp];
    /**
     * Typically, Firestore will cache documents on the client and, when
     * possible, immediately return them for queries without waiting to
     * hear back from the server. While this is often desirable, sometimes
     * this can result in undesirable behavior because a query will
     * initially return with a few results before emitting shortly later
     * with the full results of the query. Set this option to `true` to
     * require query results to come from the server.
     */
    noFromCache?: boolean;
  } = {},
): Observable<IObservePost[]> {
  // Decorates each post with the channels (known to the current user) that
  // the post was sent to.
  const mapPostsToObservePosts = (posts: Array<IPostDoc | IPostDocFromDraft>) =>
    USER_CHANNELS$.pipe(
      map((channels) =>
        posts.map((post) => {
          return {
            ...post,
            __docType: "IPostDoc" as const,
            __local: {
              ...("__local" in post ? post.__local : {}),
              knownRecipientChannels: post.recipientChannelIds
                .map((channelId) => channels.find((c) => c.id === channelId))
                .filter(isNonNullable),
            },
          } as IObservePost;
        }),
      ),
    );

  // These are drafts which the user has "sent" but which
  // haven't actually been picked up by our firebase function
  // and "sent" on the backend yet. This means that none of
  // the recipients have been notified yet.
  const postsCurrentUserHasSubmittedForSending: Observable<IObservePost[]> =
    SENT_DRAFTS_AS_POSTS$.pipe(
      map((posts) => {
        let filteredPosts = posts.filter((post) => post.threadId === threadId);

        if (options.endAt) {
          const [postSentAt, postScheduledToBeSentAt] = options.endAt;
          const endSentAt = postSentAt.valueOf();
          const endScheduledToBeSentAt = postScheduledToBeSentAt.valueOf();

          // Mirror the Firestore cursor used below: with two `orderBy`
          // fields, `endAt(a, b)` is a lexicographic bound, i.e. a post is in
          // range when `sentAt < a`, or when `sentAt == a` and
          // `scheduledToBeSentAt <= b`. The second field only breaks ties.
          filteredPosts = filteredPosts.filter((post) => {
            const sentAt = post.sentAt.valueOf();

            if (sentAt !== endSentAt) return sentAt < endSentAt;

            return post.scheduledToBeSentAt.valueOf() <= endScheduledToBeSentAt;
          });
        }

        return filteredPosts;
      }),
      switchMap(mapPostsToObservePosts),
    );

  const thread$ = observeThread(threadId);

  // If the thread being accessed is a draft, attempting to query for
  // posts with that `threadId` will throw a security rules error. We
  // try to only query for posts if the thread isn't a draft.
  const existingThreadPosts: Observable<IObservePost[]> = thread$.pipe(
    switchMap((threadDoc) =>
      !threadDoc ||
      ("__local" in threadDoc && threadDoc.__local.fromUnsafeDraft)
        ? of([])
        : fromRef(
            query(
              collectionRef("posts"),
              where(`threadId`, "==", threadDoc.id),
              orderBy("sentAt", "asc"),
              orderBy("scheduledToBeSentAt", "asc"),
              ...(options.endAt ? [endAt(...options.endAt)] : []),
            ),
            { includeMetadataChanges: !!options.noFromCache },
          ).pipe(
            // With `noFromCache`, drop snapshots served from the local cache.
            filter(
              (snaps) => !options.noFromCache || !snaps?.metadata.fromCache,
            ),
            map((snaps) => snaps?.docs.map((s) => s.data()) || []),
            switchMap(mapPostsToObservePosts),
          ),
    ),
  );

  // Secret copies of posts in this thread that were created for the current
  // user. As with the query above, this is skipped when the thread is
  // missing or is an unsent draft.
  const existingSecretThreadPosts: Observable<IObservePost[]> = thread$.pipe(
    switchMap((threadDoc) =>
      !threadDoc ||
      ("__local" in threadDoc && threadDoc.__local.fromUnsafeDraft)
        ? of([])
        : ASSERT_CURRENT_USER_ID$.pipe(
            switchMap((currentUserId) =>
              fromRef(
                query(
                  collectionRef<ISecretEmailPostDoc>("posts"),
                  where(`type`, "==", "EMAIL_SECRET"),
                  where(`forUserId`, "==", currentUserId),
                  where(`forThreadId`, "==", threadDoc.id),
                  orderBy("sentAt", "asc"),
                  orderBy("scheduledToBeSentAt", "asc"),
                  ...(options.endAt ? [endAt(...options.endAt)] : []),
                ),
                { includeMetadataChanges: !!options.noFromCache },
              ),
            ),
            // With `noFromCache`, drop snapshots served from the local cache.
            filter(
              (snaps) => !options.noFromCache || !snaps?.metadata.fromCache,
            ),
            map(
              (snaps) =>
                snaps?.docs.map((s) =>
                  mapSecretEmailPostToEmailPost(s.data()),
                ) || [],
            ),
            switchMap(mapPostsToObservePosts),
          ),
    ),
  );

  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() =>
      combineLatest([
        postsCurrentUserHasSubmittedForSending,
        existingThreadPosts,
        existingSecretThreadPosts,
      ]),
    ),
    map(([pendingPosts, sentPosts, sentSecretPosts]) => {
      return sentPosts
        .concat(
          sentSecretPosts,
          // After a pendingPost is sent for "real", it's possible for the new
          // "real" post to sync to the client before the old draft's deletion
          // is synced to the client.
          pendingPosts.filter(
            (post) => !sentPosts.some((p) => p.id === post.id),
          ),
        )
        .sort(postComparer);
    }),
    switchMap(async (posts) => {
      return Promise.all(
        posts.map(async (post) => ({
          ...post,
          bodyHTML: await parsePostHTML(post.bodyHTML),
        })),
      );
    }),
    catchNoCurrentUserError(() => []),
    distinctUntilChanged(isEqual),
    shareReplay({ bufferSize: 1, refCount: true }),
  );
}

/**
 * One-shot variant of `observeThreadPosts()` using the default options.
 *
 * @param threadId ID of the (non-secret) thread whose posts are fetched.
 * @returns A promise for the first list emitted by
 *   `observeThreadPosts(threadId)`.
 */
export function getThreadPosts(threadId: string) {
  const threadPosts$ = observeThreadPosts(threadId);

  return firstValueFrom(threadPosts$);
}

/**
 * React hook that observes the posts of the thread a branch was created
 * from, up to and including the post it was branched from.
 *
 * @param branchedFrom The `branchedFrom` data of a thread. When falsy, the
 *   hook observes a constant empty list instead.
 * @returns Whatever `useObservable` yields for the selected observable.
 */
export function useBranchedThreadPosts(
  branchedFrom?: NonNullable<IThreadDoc["branchedFrom"]>,
) {
  return useObservable(
    () => {
      if (!branchedFrom) return of([]);

      const { threadId, postSentAt, postScheduledToBeSentAt } = branchedFrom;

      return observeThreadPosts(threadId, {
        endAt: [postSentAt, postScheduledToBeSentAt],
        // The branched posts are quoted at the top of a thread, but we
        // generally want to focus the first non-quoted post. To do that we
        // need to know when all of the quoted posts have finished loading,
        // hence results must come from the server.
        noFromCache: true,
      });
    },
    {
      deps: [
        branchedFrom?.threadId,
        branchedFrom?.postSentAt.valueOf(),
        branchedFrom?.postScheduledToBeSentAt.valueOf(),
      ],
    },
  );
}

/**
 * Observes the posts created by the current user, newest first, combined
 * with the entries of `SENT_DRAFTS_AS_POSTS$`.
 *
 * @param page Page number, or a subject of page numbers, handed to
 *   `getPagedQuery`.
 * @returns `posts$` (the merged, sorted posts) together with every other
 *   property returned by `getPagedQuery` besides `docs$`.
 */
export function observeSentPosts(page: number | BehaviorSubject<number>) {
  // Posts in the "posts" collection that were created by the current user.
  const observeNewestSentPosts = (limitPostCountTo: number) =>
    ASSERT_CURRENT_USER_ID$.pipe(
      switchMap((userId) =>
        collectionData(
          query(
            collectionRef("posts"),
            where("creatorId", "==", userId),
            orderBy("sentAt", "desc"),
            orderBy("scheduledToBeSentAt", "desc"),
            limit(limitPostCountTo),
          ),
        ),
      ),
    );

  const { docs$: sentPosts$, ...paging } = getPagedQuery(
    observeNewestSentPosts,
    page,
  );

  // `SENT_DRAFTS_AS_POSTS$` holds drafts which the user has "sent" but which
  // haven't been picked up by our firebase function and "sent" on the
  // backend yet, i.e. none of the recipients have been notified yet.
  const posts$ = combineLatest([SENT_DRAFTS_AS_POSTS$, sentPosts$]).pipe(
    map(([pendingPosts, sentPosts]) => {
      // After a pending post is sent for "real", the new "real" post may
      // sync to the client before the old draft's deletion does. Skip
      // pending posts that already have a real counterpart.
      const pendingPostsWithoutRealCopy = pendingPosts.filter((pending) =>
        sentPosts.every((sent) => sent.id !== pending.id),
      );

      const allPosts = sentPosts.concat(pendingPostsWithoutRealCopy);

      // Newest first.
      return allPosts.sort(
        (a, b) =>
          timestampComparer(b.sentAt, a.sentAt) ||
          timestampComparer(b.scheduledToBeSentAt, a.scheduledToBeSentAt),
      );
    }),
    catchNoCurrentUserError(() => []),
    distinctUntilChanged(isEqual),
  );

  return { posts$, ...paging };
}

/**
 * React hook that observes the current user's sent posts, starting at page 1
 * and wiring paging up through `useListPaging`.
 *
 * @param options.pagingScrollboxRef Ref of the scrollbox element. An initial
 *   chunk of posts is loaded and more are loaded when the user scrolls to
 *   the bottom of the element associated with this ref.
 * @returns Whatever `useObservable` yields for the posts observable.
 */
export function useSentPosts(options: {
  /**
   * Loads an initial chunk of posts and then loads
   * more when the user scrolls to the bottom of the
   * element associated with this scrollboxRef.
   */
  pagingScrollboxRef: RefObject<HTMLElement>;
}) {
  // Created once per component instance.
  const sentPosts = useMemo(() => observeSentPosts(1), []);

  useListPaging({
    getNextPage: sentPosts.getNextPage,
    pagingScrollboxRef: options.pagingScrollboxRef,
    loading$: sentPosts.loading$,
  });

  return useObservable(() => sentPosts.posts$ || of([]), {
    deps: [sentPosts.posts$],
  });
}

/**
 * Observes the current user's document in the thread's "readStatus"
 * subcollection (`threads/{threadId}/readStatus/{userId}`).
 *
 * @param threadId ID of the thread.
 * @returns A shared, replaying observable of the document data. `null` is
 *   emitted when `catchNoCurrentUserError` handles a missing current user.
 */
export function observeThreadReadStatus(threadId: string) {
  const readStatus$ = ASSERT_CURRENT_USER_ID$.pipe(
    switchMap((userId) =>
      docData(docRef("threads", threadId, "readStatus", userId)),
    ),
    catchNoCurrentUserError(() => null),
  );

  return readStatus$.pipe(shareReplay({ bufferSize: 1, refCount: true }));
}

/**
 * One-shot variant of `observeThreadReadStatus()`.
 *
 * @param threadId ID of the thread.
 * @returns A promise for the first value emitted by
 *   `observeThreadReadStatus(threadId)`.
 */
export function getThreadReadStatus(threadId: string) {
  const readStatus$ = observeThreadReadStatus(threadId);

  return firstValueFrom(readStatus$);
}

/**
 * Observes the current user's reaction document for a post
 * (`users/{userId}/postReactions/{postId}`).
 *
 * @param postId ID of the post.
 * @returns An observable of the document data that only emits when the value
 *   changes (per `isEqual`). `null` is emitted when
 *   `catchNoCurrentUserError` handles a missing current user.
 */
export function observePostReactionForCurrentUser(postId: string) {
  return ASSERT_CURRENT_USER_ID$.pipe(
    switchMap((userId) => {
      const reactionRef = docRef("users", userId, "postReactions", postId);

      return docData(reactionRef);
    }),
    catchNoCurrentUserError(() => null),
    distinctUntilChanged(isEqual),
  );
}

/**
 * A user who reacted to a post: the user's ID combined with the `user`
 * fields of an accepted organization member doc. `observePostReactions()`
 * emits lists of these, grouped by emoji.
 */
export type IUsePostReaction = {
  id: IUserDoc["id"];
} & IAcceptedOrganizationMemberDoc["user"];

/**
 * Observes all reactions to a post, grouped by emoji.
 *
 * Reaction docs are read from the "postReactions" collection group. Each
 * reacting user is resolved either to the current user or to an entry of
 * `ALL_OTHER_ACCEPTED_MEMBERS_OF_USERS_ORGANIZATIONS$`; reactions from users
 * that can't be resolved are skipped.
 *
 * @param postId ID of the post.
 * @returns An observable of a map from emoji to the users who reacted with
 *   it. An empty map is emitted when `catchNoCurrentUserError` handles a
 *   missing current user.
 */
export function observePostReactions(
  postId: string,
): Observable<Map<string, IUsePostReaction[]>> {
  return ASSERT_CURRENT_USER$.pipe(
    map((user) => pick(user, "id", "name", "email", "photoURL", "phoneNumber")),
    distinctUntilChanged(isEqual),
    switchMap((currentUser) => {
      const reactionDocs$ = collectionData(
        query(collectionGroupRef("postReactions"), where("id", "==", postId)),
      );

      return combineLatest([
        reactionDocs$,
        ALL_OTHER_ACCEPTED_MEMBERS_OF_USERS_ORGANIZATIONS$,
        // The current user's own reaction doc is subscribed to purely so
        // that it is cached. That data is needed when editing post reactions
        // and having it cached makes the UI snappier.
        observePostReactionForCurrentUser(postId),
      ]).pipe(
        map(([reactionDocs, members]) => {
          const usersByEmoji = new Map<string, IUsePostReaction[]>();

          for (const reactionDoc of reactionDocs) {
            let reactingUser: IUsePostReaction;

            if (reactionDoc.userId === currentUser.id) {
              reactingUser = {
                id: currentUser.id,
                name: currentUser.name,
                email: currentUser.email,
                photoURL: currentUser.photoURL,
                phoneNumber: currentUser.phoneNumber,
              };
            } else {
              const member = members.find(
                (candidate) => candidate.id === reactionDoc.userId,
              );

              // Unknown user: ignore their reactions.
              if (!member) continue;

              reactingUser = {
                id: member.id,
                ...member.user,
              };
            }

            for (const emoji of reactionDoc.reactions) {
              const usersForEmoji = usersByEmoji.get(emoji);

              if (usersForEmoji) {
                usersForEmoji.push(reactingUser);
              } else {
                usersByEmoji.set(emoji, [reactingUser]);
              }
            }
          }

          return usersByEmoji;
        }),
      );
    }),
    catchNoCurrentUserError(() => new Map()),
    distinctUntilChanged(isEqual),
  );
}

/**
 * React hook wrapping `observePostReactions()`.
 *
 * @param postId ID of the post. When falsy, the hook observes a constant
 *   empty list instead.
 * @returns The `[emoji, users]` entries of the reactions map, as yielded by
 *   `useObservable`.
 */
export function usePostReactions(
  postId?: string,
): Array<[string, IUsePostReaction[]]> | undefined {
  return useObservable(
    () => {
      if (!postId) return of([]);

      return observePostReactions(postId).pipe(
        map((usersByEmoji) => Array.from(usersByEmoji.entries())),
      );
    },
    {
      deps: [postId],
    },
  );
}

/**
 * Observes the threads of a single channel, one "page" at a time.
 *
 * The emitted list combines:
 *   - threads loaded from the "threads" collection whose
 *     `permittedChannelIds` contain `channelId`, and
 *   - drafts the current user has already "sent" for this channel but
 *     which haven't been picked up and sent by the backend yet.
 *
 * The combined list is sorted by `lastPost.sentAt` and then
 * `lastPost.scheduledToBeSentAt`, both descending. If there is no current
 * user, `catchNoCurrentUserError` substitutes an empty list.
 *
 * @param channelId - The channel whose threads should be observed.
 * @param page - Page to load (first page is 1), either as a fixed number
 *   or as a `BehaviorSubject`. It is forwarded to `getPagedQuery`.
 * @returns `threads$` plus the paging controls (`page$`, `end$`,
 *   `loading$`, `getNextPage`) returned by `getPagedQuery`.
 */
export function observeChannelThreads(
  channelId: string,
  /** First page is 1 */
  page: number | BehaviorSubject<number>,
) {
  // Drafts which the user has "sent" but which our firebase function
  // hasn't picked up and "sent" on the backend yet (so no recipient has
  // been notified). Only the ones addressed to this channel are relevant.
  const pendingThreadsForChannel$ = SENT_DRAFTS_AS_THREADS$.pipe(
    map((drafts) =>
      drafts.filter((draft) => draft.permittedChannelIds.includes(channelId)),
    ),
  );

  // Access to a channel implies access to every thread in that channel,
  // so there is no need to query for "EMAIL_SECRET" threads here. Any
  // "EMAIL_SECRET" threads associated with one of these channel threads
  // are shown once the user navigates to that specific thread.
  function observeThreadsUpTo(limitThreadCountTo: number) {
    const threadsQuery = query(
      collectionRef("threads"),
      where("permittedChannelIds", "array-contains", channelId),
      orderBy("lastPost.sentAt", "desc"),
      orderBy("lastPost.scheduledToBeSentAt", "desc"),
      limit(limitThreadCountTo),
    );

    return collectionData(threadsQuery);
  }

  const paged = getPagedQuery(observeThreadsUpTo, page);

  const threads$ = ASSERT_CURRENT_USER_ID$.pipe(
    switchMap(() => combineLatest([pendingThreadsForChannel$, paged.docs$])),
    map(([pendingThreads, sentThreads]) => {
      // After a pending thread is picked up and sent for "real" by the
      // server, the new "real" thread may sync to the client before the
      // old draft's deletion does. Drop pending threads that already have
      // a "real" counterpart so they aren't listed twice.
      const stillPending = pendingThreads.filter((pending) =>
        sentThreads.every((sent) => sent.id !== pending.id),
      );

      const merged = sentThreads.concat(stillPending);

      // Note that we're sorting in DESC order (hence `b` before `a`).
      merged.sort((a, b) => {
        const bySentAt = timestampComparer(b.lastPost.sentAt, a.lastPost.sentAt);

        if (bySentAt) return bySentAt;

        return timestampComparer(
          b.lastPost.scheduledToBeSentAt,
          a.lastPost.scheduledToBeSentAt,
        );
      });

      return merged;
    }),
    catchNoCurrentUserError(() => []),
    distinctUntilChanged(isEqual),
  );

  return {
    threads$,
    page$: paged.page$,
    end$: paged.end$,
    loading$: paged.loading$,
    getNextPage: paged.getNextPage,
  };
}

/**
 * React hook returning the threads of a channel, wired up to
 * scroll-based paging.
 *
 * The paged observable is created via `observeChannelThreads(channelId, 1)`
 * and memoized per `channelId`. While `channelId` is missing, nothing is
 * observed and the threads observable falls back to an empty list.
 *
 * @returns The channel's threads, or `undefined` (presumably until the
 *   first value arrives — confirm against `useObservable`).
 */
export function useChannelThreads(options: {
  channelId?: string;
  /**
   * Loads an initial chunk of threads and then loads
   * more when the user scrolls to the bottom of the
   * element associated with this scrollboxRef.
   */
  pagingScrollboxRef: RefObject<HTMLElement>;
}): IThreadDoc[] | undefined {
  const { channelId, pagingScrollboxRef } = options;

  // Only (re)build the paged observable when the channel changes.
  const paged = useMemo(
    () => (channelId ? observeChannelThreads(channelId, 1) : undefined),
    [channelId],
  );

  const threads$ = paged?.threads$;

  useListPaging({
    getNextPage: paged?.getNextPage,
    pagingScrollboxRef,
    loading$: paged?.loading$,
  });

  return useObservable(() => threads$ || of([]), {
    deps: [threads$],
  });
}

/**
 * Observes the threads of the "shared" channels of every organization
 * the current user belongs to, one "page" at a time.
 *
 * The emitted list combines threads loaded from the "threads" collection
 * with drafts the current user has already "sent" to a shared channel but
 * which haven't been picked up and sent by the backend yet. The combined
 * list is sorted by `lastPost.sentAt` and then
 * `lastPost.scheduledToBeSentAt`, both descending.
 *
 * When there are no shared channel ids (the user has no organizations,
 * no organization document could be loaded, or the user belongs to more
 * than 6 organizations) no Firestore query is issued and an empty list of
 * sent threads is used instead.
 *
 * @param page - Page to load, either as a fixed number or as a
 *   `BehaviorSubject`. It is forwarded to `getPagedQuery`.
 * @returns `threads$` plus the paging controls (`page$`, `end$`,
 *   `loading$`, `getNextPage`) returned by `getPagedQuery`.
 */
export function observeSharedMessagesThreads(
  page: number | BehaviorSubject<number>,
) {
  const sharedChannelIds$ = ASSERT_CURRENT_USER$.pipe(
    map((currentUser) => Object.keys(currentUser.organizationPermissions)),
    distinctUntilChanged(isEqual),
    switchMap((organizationIds) => {
      // `combineLatest([])` would complete without emitting, so the
      // "no organizations" case is handled explicitly.
      const organizations$ =
        organizationIds.length === 0
          ? of([])
          : combineLatest(
              organizationIds.map((orgId) =>
                docData(docRef("organizations", orgId)),
              ),
            );

      return organizations$;
    }),
    map((organizations) => {
      if (organizations.length === 0) return [];

      const sharedChannelIds = organizations
        .filter(isNonNullable)
        .map((org) => org.sharedChannelId);

      if (sharedChannelIds.length > 6) {
        // TODO:
        // Firebase only supports up to 10 entries in an `array-contains-any`
        // clause. In the long run, we'll probably need to lean into our search
        // functionality to render the shared messages page (at time of writing
        // we don't have search).

        alert(`
          It looks like you're a member of more than 6 organizations.
          We weren't expecting any users to join more than 6 organizations yet.
          Please reach out to the Comms team by emailing john.carroll.p@gmail.com
          and let him know that you saw this message. If you join too many
          more organizations, the "Shared Messages" page will stop working. 
          Note, you haven't done anything wrong. We just weren't expecting 
          anyone to join this many orgs so quickly (our bad!).
        `);

        return [];
      }

      return sharedChannelIds;
    }),
    // Shared between the pending-drafts stream and the paged query below.
    shareReplay(1),
  );

  // These are drafts which the user has "sent" but which
  // haven't actually been picked up by our firebase function
  // and "sent" on the backend yet. This means that none of
  // the recipients have been notified yet.
  const sharedThreadsCurrentUserHasSubmittedForSending = combineLatest([
    SENT_DRAFTS_AS_THREADS$,
    sharedChannelIds$,
  ]).pipe(
    map(([threads, sharedChannelIds]) =>
      threads.filter((thread) =>
        thread.permittedChannelIds.some((id) => sharedChannelIds.includes(id)),
      ),
    ),
  );

  const {
    docs$: sharedChannelThreads,
    page$,
    end$,
    loading$,
    getNextPage,
  } = getPagedQuery(
    (limitThreadCountTo) =>
      sharedChannelIds$.pipe(
        switchMap((sharedChannelIds) =>
          // Firestore rejects an `array-contains-any` filter that is given
          // an empty array, so building the query below would throw and
          // error this stream. With no shared channels there is nothing to
          // load anyway, so emit an empty result instead.
          sharedChannelIds.length === 0
            ? of([])
            : collectionData(
                query(
                  collectionRef("threads"),
                  where(
                    "permittedChannelIds",
                    "array-contains-any",
                    sharedChannelIds,
                  ),
                  orderBy("lastPost.sentAt", "desc"),
                  orderBy("lastPost.scheduledToBeSentAt", "desc"),
                  limit(limitThreadCountTo),
                ),
              ),
        ),
      ),
    page,
  );

  const threads$ = combineLatest([
    sharedThreadsCurrentUserHasSubmittedForSending,
    sharedChannelThreads,
  ]).pipe(
    map(([pendingThreads, sentThreads]) => {
      return sentThreads
        .concat(
          // After a pendingThreads is picked up and sent for "real" by the server,
          // it's possible for the new "real" thread to sync to the client before
          // the old draft's deletion is synced to the client.
          pendingThreads.filter(
            (thread) => !sentThreads.some((t) => t.id === thread.id),
          ),
        )
        .sort((a, b) => {
          return (
            // Note that we're sorting in DESC order
            timestampComparer(b.lastPost.sentAt, a.lastPost.sentAt) ||
            timestampComparer(
              b.lastPost.scheduledToBeSentAt,
              a.lastPost.scheduledToBeSentAt,
            )
          );
        });
    }),
    catchNoCurrentUserError(() => []),
    distinctUntilChanged(isEqual),
    shareReplay(1),
  );

  return { threads$, page$, end$, loading$, getNextPage };
}

/**
 * React hook returning the threads shown on the "Shared Messages" page,
 * wired up to scroll-based paging.
 *
 * The paged observable is created once (empty `useMemo` deps) via
 * `observeSharedMessagesThreads(1)`.
 */
export function useSharedMessagesThreads(options: {
  /**
   * Loads an initial chunk of threads and then loads
   * more when the user scrolls to the bottom of the
   * element associated with this scrollboxRef.
   */
  pagingScrollboxRef: RefObject<HTMLElement>;
}) {
  const paged = useMemo(() => observeSharedMessagesThreads(1), []);
  const threads$ = paged.threads$;

  useListPaging({
    getNextPage: paged.getNextPage,
    pagingScrollboxRef: options.pagingScrollboxRef,
    loading$: paged.loading$,
  });

  return useObservable(() => threads$ || of([]), {
    deps: [threads$],
  });
}

/**
 * Observes the channels a thread belongs to, limited to channels found
 * in `USER_CHANNELS$` and excluding organization shared channels.
 *
 * @param threadId - ID of the thread whose channels should be observed.
 * @returns Observable of the thread's channels (in the order of the
 *   thread's `permittedChannelIds`), or `null` if a thread with this
 *   ID doesn't exist. `null` is also emitted when
 *   `catchNoCurrentUserError` kicks in.
 */
export function observeThreadChannels(threadId: string) {
  const threadAndUserChannels$ = combineLatest([
    observeThread(threadId),
    USER_CHANNELS$,
  ]);

  return threadAndUserChannels$.pipe(
    map(([thread, userChannels]) =>
      thread
        ? thread.permittedChannelIds
            .map((id) => userChannels.find((channel) => channel.id === id))
            .filter((channel): channel is IChannelDocWithCurrentUserData => {
              // Channel ids that aren't in the user's channel list
              // resolve to `undefined` and are dropped.
              if (!channel) return false;

              return !channel.isOrganizationSharedChannel;
            })
        : null,
    ),
    catchNoCurrentUserError(() => null),
    distinctUntilChanged(isEqual),
  );
}

/**
 * Removes the current user from a thread's participants by removing
 * their ID from `participatingUserIds` and deleting their entry in
 * `participatingUsers`.
 *
 * Does nothing if the thread can't be found or if the thread is a local
 * representation of an unsent draft.
 *
 * @param threadId - ID of the thread to leave.
 */
export const removeCurrentUserFromThreadParticipants = withPendingUpdate(
  async (threadId: string) => {
    const { id: currentUserId } = getAndAssertCurrentUser();

    const thread = await firstValueFrom(observeThread(threadId));

    if (!thread) return;

    // In the case of an unsent draft, there isn't a participatingUserIds
    // array to update.
    if ("__local" in thread && thread.__local.fromUnsafeDraft) return;

    await offlineAwareFirestoreCRUD(
      updateDoc(docRef("threads", threadId), {
        participatingUserIds: arrayRemove(currentUserId),
        [`participatingUsers.${currentUserId}`]: deleteField(),
        updatedAt: serverTimestamp(),
      }),
    );
  },
);

/**
 * Returns a function that can be used to indicate that a particular
 * post has scrolled into view and was "read". Will only actually
 * update the "read" status of the thread if the post that scrolled
 * into view hadn't previously been read.
 *
 * `getThreadSeenUpdaterFn()` should be called inside a `useMemo()`
 * hook after a thread's initial "readStatus" document has loaded and
 * should be called again whenever the thread changes.
 *
 * The returned function keeps track of the latest `sentAt` and
 * `scheduledToBeSentAt` values seen so far (seeded from
 * `initialThreadReadStatus`) and writes them to the current user's
 * "readStatus" document of the thread. Writes are debounced by 1500ms.
 *
 * @param threadId - ID of the thread whose read status is tracked.
 * @param initialThreadReadStatus - The current user's existing read status
 *   for this thread, or `null` if there is none.
 * @returns A function accepting the post that was seen. It returns
 *   whatever the debounced write returns (`Promise<void> | undefined`),
 *   or `undefined` when the post didn't advance the read status.
 * @throws Error if `initialThreadReadStatus` belongs to a different thread.
 *   `getAndAssertCurrentUser()` is also called eagerly when the updater
 *   is created.
 */
export function getThreadSeenUpdaterFn({
  threadId,
  initialThreadReadStatus,
}: {
  threadId: string;
  initialThreadReadStatus: IThreadReadStatusDoc | null;
}): (post: IPostDoc) => Promise<void> | undefined {
  if (
    initialThreadReadStatus &&
    initialThreadReadStatus.threadId !== threadId
  ) {
    const msg = `getThreadSeenUpdaterFn: Provided an IThreadReadStatus document for the wrong thread`;
    console.error(msg, threadId, initialThreadReadStatus);
    throw new Error(msg);
  }

  let seenToSentAt = initialThreadReadStatus?.seenToSentAt;

  let seenToScheduledToBeSentAt =
    initialThreadReadStatus?.seenToScheduledToBeSentAt;

  const currentUser = getAndAssertCurrentUser();

  // We're not bothering to use `withPendingUpdate` for read status. We
  // don't want to block the browser from closing just for this.
  const updateThreadReadStatus = debounce(async () => {
    if (!seenToSentAt || !seenToScheduledToBeSentAt) return;

    return setDoc(
      docRef("threads", threadId, "readStatus", currentUser.id),
      {
        id: currentUser.id,
        threadId,
        seenToSentAt,
        seenToScheduledToBeSentAt,
        updatedAt: serverTimestamp(),
      },
      {
        merge: true,
      },
    );
  }, 1500);

  return (post: IPostDoc) => {
    // Nothing has been recorded as seen yet: this post becomes the
    // baseline.
    if (!seenToSentAt || !seenToScheduledToBeSentAt) {
      seenToSentAt = post.sentAt;
      seenToScheduledToBeSentAt = post.scheduledToBeSentAt;
      return updateThreadReadStatus();
    }

    let changed = false;

    // Each marker only ever moves forward, and the two markers are
    // advanced independently of each other.
    if (seenToSentAt.valueOf() < post.sentAt.valueOf()) {
      seenToSentAt = post.sentAt;
      changed = true;
    }

    if (
      seenToScheduledToBeSentAt.valueOf() < post.scheduledToBeSentAt.valueOf()
    ) {
      seenToScheduledToBeSentAt = post.scheduledToBeSentAt;
      changed = true;
    }

    if (!changed) return;

    return updateThreadReadStatus();
  };
}

/**
 * Stores the current user's reactions to a post in the
 * `users/{currentUserId}/postReactions/{postId}` document.
 *
 * Note that `setDoc` is called without a `merge` option, so the provided
 * `reactions` become the user's full set of reactions for this post.
 *
 * @param postId - ID of the post being reacted to.
 * @param reactions - The reactions (emoji) to store for the current user.
 */
export const mergePostReactions = withPendingUpdate(
  async (postId: string, reactions: string[]) => {
    const { id: userId } = getAndAssertCurrentUser();

    const reactionRef = docRef("users", userId, "postReactions", postId);

    await offlineAwareFirestoreCRUD(
      setDoc(reactionRef, {
        id: postId,
        userId,
        reactions,
        updatedAt: serverTimestamp(),
      }),
    );
  },
);

/* -----------------------------------------------------------------------------------------------*/
