import { Message } from "google-protobuf"
import { getAuthMetadata } from "/src/auth"
import { ABORTED_STATUS_CODE } from "/src/utils/grpc"
import type { GrpcCallOptions, GrpcCallResponse, GrpcRequest, IGrpcRequestQueue } from "./IGrpcRequestQueue"
import { ClientReadableStream } from "grpc-web"

/**
 * Runs gRPC requests, either immediately (`execute`) or through a FIFO queue
 * that keeps at most `maxConcurrentRequests` queued requests in flight at once
 * (`enqueue`).
 *
 * Each request carries its own `resolve`/`reject` callbacks, which are invoked
 * with the outcome of the call. A request whose `controller` is aborted is
 * rejected with `{ code: ABORTED_STATUS_CODE }`.
 */
class GrpcRequestQueue implements IGrpcRequestQueue {
    private queue: GrpcRequest<Message,  Message | Message[]>[] = []
    private activeCount = 0
    private readonly maxConcurrentRequests: number

    /**
     * @param maxConcurrentRequests Upper bound on queued requests running at the same time.
     */
    constructor(maxConcurrentRequests: number = 5) {
        this.maxConcurrentRequests = maxConcurrentRequests
    }

    /**
     * Appends the request to the queue and starts it as soon as a slot is free.
     */
    enqueue<T extends Message, U extends Message | Message[]>(grpcRequest: GrpcRequest<T, U>): void {
        this.queue.push(grpcRequest)
        this.processQueue()
    }

    /**
     * Runs the request right away. It neither waits in the queue nor counts
     * against the concurrency limit.
     */
    execute<T extends Message, U extends Message | Message[]>(grpcRequest: GrpcRequest<T, U>): void {
        this.executeRequest(grpcRequest)
            .then(grpcRequest.resolve)
            .catch(grpcRequest.reject)
    }

    /**
     * Starts queued requests in FIFO order until the queue is empty or the
     * concurrency limit is reached. Re-invoked whenever a running request settles.
     */
    private processQueue() {
        while (this.activeCount < this.maxConcurrentRequests && this.queue.length > 0) {
            const request = this.queue.shift()
            if (request) {
                this.activeCount++
                this.executeRequest(request)
                    .then(request.resolve)
                    .catch(request.reject)
                    .finally(() => {
                        // Free the slot, then let the next queued request use it.
                        this.activeCount--
                        this.processQueue()
                    })
            }
        }
    }

    /**
     * Performs the call described by the request.
     *
     * - If the call yields a single message, the promise resolves with it.
     * - If the call yields a stream, every `data` payload is collected and the
     *   promise resolves with the collected array on `end`.
     * - If `controller` is (or becomes) aborted, the promise rejects with
     *   `{ code: ABORTED_STATUS_CODE }`; an already-aborted request is never sent,
     *   and an open stream is cancelled.
     *
     * @returns A promise for the single response or the list of streamed responses.
     */
    private executeRequest<T extends Message, U extends Message | Message[]>({ client, method, request, controller, metadata }: GrpcRequest<T, U>): Promise<U | U[]> {
        return new Promise<U | U[]>((resolve, reject) => {
            const signal = controller ? controller.signal : undefined

            // The request may have been cancelled while it was waiting in the queue;
            // the "abort" event has already fired, so a listener would never run.
            if (signal && signal.aborted) {
                reject({ code: ABORTED_STATUS_CODE })
                return
            }

            const call = client[method](request, metadata)

            // Set once the call turns out to be a stream, so an abort can cancel it.
            let activeStream: ClientReadableStream<U> | undefined

            const onAbort = () => {
                if (activeStream) {
                    activeStream.cancel()
                }
                reject({ code: ABORTED_STATUS_CODE })
            }

            // Drop the abort listener once the request has settled so that a
            // long-lived controller does not keep this closure alive.
            const detach = () => {
                if (signal) {
                    signal.removeEventListener("abort", onAbort)
                }
            }
            const succeed = (value: U | U[]) => {
                detach()
                resolve(value)
            }
            const fail = (error: unknown) => {
                detach()
                reject(error)
            }

            if (signal) {
                signal.addEventListener("abort", onAbort, { once: true })
            }

            call.then(
                (response: U | ClientReadableStream<U>) => {
                    // A stream is recognised by its `on` method.
                    if ("on" in response) {
                        const stream = response as ClientReadableStream<U>

                        // Aborted before the stream became available: the promise is
                        // already rejected, so just close the stream.
                        if (signal && signal.aborted) {
                            stream.cancel()
                            return
                        }

                        activeStream = stream
                        const responseData: U[] = []
                        stream.on("data", (data: U) => {
                            responseData.push(data)
                        })
                        stream.on("end", () => {
                            succeed(responseData)
                        })
                        stream.on("error", fail)
                    } else {
                        succeed(response as U)
                    }
                },
            ).catch(fail)
        })
    }
}

// Module-level queue shared by the gRPC call helpers in this file; created with the
// constructor's default concurrency limit.
const requestQueue = new GrpcRequestQueue()

/**
 * Issues a gRPC call through the shared request queue.
 *
 * @param options Call description, forwarded unchanged with `enqueue` set to `true`.
 * @returns Whatever `makeGrpcCall` returns for the queued call.
 */
export function enqueueGrpcCall<T extends Message, U extends Message>(options: GrpcCallOptions<T>): GrpcCallResponse<U> {
    const queuedOptions = { ...options, enqueue: true }
    return makeGrpcCall<T, U>(queuedOptions)
}

/**
 * Issues a gRPC call without placing it in the shared request queue.
 *
 * @param options Call description, forwarded unchanged with `enqueue` set to `false`.
 * @returns Whatever `makeGrpcCall` returns for the direct call.
 */
export function executeGrpcCall<T extends Message, U extends Message>(options: GrpcCallOptions<T>): GrpcCallResponse<U> {
    const directOptions = { ...options, enqueue: false }
    return makeGrpcCall<T, U>(directOptions)
}

/**
 * Builds a gRPC request, merges auth metadata into it and hands it to the shared
 * request queue, either queued or executed immediately depending on `options.enqueue`.
 *
 * Caller-supplied `metadata` entries take precedence over the auth metadata when
 * keys collide. If no `controller` is supplied, a fresh `AbortController` is created.
 *
 * @param options Call description plus the `enqueue` flag.
 * @returns The promise for the call's result and a `cancel` function that aborts
 *          the call's controller. The promise rejects if fetching the auth metadata
 *          fails or if the request itself is rejected.
 */
function makeGrpcCall<T extends Message, U extends Message>(options: GrpcCallOptions<T> & { enqueue: boolean }): GrpcCallResponse<U> {
    const { client, method, request, controller = new AbortController(), metadata } = options

    const promise = new Promise<U>((resolve, reject) => {
        const requestParams = {
            client,
            method,
            request,
            resolve,
            reject,
            metadata,
            controller,
        }

        getAuthMetadata()
            .then((authMetadata) => {
                requestParams.metadata = { ...authMetadata, ...metadata }
                if (options.enqueue) {
                    requestQueue.enqueue(requestParams)
                } else {
                    requestQueue.execute(requestParams)
                }
            })
            // Without this, an auth failure would leave `promise` pending forever.
            .catch(reject)
    })

    // `abort` must be invoked on the controller itself; passing the bare method
    // loses `this` and throws when called.
    return { promise, cancel: () => controller.abort() }
}
