import {bufferCount, concatMap, defer, EMPTY, expand, from, Observable} from 'rxjs'


/**
 * A zero-argument async sampler: each call returns a promise of one value of type `T`.
 */
export type Observation<T> = () => Promise<T>

/**
 * A predicate over a single value of type `T`.
 */
export type Condition<T> = (x: T) => boolean


/**
 * Repeatedly invokes `observation` and emits each resolved value for as long as
 * `condition` holds for the most recently emitted value.
 *
 * The first sample is always taken and emitted. After each emission `condition`
 * is evaluated against that value: if it returns `true` another sample is taken,
 * otherwise the stream completes. The last emitted value is therefore the first
 * one for which `condition` returned `false`.
 *
 * The returned observable is cold: `observation` is not called until
 * subscription, and every subscription (including re-subscriptions made by
 * operators such as `retry` or `repeat`) starts a fresh sampling sequence.
 *
 * @param observation - Async sampler invoked once per emitted value.
 * @param condition - Predicate deciding, from the latest value, whether to sample again.
 * @returns An observable of the sampled values; it errors if `observation` rejects.
 */
export function sampleWhile<T>(observation: Observation<T>, condition: Condition<T>): Observable<T> {
    // `defer` postpones the first `observation()` call until subscription time.
    // A bare `from(observation())` would run it as soon as this function is called
    // and hand the same single promise to every subscriber.
    return defer(() => from(observation())).pipe(
        expand(latest => condition(latest) ? from(observation()) : EMPTY)
    )
}

/**
 * Samples `observation` via `sampleWhile` with the predicate inverted: sampling
 * continues while `condition` is `false` for the latest value and stops once it
 * returns `true`.
 *
 * @param observation - Async sampler passed through to `sampleWhile`.
 * @param condition - Predicate that ends the sampling once it returns `true`.
 * @returns The observable produced by `sampleWhile` for the negated predicate.
 */
export function sampleUntil<T>(observation: Observation<T>, condition: Condition<T>) {
    const notYetSatisfied: Condition<T> = sample => !condition(sample)
    return sampleWhile(observation, notYetSatisfied)
}


/**
 * Builds an operator that processes a stream in batches and emits the
 * per-batch results one element at a time.
 *
 * The source values are grouped with `bufferCount(batchSize)`, each group is
 * passed to `batchHandler`, and every array the handler emits is flattened into
 * individual values. Both mapping stages use `concatMap`, so a batch's handler
 * observable is subscribed only after the previous one has completed.
 *
 * @param batchSize - Buffer size handed to `bufferCount`.
 * @param batchHandler - Maps one batch of inputs to an observable of result arrays.
 * @returns A function transforming an `Observable<T>` into an `Observable<R>`.
 */
export function bufferMap<T, R>(
    batchSize: number,
    batchHandler: (batch: T[]) => Observable<R[]>
){
    return (source: Observable<T>): Observable<R> => {
        // Stage 1: group incoming values into arrays.
        const batches = source.pipe(bufferCount(batchSize))
        // Stage 2: run the handler on each batch, one batch at a time.
        const handled = batches.pipe(concatMap(items => batchHandler(items)))
        // Stage 3: unpack each result array into individual emissions.
        return handled.pipe(concatMap(results => from(results)))
    }
}
