wp-go/stream/simpleMapStream.go

86 lines
1.7 KiB
Go
Raw Permalink Normal View History

2023-01-16 13:14:42 +00:00
package stream
import (
2023-01-25 18:26:36 +00:00
"github.com/fthvgb1/wp-go/helper/maps"
"github.com/fthvgb1/wp-go/taskPools"
2023-01-16 13:14:42 +00:00
"sync"
)
type mapX[K comparable, V any] struct {
m map[K]V
mut sync.Mutex
}
func (r *mapX[K, V]) set(k K, v V) {
r.mut.Lock()
defer r.mut.Unlock()
r.m[k] = v
}
func newMapX[K comparable, V any]() mapX[K, V] {
return mapX[K, V]{
m: map[K]V{},
mut: sync.Mutex{},
}
}
2023-01-28 14:36:15 +00:00
func SimpleMapFilterAndMapToSlice[R any, K comparable, V any](mm MapStream[K, V], fn func(K, V) (R, bool)) Stream[R] {
return NewStream(maps.FilterToSlice(mm.m, fn))
2023-01-16 13:14:42 +00:00
}
2023-01-28 14:36:15 +00:00
func SimpleMapParallelFilterAndMapToMap[K comparable, V any, KK comparable, VV any](mm MapStream[KK, VV], fn func(KK, VV) (K, V, bool), c int) MapStream[K, V] {
2023-01-16 13:14:42 +00:00
m := newMapX[K, V]()
mm.ParallelForEach(func(kk KK, vv VV) {
k, v, ok := fn(kk, vv)
if ok {
m.set(k, v)
}
}, c)
2023-01-28 14:36:15 +00:00
return MapStream[K, V]{m.m}
2023-01-16 13:14:42 +00:00
}
2023-01-28 14:36:15 +00:00
func SimpleMapStreamFilterAndMapToMap[K comparable, V any, KK comparable, VV comparable](a MapStream[KK, VV], fn func(KK, VV) (K, V, bool)) (r MapStream[K, V]) {
r = MapStream[K, V]{make(map[K]V)}
2023-01-16 13:14:42 +00:00
for k, v := range a.m {
kk, vv, ok := fn(k, v)
if ok {
r.m[kk] = vv
}
}
return
}
2023-01-28 14:36:15 +00:00
func NewSimpleMapStream[K comparable, V any](m map[K]V) MapStream[K, V] {
return MapStream[K, V]{m}
2023-01-16 13:14:42 +00:00
}
2023-01-28 14:36:15 +00:00
type MapStream[K comparable, V any] struct {
2023-01-16 13:14:42 +00:00
m map[K]V
}
2023-01-28 14:36:15 +00:00
func (r MapStream[K, V]) ForEach(fn func(K, V)) {
2023-01-16 13:14:42 +00:00
for k, v := range r.m {
fn(k, v)
}
}
2023-01-28 14:36:15 +00:00
func (r MapStream[K, V]) ParallelForEach(fn func(K, V), c int) {
2023-01-16 13:14:42 +00:00
p := taskPools.NewPools(c)
for k, v := range r.m {
k := k
v := v
p.Execute(func() {
fn(k, v)
})
}
p.Wait()
}
2023-01-28 14:36:15 +00:00
func (r MapStream[K, V]) Len() int {
2023-01-16 13:14:42 +00:00
return len(r.m)
}
2023-01-28 14:36:15 +00:00
func (r MapStream[K, V]) Result() map[K]V {
2023-01-16 13:14:42 +00:00
return r.m
}