import {concatMap, filter, map, Observable} from 'rxjs'

import {AsyncPredicate} from './pipeline-types'

/**
 * Creates a pipeable operator that keeps only the values for which an
 * asynchronous predicate resolves to a truthy result.
 *
 * Each source value is run through `predicate` inside `concatMap`, so the
 * checks are chained one after another and surviving values are emitted in
 * their original order.
 *
 * @typeParam T - type of the values emitted by the source observable.
 * @typeParam F - type reported for the emitted values. This is a plain type
 *   assertion on the result; nothing at runtime verifies that the values
 *   which pass the predicate really are `F`.
 * @param predicate - asynchronous test that receives each value together
 *   with the index supplied by `concatMap`.
 * @returns a function mapping an `Observable<T>` to an `Observable<F>` that
 *   contains only the accepted values.
 */
export function filterAsync<T, F extends T>(
    predicate: AsyncPredicate<T>
) {
    return (o: Observable<T>): Observable<F> => {
        // Stage 1: pair every value with the awaited verdict of the predicate.
        const tagged = o.pipe(
            concatMap(async (value, position) => ({
                value,
                keep: await predicate(value, position),
            }))
        )

        // Stage 2: discard the pairs whose verdict was falsy.
        const accepted = tagged.pipe(filter(entry => entry.keep))

        // Stage 3: unwrap the pair so downstream sees the bare value again.
        return accepted.pipe(map(entry => entry.value)) as Observable<F>
    }
}
