import {Socket} from './socket-kit'
import {firstOrTimeout} from '@peachy/utility-kit-rxjs'
import {concatMap, filter, Observable, Subject, Subscription} from 'rxjs'
import {CallMessage, CallPacket, CallReceipt, isCallMessage, isCallPacket, isCallReceipt, MessageId} from './call-kit'

import {ofType} from '../index'
import {AsyncDisposableResource} from '@peachy/utility-kit-disposable'

export class Switchboard<M> implements AsyncDisposableResource {

    private readonly incomingPacket$ = new Subject<CallPacket<M>>()
    private readonly incomingMessage$ = new Subject<CallMessage<M>>()
    private readonly incomingReceipt$ = new Subject<CallReceipt>()

    private subscriptions: Subscription[] = []

    constructor(
        private socket: Socket<CallPacket<M>>,
        private socketTimeout: number,
        private receiptTimeout: number,
    ) {
        this.subscriptions.push(
            socket.in.pipe(
                filter(isCallPacket<M>)
            ).subscribe(this.incomingPacket$)
        )

        this.subscriptions.push(
            this.incomingPacket$.pipe(
                filter(isCallReceipt),
            ).subscribe(this.incomingReceipt$)
        )
        this.subscriptions.push(
            this.incomingPacket$.pipe(
                filter(isCallMessage<M>),
                concatMap((message: CallMessage<M>) => {
                    return socket.out(receiptFor(message), socketTimeout).then(() => message)
                }),
            ).subscribe(this.incomingMessage$)
        )
    }

    public async sendCallMessage(packet: CallMessage<M>, timeout?: number) {
        const receiptPromise = this.awaitReceipt(packet.messageId, timeout)

        await this.socket.out(packet, this.socketTimeout)
        const receipt = await receiptPromise

        if (receipt.error) {
            throw receipt.error
        }
    }

    private awaitReceipt = (messageId: MessageId, timeout: number = this.receiptTimeout) => {
        return firstOrTimeout(this.incomingReceipt$.pipe(
            filter(p => p.messageId === messageId),
        ), timeout)
    }

    public incomingMessages(): Observable<CallMessage<M>> {
        const incoming = new Subject<CallMessage<M>>()
        this.incomingMessage$.subscribe(incoming)
        return incoming
    }

    public async asyncDispose(): Promise<void> {
        this.subscriptions.forEach(sub => sub.unsubscribe())
        await this.socket.asyncDispose()
    }

    public async [Symbol.asyncDispose]() {
        await this.asyncDispose()
    }
}


function receiptFor(packet: CallPacket<unknown>, error?: any): CallReceipt {
    const receipt = ofType<CallReceipt>({
        type: 'CallReceipt',
        for: packet.type,
        callId: packet.callId,
        messageId: packet.messageId,
    })
    if (error) receipt.error = error
    return receipt
}
