import { EMPTY, Observable, of } from 'rxjs'
import { concatMap, materialize } from 'rxjs/operators'

/**
 * Creates an RxJS operator that groups runs of consecutive source items
 * sharing the same selected property into arrays.
 *
 * A batch is emitted as soon as an item with a different property arrives,
 * and the final pending batch is emitted when the source completes. If the
 * source errors, the error is forwarded and the pending batch is not emitted.
 *
 * Properties are compared with `===`, so object-valued properties are matched
 * by reference and `NaN` never matches itself.
 *
 * All batching state is created per subscription, so the returned operator
 * can be reused across pipelines and safely resubscribed (e.g. via `retry`
 * or `repeat`) without batches leaking between subscriptions.
 *
 * @param propertySelector Maps each source item to the value that decides
 *   which batch the item belongs to.
 * @returns An operator function turning an `Observable<T>` into an
 *   `Observable<T[]>` of non-empty batches.
 */
export function batchOnConsecutive<T, P>(propertySelector: (item: T) => P) {

    return (source: Observable<T>): Observable<T[]> => {
        // Wrapping the pipeline in a fresh Observable makes the subscribe
        // callback run once per subscription, which gives every subscriber
        // its own `currentProperty` / `currentBatch` instead of sharing one
        // set of mutable variables across all subscriptions.
        return new Observable<T[]>(subscriber => {
            // Property of the batch being accumulated. It starts out
            // unassigned (undefined at runtime), so a first item whose
            // property is `undefined` simply joins the empty initial batch.
            let currentProperty: P
            let currentBatch: T[] = []

            return source.pipe(
                // Turn next/error/complete into values so that completion can
                // flush the final pending batch from inside concatMap.
                materialize(),
                concatMap(notification => {
                    switch (notification.kind) {
                        case 'N': {
                            const item = notification.value
                            const property = propertySelector(item)
                            if (property === currentProperty) {
                                // Same run: keep accumulating, emit nothing yet.
                                currentBatch.push(item)
                                return EMPTY
                            } else {
                                // Property changed: start a new batch with this
                                // item and emit the finished one (if any).
                                currentProperty = property
                                const batch = currentBatch
                                currentBatch = [item]
                                return batch.length === 0 ? EMPTY : of(batch)
                            }
                        }
                        case 'E': {
                            // Re-raise the source error; the pending batch is dropped.
                            throw notification.error
                        }
                        case 'C': {
                            // Source completed: flush whatever is still pending.
                            return currentBatch.length === 0 ? EMPTY : of(currentBatch)
                        }
                    }
                })
            ).subscribe(subscriber)
        })
    }
}