import {Call, OnCallEnd} from './Call'
import {
    concatMap,
    dematerialize,
    filter,
    from,
    lastValueFrom,
    map,
    materialize,
    Observable,
    of,
    startWith,
    Subject,
    tap
} from 'rxjs'
import {
    bufferJson,
    completeNotification,
    errorNotification,
    firstOrTimeout,
    nextNotification,
} from '@peachy/utility-kit-rxjs'
import {CallCenter} from './CallCenter'
import {ActionApi, ActionApiDefinition, Socket} from './socket-kit'
import {CallPacket, ofType} from '../index'
import {AsyncDisposableResource} from '@peachy/utility-kit-disposable'


/**
 * Options bag whose fields mirror, name for name and type for type, the
 * constructor parameters of {@link Duplex}.
 *
 * NOTE(review): nothing in this file consumes a DuplexConfig; the field notes
 * below describe how the same-named Duplex constructor parameters are used here.
 */
export type DuplexConfig<I extends ActionApi, O extends ActionApi> = {
    // Definition of the incoming API (type I).
    incomingApiDef: ActionApiDefinition<I>,
    // Definition of the outgoing API (type O); DuplexBridge creates one outgoing method per key.
    outgoingApiDef: ActionApiDefinition<O>,
    // Resolves the incoming API implementation, given the outgoing API object.
    incomingApiProvider: (outgoingApi: O) => Promise<I>,
    // Socket carrying CallPacket<ActionNote>; handed to CallCenter.
    socket: Socket<CallPacket<ActionNote>>
    // Passed to bufferJson when a stream is batched into StreamNext notes.
    batchByteSize: number
    // Forwarded to CallCenter.
    socketSendTimeoutMillis: number,
    // Forwarded to CallCenter.
    socketReceiptTimeoutMillis: number,
    // Optional callback forwarded to CallCenter.
    onCallEnd?: OnCallEnd
}


/**
 * Two-way action API endpoint on top of a single socket.
 *
 * A {@link CallCenter} is created over the socket. Every call, whether made
 * locally through {@link makeCall} or received from the call center, is wired
 * to a fresh DuplexBridge built from the same API definitions and settings.
 */
export class Duplex<I extends ActionApi, O extends ActionApi> implements AsyncDisposableResource {

    private callCenter: CallCenter<ActionNote>

    constructor(
        private incomingApiDef: ActionApiDefinition<I>,
        private outgoingApiDef: ActionApiDefinition<O>,
        private incomingApiProvider: (outgoingApi: O) => Promise<I>,
        private socket: Socket<CallPacket<ActionNote>>,
        private batchByteSize: number,
        private socketSendTimeoutMillis: number,
        private socketReceiptTimeoutMillis: number,
        private onCallEnd?: OnCallEnd
    ) {
        this.callCenter = new CallCenter<ActionNote>(
            socket,
            incomingCall => this.receiveCall(incomingCall),
            socketSendTimeoutMillis,
            socketReceiptTimeoutMillis,
            onCallEnd
        )
    }

    /**
     * Opens a new call and bridges it.
     *
     * @returns a tuple of the outgoing API for the call and a function that
     * ends the call. The same function is also attached to the API object
     * under the key `endCall`.
     */
    public async makeCall(): Promise<[O, () => Promise<void>]> {
        const call = await this.callCenter.makeCall()
        const {outgoingApi} = this.bridgeCall(call)
        const endCall = () => call.callEnd()

        // @ts-ignore
        outgoingApi['endCall'] = endCall

        return [outgoingApi, endCall]
    }

    /** Callback given to the call center: bridges a call received from it. */
    private async receiveCall(call: Call<ActionNote>) {
        this.bridgeCall(call)
    }

    /** Builds the DuplexBridge for one call from this instance's settings. */
    private bridgeCall(call: Call<ActionNote>) {
        return new DuplexBridge(
            this.incomingApiDef,
            this.outgoingApiDef,
            this.incomingApiProvider,
            call,
            this.batchByteSize,
            this.socketSendTimeoutMillis,
            this.socketReceiptTimeoutMillis,
            this.onCallEnd,
        )
    }


    /** Disposes the underlying call center. */
    public async asyncDispose(): Promise<void> {
        await this.callCenter.asyncDispose()
    }

    /** Explicit-resource-management hook; delegates to {@link asyncDispose}. */
    public async [Symbol.asyncDispose]() {
        await this.asyncDispose()
    }
}


/**
 * Binds a single {@link Call} to a pair of action APIs.
 *
 * Outgoing direction: {@link outgoingApi} has one method per key of
 * `outgoingApiDef`. Calling a method sends an `Invoke*` note on the call and
 * returns either a promise (value response) or a Subject (stream response).
 *
 * Incoming direction: every `Invoke*` note seen on `call.in` is executed
 * against the incoming API implementation and the outcome is sent back as
 * `ValueResolve` / `ValueReject` or as `Stream*` notes.
 *
 * `socketSendTimeoutMillis`, `socketReceiptTimeoutMillis` and `onCallEnd` are
 * stored by the constructor but not read anywhere in this class.
 */
class DuplexBridge<IncomingApi extends ActionApi, OutgoingApi extends ActionApi> {

    /** Proxy object with one member per key of `outgoingApiDef`. */
    public readonly outgoingApi: OutgoingApi
    /** Incoming API implementation; set on first successful {@link getIncomingApi}. */
    private incomingApi: IncomingApi
    /** Provider call currently in flight, shared so concurrent callers do not start a second one. */
    private incomingApiRequest?: Promise<IncomingApi>

    constructor(
        private incomingApiDef: ActionApiDefinition<IncomingApi>,
        private outgoingApiDef: ActionApiDefinition<OutgoingApi>,
        private incomingApiProvider: (outgoingApi: OutgoingApi) => Promise<IncomingApi>,
        private call: Call<ActionNote>,
        private batchByteSize: number,
        private socketSendTimeoutMillis: number,
        private socketReceiptTimeoutMillis: number,
        private onCallEnd?: OnCallEnd
    ) {

        this.outgoingApi = Object.keys(this.outgoingApiDef).reduce((raw, action) => {
            raw[action] = this.implOutgoingAction(action)
            return raw
        }, {} as any) as OutgoingApi

        call.in.pipe(
            filter(isInvocationNote),
            // Handlers run as a side effect. dispatch() catches every failure
            // itself, so the promise it returns never rejects.
            tap(invocation => {
                void this.dispatch(invocation)
            })
        ).subscribe()
    }

    /**
     * Returns the incoming API implementation, asking `incomingApiProvider`
     * for it on first use.
     *
     * Callers that arrive while the provider is still pending share the same
     * provider call. If the provider rejects, nothing is cached and the next
     * caller triggers a new attempt.
     */
    public async getIncomingApi() {
        if (!this.incomingApi) {
            this.incomingApiRequest ??= this.incomingApiProvider(this.outgoingApi)
            try {
                this.incomingApi = await this.incomingApiRequest
            } finally {
                this.incomingApiRequest = undefined
            }
        }
        return this.incomingApi
    }


    /**
     * Builds the outgoing implementation for one action.
     *
     * The response kind comes from {@link responseTypeForAction}; the request
     * kind is decided per invocation from the argument (Observable => stream).
     */
    private implOutgoingAction(
        action: string
    ) {
        switch (this.responseTypeForAction(action)) {
            case 'VALUE':
                return async (value?: unknown) => {
                    return this.invokeTypeForValue(value) === 'STREAM'
                        ? this.invokeStreamValue(action, value as Observable<any>)
                        : this.invokeValueValue(action, value)
                }
            case 'STREAM':
                return (value?: unknown) => {
                    return this.invokeTypeForValue(value) === 'STREAM'
                        ? this.invokeStreamStream(action, value as Observable<unknown>)
                        : this.invokeValueStream(action, value)
                }
            default:
                // The definition produced neither a Promise nor an Observable.
                // Fail with a readable message when the action is used instead
                // of leaving an undefined member on the outgoing API.
                return () => {
                    throw new Error(
                        `Outgoing action '${action}' is defined with neither a Promise nor an Observable response`
                    )
                }
        }
    }

    /**
     * Sends a single value and resolves with the single value answered.
     *
     * @throws the `error` carried by a `ValueReject` answer.
     */
    private async invokeValueValue(action: string, value?: unknown) {
        // Start listening before sending so the answer cannot be missed.
        const responsePromise = this.responsePromise(action)
        await this.call.out(ofType<InvokeValueValue>({
            type: 'InvokeValueValue',
            action,
            value
        }))
        const response = await responsePromise
        if (response.type === 'ValueReject') {
            throw response.error
        }
        return response.value
    }

    /**
     * Sends a single value and returns a Subject fed by the answered stream.
     * A failure to send is delivered as an error on the returned Subject.
     */
    private invokeValueStream(action: string, value?: unknown) {
        const responseObservable = new Subject()

        const sub = this.incomingStream(action).pipe(
            mapFromStreamNote()
        ).subscribe(responseObservable)

        this.call.out(ofType<InvokeValueStream>({
            type: 'InvokeValueStream',
            action,
            value
        })).catch((error) => {
            sub.unsubscribe()
            responseObservable.error(error)
        })
        return responseObservable
    }

    /**
     * Sends `InvokeStreamValue` followed by the batched stream, then resolves
     * with the single value answered.
     *
     * @throws the `error` carried by a `ValueReject` answer.
     */
    private async invokeStreamValue(action: string, stream: Observable<unknown>) {

        const responsePromise = this.responsePromise(action)

        await this.sendInOrder(
            stream.pipe(
                mapToStreamNote(action, this.batchByteSize),
                startWith(ofType<InvokeStreamValue>({
                    type: 'InvokeStreamValue',
                    action,
                })),
            )
        )

        const response = await responsePromise
        if (response.type === 'ValueReject') {
            throw response.error
        }
        // Unwrap the payload, exactly as invokeValueValue does; callers expect
        // the resolved value, not the transport note.
        return response.value
    }

    /**
     * Sends `InvokeStreamStream` followed by the batched stream and returns a
     * Subject fed by the answered stream. A failure to send is delivered as an
     * error on the returned Subject.
     */
    private invokeStreamStream(action: string, incomingStream: Observable<unknown>) {

        const responseStream = new Subject()
        const sub = this.incomingStream(action).pipe(
            mapFromStreamNote()
        ).subscribe(responseStream)

        this.sendInOrder(
            incomingStream.pipe(
                mapToStreamNote(action, this.batchByteSize),
                startWith(ofType<InvokeStreamStream>({
                    type: 'InvokeStreamStream',
                    action,
                })),
            )
        ).catch(e => {
            responseStream.error(e)
            sub.unsubscribe()
        })

        return responseStream
    }

    /** Sends each note on the call, one after another; settles when the last send has. */
    private sendInOrder(notes: Observable<ActionNote>) {
        return lastValueFrom(
            notes.pipe(
                concatMap(note => from(this.call.out(note)))
            )
        )
    }


    /** First `ValueResolve` / `ValueReject` for the action, via `firstOrTimeout`. */
    private responsePromise(action: string) {
        return firstOrTimeout(
            this.call.in.pipe(
                filter(isForAction(action)),
                filter(isValueResponse),
            )
        )
    }

    /** All `Stream*` notes on the call that belong to the action. */
    private incomingStream(action: string) {
        return this.call.in.pipe(
            filter(isForAction(action)),
            filter(isStreamNote),
        )
    }

    /**
     * Runs the handler matching the invocation. Any failure the handler did
     * not deal with itself is passed to {@link reportHandlerFailure}, so the
     * returned promise never rejects.
     */
    private async dispatch(invocation: InvocationNote): Promise<void> {
        try {
            switch (invocation.type) {
                case 'InvokeValueValue':
                    await this.handleValueValue(invocation)
                    break
                case 'InvokeValueStream':
                    await this.handleValueStream(invocation)
                    break
                case 'InvokeStreamValue':
                    await this.handleStreamValue(invocation)
                    break
                case 'InvokeStreamStream':
                    await this.handleStreamStream(invocation)
                    break
            }
        } catch (error) {
            await this.reportHandlerFailure(invocation, error)
        }
    }

    /**
     * Tells the peer that an invocation failed, using the note kind its
     * response side listens for: `StreamError` for stream responses,
     * `ValueReject` for value responses. The error is forwarded as-is.
     * If that send fails too, the failure is logged and swallowed.
     */
    private async reportHandlerFailure(invocation: InvocationNote, error: unknown): Promise<void> {
        const respondsWithStream = invocation.type === 'InvokeValueStream'
            || invocation.type === 'InvokeStreamStream'
        try {
            if (respondsWithStream) {
                await this.call.out(ofType<StreamError>({
                    type: 'StreamError',
                    action: invocation.action,
                    error
                }))
            } else {
                await this.call.out(ofType<ValueReject>({
                    type: 'ValueReject',
                    action: invocation.action,
                    error
                }))
            }
        } catch (sendError) {
            console.error(
                `Failed to report error for ${invocation.type} '${invocation.action}'`,
                error,
                sendError
            )
        }
    }

    /** Value in, value out: answers with `ValueResolve` or `ValueReject`. */
    private async handleValueValue(
        invocation: InvokeValueValue
    ) {

        const incomingApiImpl = await this.getIncomingApi()
        const rawResponse = incomingApiImpl[invocation.action](invocation.value as any)

        if (rawResponse instanceof Observable) {
            throw new Error(`Action '${invocation.action}' returned a stream where a single value was expected`)
        }

        let reply: ValueResponse
        try {
            reply = ofType<ValueResolve>({
                type: 'ValueResolve',
                action: invocation.action,
                value: await rawResponse
            })
        } catch (error: any) {
            // `any` keeps the diagnostic line below identical under strict catch typing.
            console.log('Error in handleValueValue', error, typeof error, error?.toString())

            reply = ofType<ValueReject>({
                type: 'ValueReject',
                action: invocation.action,
                error
            })
        }
        return this.call.out(reply)
    }

    /** Value in, stream out: answers with batched `Stream*` notes. */
    private async handleValueStream(
        invocation: InvokeValueStream
    ) {

        const incomingApiImpl = await this.getIncomingApi()
        const rawResponse = incomingApiImpl[invocation.action](invocation.value as any)
        if (rawResponse instanceof Promise) {
            throw new Error(`Action '${invocation.action}' returned a promise where a stream was expected`)
        }
        await this.sendInOrder(
            rawResponse.pipe(
                mapToStreamNote(invocation.action, this.batchByteSize),
            )
        )
    }

    /**
     * Stream in, value out: feeds the peer's stream notes into a Subject given
     * to the action and answers with `ValueResolve` or `ValueReject`.
     *
     * NOTE(review): the subscription to the peer's stream notes starts only
     * after the incoming API has been resolved. If `call.in` does not replay,
     * notes arriving before that point are not seen - confirm against Call.
     */
    private async handleStreamValue(
        invocation: InvokeStreamValue
    ) {
        const stream = new Subject()

        const incomingApiImpl = await this.getIncomingApi()
        const rawResponse = incomingApiImpl[invocation.action](stream as any)

        if (rawResponse instanceof Observable) {
            throw new Error(`Action '${invocation.action}' returned a stream where a single value was expected`)
        }

        const sub = this.incomingStream(invocation.action).pipe(
            mapFromStreamNote()
        ).subscribe(stream)

        let reply: ValueResponse
        try {
            reply = ofType<ValueResolve>({
                type: 'ValueResolve',
                action: invocation.action,
                value: await rawResponse
            })
        } catch (error) {
            reply = ofType<ValueReject>({
                type: 'ValueReject',
                action: invocation.action,
                error
            })
        } finally {
            // Stop feeding the action's input before the answer goes out.
            sub.unsubscribe()
        }
        return this.call.out(reply)
    }


    /**
     * Stream in, stream out: feeds the peer's stream notes into a Subject given
     * to the action and answers with batched `Stream*` notes.
     *
     * NOTE(review): same late-subscription caveat as handleStreamValue.
     */
    private async handleStreamStream(
        invocation: InvokeStreamStream
    ) {
        const stream = new Subject()

        const incomingApiImpl = await this.getIncomingApi()
        const rawResponse = incomingApiImpl[invocation.action](stream as any)

        if (rawResponse instanceof Promise) {
            throw new Error(`Action '${invocation.action}' returned a promise where a stream was expected`)
        }

        const responseStream = rawResponse as Observable<unknown>

        const sub = this.incomingStream(invocation.action).pipe(
            mapFromStreamNote()
        ).subscribe(stream)

        try {
            await this.sendInOrder(
                responseStream.pipe(
                    mapToStreamNote(invocation.action, this.batchByteSize),
                )
            )
        } finally {
            sub.unsubscribe()
        }
    }

    /** True when the incoming definition for the action declares at least one parameter. Not called within this class. */
    private hasPayload(action: string) {
        return !!this.incomingApiDef[action].length
    }

    /** An Observable argument is sent as a stream; anything else as a single value. */
    private invokeTypeForValue(value?: any): NoteType {
        return value instanceof Observable ? 'STREAM' : 'VALUE'
    }

    /**
     * Probes the outgoing definition by calling it with `null` and classifies
     * what comes back: Promise => 'VALUE', Observable => 'STREAM'.
     *
     * @returns `undefined` when the definition returns neither.
     */
    private responseTypeForAction(action: string): NoteType | undefined {
        const responseType = this.outgoingApiDef[action](null)
        if (responseType instanceof Promise) {
            return 'VALUE'
        }
        if (responseType instanceof Observable) {
            return 'STREAM'
        }
        return undefined
    }
}


/**
 * Operator factory: batches a stream with `bufferJson(batchByteSize)` and turns
 * every notification of the batched stream into a note tagged with `action`.
 *
 * - next     => `StreamNext` carrying `items()` of the emitted batch
 * - complete => `StreamComplete`
 * - error    => `StreamError` carrying the error
 *
 * Because the source is materialized, completion and errors are emitted as
 * notes rather than terminating the result with an error.
 */
function mapToStreamNote(action: string, batchByteSize: number) {
    return (source: Observable<unknown>): Observable<StreamNote> =>
        source.pipe(
            bufferJson(batchByteSize),
            materialize(),
            map(notification => {
                switch (notification.kind) {
                    case 'C': {
                        const done = ofType<StreamComplete>({type: 'StreamComplete', action})
                        return done
                    }
                    case 'E': {
                        const failed = ofType<StreamError>({
                            type: 'StreamError',
                            action,
                            error: notification.error
                        })
                        return failed
                    }
                    case 'N': {
                        const batch = notification.value.items()
                        return ofType<StreamNext>({type: 'StreamNext', action, batch})
                    }
                }
            }),
        )
}

/**
 * Operator factory, the inverse of the note encoding: expands stream notes
 * back into the notifications they describe and dematerializes them.
 *
 * - `StreamNext`     => one next notification per element of `batch`, in order
 * - `StreamComplete` => completion
 * - `StreamError`    => error with the note's `error`
 */
function mapFromStreamNote() {
    return (notes: Observable<StreamNote>): Observable<unknown> => {
        const notifications = notes.pipe(
            concatMap(note => {
                switch (note.type) {
                    case 'StreamComplete':
                        return of(completeNotification())
                    case 'StreamError':
                        return of(errorNotification(note.error))
                    case 'StreamNext': {
                        const items = note.batch.map(item => nextNotification(item))
                        return from(items)
                    }
                }
            }),
        )
        return notifications.pipe(dematerialize())
    }
}


/** Builds a predicate that accepts only notes whose `action` equals the given one. */
function isForAction(action: string) {
    return (n: ActionNote) => n.action === action
}


/** Type guard: the note answers an invocation with a single value (resolve or reject). */
function isValueResponse(n: ActionNote): n is ValueResponse {
    switch (n.type) {
        case 'ValueResolve':
        case 'ValueReject':
            return true
        default:
            return false
    }
}

/** Type guard: the note is part of a stream (next batch, completion or error). */
function isStreamNote(n: ActionNote): n is StreamNote {
    const {type} = n
    if (type === 'StreamNext') {
        return true
    }
    if (type === 'StreamComplete') {
        return true
    }
    return type === 'StreamError'
}

/** Type guard: the note starts an invocation (any of the four `Invoke*` kinds). */
function isInvocationNote(n: ActionNote): n is InvocationNote {
    switch (n.type) {
        case 'InvokeValueValue':
        case 'InvokeValueStream':
        case 'InvokeStreamValue':
        case 'InvokeStreamStream':
            return true
        default:
            return false
    }
}

/**
 * Shape of an action's argument or response: a single value ('VALUE') or an
 * rxjs Observable ('STREAM').
 */
type NoteType = 'VALUE' | 'STREAM'

/** Invocation carrying a single value; answered with a `ValueResolve` or `ValueReject`. */
type InvokeValueValue = {
    type: 'InvokeValueValue'
    action: string
    value?: unknown
}

/** Invocation carrying a single value; answered with stream notes. */
type InvokeValueStream = {
    type: 'InvokeValueStream'
    action: string
    value?: unknown
}

/**
 * Invocation whose argument is a stream: the sender follows this note with
 * stream notes for the same action. Answered with a `ValueResolve` or `ValueReject`.
 */
type InvokeStreamValue = {
    type: 'InvokeStreamValue'
    action: string
}

/**
 * Invocation whose argument is a stream: the sender follows this note with
 * stream notes for the same action. Answered with stream notes.
 */
type InvokeStreamStream = {
    type: 'InvokeStreamStream'
    action: string
}

/** Successful single-value answer for `action`; `value` is what the action's promise resolved with. */
type ValueResolve = {
    type: 'ValueResolve'
    action: string
    value?: unknown
}

/** Failed single-value answer for `action`; the invoking side throws `error`. */
type ValueReject = {
    type: 'ValueReject'
    action: string
    error: any
}


/** One batch of stream items for `action`; the receiver emits each element of `batch` in order. */
type StreamNext = {
    type: 'StreamNext'
    action: string
    batch: unknown[]
}

/** Marks normal completion of the stream for `action`. */
type StreamComplete = {
    type: 'StreamComplete'
    action: string
}

/** Marks termination of the stream for `action` with `error`. */
type StreamError = {
    type: 'StreamError'
    action: string
    error: any
}

/**
 * Every message exchanged over a call by the duplex bridge, discriminated by
 * `type`; each variant also carries the `action` it belongs to.
 */
export type ActionNote =
    | InvokeValueValue
    | InvokeValueStream
    | InvokeStreamValue
    | InvokeStreamStream
    | ValueResolve
    | ValueReject
    | StreamNext
    | StreamComplete
    | StreamError

/** Notes that answer an invocation with a single value; narrowed to by `isValueResponse`. */
type ValueResponse =
    | ValueResolve
    | ValueReject

/** Notes that make up a stream; narrowed to by `isStreamNote`. */
type StreamNote =
    | StreamNext
    | StreamComplete
    | StreamError

/** Notes that start an invocation; narrowed to by `isInvocationNote`. */
type InvocationNote =
    | InvokeValueValue
    | InvokeValueStream
    | InvokeStreamValue
    | InvokeStreamStream
