import type { ServiceStream, ServiceStreamOptions, ServiceStreamCallback } from "../ServiceDefinition"
import { ServiceStreamValueInterceptor } from "./private"
import { ServiceRuntimePrivate } from "../private"
import { ServiceError } from "../ServiceErrors"
import { ServiceDebugging } from "../ServiceDebugging"

/**
 * ServiceEventEmitter is a stream implementation for a (potentially infinite) sequence of events
 * not associated with any particular task or service message. It supports any number of attached
 * AsyncIterators, with no guarantee as to which event will appear in the first iteration. Use for
 * publish/subscribe or state observing behavior.
 *
 * Every stream returned by `newStream` is registered under a unique id and receives each value
 * passed to `emit` until its iteration ends, at which point it is deregistered.
 */
export class ServiceEventEmitter<T extends object> {
    /** Live subscriptions keyed by stream id; a Map tolerates removal while `emit` is iterating. */
    private readonly iterators = new Map<
        string,
        {
            update: (value: T) => void
            done: () => void
        }
    >()

    /** Most recent value passed to `emit`, or undefined if nothing has been emitted yet. */
    private latestValue?: T

    /**
     * Callback that's invoked when a new stream is created.
     * Optionally returns a custom "latest" value for the new stream.
     * Returning a value for a stream that did not ask for `replay: "latest"` is treated as an
     * implementation error.
     */
    onNewStream?: (options?: ServiceStreamOptions) => { latest: T } | undefined

    /**
     * Returns a new ServiceStream that reads the events emitted by this ServiceEventEmitter.
     *
     * @param options When `options.replay === "latest"`, the stream immediately receives the value
     * returned by `onNewStream` or, failing that, the last emitted value.
     * @throws ServiceError.Implementation if a replay was requested but no "latest" value exists, or
     * if `onNewStream` returned a value for a stream that did not request a replay. In both cases
     * (and if `onNewStream` itself throws) the stream is deregistered again before the error
     * propagates, so a stream that was never handed out cannot linger as a subscriber.
     */
    readonly newStream = (options?: ServiceStreamOptions): ServiceStream<T> => {
        const id = ServiceRuntimePrivate.generateUniqueId()

        return new ServiceStreamIterator<T>(
            // Individual stream
            (update, done) => {
                // Register first so the stream is already subscribed while onNewStream runs
                this.iterators.set(id, { update, done })

                try {
                    // Invoke the callback, optionally returning extra info
                    const callbackReturnValue = this.onNewStream?.(options)

                    // Some streams may want an initial value instead of waiting for a new emit
                    if (options?.replay === "latest") {
                        const latestValue = callbackReturnValue?.latest ?? this.latestValue
                        if (!latestValue) {
                            // Since value replay is opt-in for service definitions, enforce correct implementation
                            throw new ServiceError.Implementation(
                                `ServiceEventEmitter needs a "latest" value, but nothing has been emitted or returned by the onNewStream callback`
                            )
                        }
                        update(latestValue)
                    } else if (callbackReturnValue) {
                        // Make sure the callback doesn't calculate things unnecessarily
                        throw new ServiceError.Implementation(
                            `ServiceEventEmitter received a "latest" value from the onNewStream callback for a stream that didn't need it`
                        )
                    }
                } catch (error) {
                    // The caller never receives this stream, so nobody could ever cancel it:
                    // undo the registration instead of leaking a subscriber that buffers every emit.
                    this.iterators.delete(id)
                    throw error
                }
            },
            // Ended iteration (either from client or server side)
            () => {
                if (!this.iterators.delete(id)) {
                    throw new ServiceError.BadRequest(
                        `ServiceEventEmitter couldn't find cancelled iterator with id: ${id}`
                    )
                }
            }
        )
    }

    /**
     * Records `value` as the latest value and forwards it to every registered stream.
     * A stream that ends itself while being notified does not cause other streams to miss the value.
     */
    readonly emit = (value: T) => {
        this.latestValue = value
        for (const iterator of this.iterators.values()) {
            iterator.update(value)
        }
    }

    /** Returns the most recently emitted value, or undefined if nothing has been emitted. */
    readonly latest = () => {
        return this.latestValue
    }
}

/**
 * ServiceStreamIterator is a stream implementation for a finite sequence of updates or data packets, sent as a result
 * of one specific service message. It can be iterated only once. Use for one-off task status updates or data transfer.
 *
 * Values written before a reader attaches are buffered and replayed when iteration starts. Once iterating, the
 * writer and reader communicate through a queue of resolvable promises: each written value resolves the current
 * tail promise and appends a fresh pending one, while each next() removes and returns the head.
 */
class ServiceStreamIterator<T extends object> implements ServiceStream<T> {
    private readonly log = ServiceDebugging.log.extend("ServiceStreamIterator")

    /** True once [Symbol.asyncIterator]() has been called. */
    private hasAsyncIterator = false
    /** Values received before a reader attached, in arrival order. */
    private updatesBeforeAsyncIterator: T[] = []
    /** Optional interceptor supplied by the reader; may mark individual values as ignored. */
    private onUpdate?: ServiceStreamValueInterceptor<T>
    /** Guards onIteratorEnd so the owner is told about the end of iteration only once. */
    private iteratorEndNotified = false

    /**
     * @param task Invoked synchronously with the stream's `update` and `done` writers.
     * @param onIteratorEnd Invoked at most once, when the reader ends the iteration through
     * return(), throw() or cancel().
     */
    constructor(
        task: (update: (value: T) => void, done: () => void) => void,
        private readonly onIteratorEnd?: () => void
    ) {
        this.promises = [ServiceRuntimePrivate.newResolvablePromise()]
        // `update` doubles as `done`: calling it without a value signals completion
        task(this.update, this.update)
    }

    /**
     * Starts the (single) iteration and replays any values buffered so far through `onUpdate`.
     *
     * @throws Error if called more than once.
     */
    // Note: the implementation of Router.onRequest implicitly knows about the callback argument
    [Symbol.asyncIterator](onUpdate?: ServiceStreamValueInterceptor<T>): AsyncIterator<T> {
        if (this.hasAsyncIterator) {
            throw new Error("ServiceStreamIterator.asyncIterator() may only be called once")
        }
        this.onUpdate = onUpdate
        this.hasAsyncIterator = true

        const buffered = this.updatesBeforeAsyncIterator
        this.updatesBeforeAsyncIterator = []
        for (const value of buffered) {
            this.update(value)
        }
        return this
    }

    private readonly doneResult: IteratorResult<T> = { done: true, value: undefined }
    /** Queue of results not yet handed to the reader; the tail is the pending promise for the next write. */
    private readonly promises: ServiceRuntimePrivate.ResolvablePromise<IteratorResult<T>>[] = []
    /** The promise most recently returned by next(), needed when the reader is ahead of the writer. */
    private returnedNextPromise: ServiceRuntimePrivate.ResolvablePromise<IteratorResult<T>> | undefined

    /**
     * Single entry point for everything written to the stream:
     * a value of type T is an update, `undefined` means done, and a ServiceError fails the stream.
     *
     * @throws ServiceError.BadRequest if done or an error arrives before the stream is being read.
     */
    private readonly update = (value?: T | ServiceError) => {
        const { hasAsyncIterator, updatesBeforeAsyncIterator, promises, returnedNextPromise } = this

        // When receiving updates before anyone is reading, just queue them for replay when the iteration starts.
        // Recall that every update needs to have the appropriate onUpdate called, which isn't available yet.
        if (!hasAsyncIterator) {
            if (!value || value instanceof ServiceError) {
                // Only the internals call update() with a non-T value, and this shouldn't happen before iteration
                throw new ServiceError.BadRequest("ServiceStream received return or throw before being read")
            }

            updatesBeforeAsyncIterator.push(value)
            return
        }

        // Figure out which promise to resolve (depending on whether the writer or reader is "ahead")
        let lastPromise = (promises as Partial<typeof promises>)[promises.length - 1]
        if (value && lastPromise === undefined) {
            if (!returnedNextPromise) {
                this.log.warn("lastPromise and returnedNextPromise should never both be undefined")
                return
            }

            lastPromise = returnedNextPromise
        }

        if (value === undefined) {
            // Done: clean up the last next() if necessary (may already have happened if cancel/return calls this)
            lastPromise?.resolve(this.doneResult)
            // Note: this depends on multiple resolves/rejects on a promise having no effect
            returnedNextPromise?.resolve(this.doneResult)
        } else if (value instanceof ServiceError) {
            // Error: clean up the last next() if necessary (may already have happened if cancel/return calls this)
            lastPromise?.reject(value)
            // Note: this depends on multiple resolves/rejects on a promise having no effect
            returnedNextPromise?.reject(value)
        } else {
            // Update: values may be intercepted (unlike returns/throws)
            if (this.onUpdate?.(value).ignore) {
                return
            }

            // Prepare a promise for the next next()
            promises.push(ServiceRuntimePrivate.newResolvablePromise())
            lastPromise?.resolve({ done: false, value })
        }
    }

    /**
     * Tells the owner that iteration ended. Runs the hook at most once so that ending a stream
     * repeatedly (e.g. cancel() followed by the reader's own return()) doesn't report it twice.
     */
    private readonly notifyIteratorEnd = () => {
        if (this.iteratorEndNotified) {
            return
        }
        this.iteratorEndNotified = true
        this.onIteratorEnd?.()
    }

    /**
     * Returns the next queued result, or a done result when the queue is empty.
     * NOTE(review): calls are expected to be sequential (as `for await` does); a second call issued
     * while the previous one is still pending finds an empty queue and reports done.
     */
    readonly next = async () => {
        const promise = this.promises.shift()
        this.returnedNextPromise = promise
        return promise || this.doneResult
    }

    /**
     * Ends the stream from the reader's side.
     * The end hook runs even if the stream rejects the request (e.g. because it was never read),
     * so the owner can always release its registration.
     */
    readonly return = async () => {
        try {
            this.update(undefined)
        } finally {
            this.notifyIteratorEnd()
        }
        return this.doneResult
    }

    /**
     * Fails the stream with `error` from the reader's side; the end hook runs even if that fails.
     */
    readonly throw = async (error: ServiceError) => {
        try {
            this.update(error)
        } finally {
            this.notifyIteratorEnd()
        }
        return this.doneResult
    }

    /** Iterates the stream, awaiting `callback` for each value, until the stream is done. */
    readonly read = async (callback: ServiceStreamCallback<T>): Promise<void> => {
        for await (const event of this) {
            await callback(event)
        }
    }

    /** Ends the stream; equivalent to return() without a result. */
    readonly cancel = async () => {
        await this.return()
    }
}
