diff --git a/stream/simpleStream.go b/stream/simpleStream.go index d780f2f..c12a6be 100644 --- a/stream/simpleStream.go +++ b/stream/simpleStream.go @@ -6,18 +6,56 @@ import ( "github/fthvgb1/wp-go/taskPools" ) +func SimpleParallelFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool), c int) SimpleSliceStream[R] { + p := taskPools.NewPools(c) + var x []R + rr := safety.NewSlice(x) + for _, t := range a.arr { + t := t + p.Execute(func() { + y, ok := fn(t) + if ok { + rr.Append(y) + } + }) + } + p.Wait() + return SimpleSliceStream[R]{rr.Load()} +} + +func SimpleStreamFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool)) SimpleSliceStream[R] { + return NewSimpleSliceStream(helper.SliceFilterAndMap(a.arr, fn)) +} + +func SimpleParallelMap[R, T any](a SimpleSliceStream[T], fn func(T) R, c int) SimpleSliceStream[R] { + p := taskPools.NewPools(c) + var x []R + rr := safety.NewSlice(x) + for _, t := range a.arr { + t := t + p.Execute(func() { + rr.Append(fn(t)) + }) + } + p.Wait() + return SimpleSliceStream[R]{rr.Load()} +} +func SimpleStreamMap[R, T any](a SimpleSliceStream[T], fn func(T) R) SimpleSliceStream[R] { + return NewSimpleSliceStream(helper.SliceMap(a.arr, fn)) +} + func Reduce[T any, S any](s SimpleSliceStream[S], fn func(S, T) T, init T) (r T) { return helper.SliceReduce(s.arr, fn, init) } -type SimpleSliceStream[T any] struct { - arr []T -} - func NewSimpleSliceStream[T any](arr []T) SimpleSliceStream[T] { return SimpleSliceStream[T]{arr: arr} } +type SimpleSliceStream[T any] struct { + arr []T +} + func (r SimpleSliceStream[T]) ForEach(fn func(T)) { for _, t := range r.arr { fn(t) @@ -68,43 +106,7 @@ func (r SimpleSliceStream[T]) ParallelMap(fn func(T) T, c int) SimpleSliceStream p.Wait() return SimpleSliceStream[T]{rr.Load()} } -func SimpleParallelFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool), c int) SimpleSliceStream[R] { - p := taskPools.NewPools(c) - var x []R - rr := safety.NewSlice(x) - for _, t := range a.arr { - t := t - p.Execute(func() { - y, ok := fn(t) - if ok { - rr.Append(y) - } - }) - } - p.Wait() - return SimpleSliceStream[R]{rr.Load()} -} -func SimpleStreamFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool)) SimpleSliceStream[R] { - return NewSimpleSliceStream(helper.SliceFilterAndMap(a.arr, fn)) -} - -func SimpleParallelMap[R, T any](a SimpleSliceStream[T], fn func(T) R, c int) SimpleSliceStream[R] { - p := taskPools.NewPools(c) - var x []R - rr := safety.NewSlice(x) - for _, t := range a.arr { - t := t - p.Execute(func() { - rr.Append(fn(t)) - }) - } - p.Wait() - return SimpleSliceStream[R]{rr.Load()} -} -func SimpleStreamMap[R, T any](a SimpleSliceStream[T], fn func(T) R) SimpleSliceStream[R] { - return NewSimpleSliceStream(helper.SliceMap(a.arr, fn)) -} func (r SimpleSliceStream[T]) Map(fn func(T) T) SimpleSliceStream[T] { r.arr = helper.SliceMap(r.arr, fn) return r @@ -120,7 +122,15 @@ func (r SimpleSliceStream[T]) Len() int { } func (r SimpleSliceStream[T]) Limit(limit, offset int) SimpleSliceStream[T] { - return SimpleSliceStream[T]{r.arr[offset : offset+limit]} + l := len(r.arr) + if offset >= l { + return SimpleSliceStream[T]{} + } + ll := offset + limit + if ll > l { + ll = l + } + return SimpleSliceStream[T]{r.arr[offset:ll]} } func (r SimpleSliceStream[T]) Reverse() SimpleSliceStream[T] { diff --git a/stream/simpleStream_test.go b/stream/simpleStream_test.go index 615acb0..5030433 100644 --- a/stream/simpleStream_test.go +++ b/stream/simpleStream_test.go @@ -92,6 +92,24 @@ func TestSimpleSliceStream_Limit(t *testing.T) { }, want: SimpleSliceStream[int]{helper.RangeSlice(6, 8, 1)}, }, + { + name: "t2", + r: s, + args: args{ + limit: 3, + offset: 9, + }, + want: SimpleSliceStream[int]{helper.RangeSlice(10, 10, 1)}, + }, + { + name: "t3", + r: s, + args: args{ + limit: 3, + offset: 11, + }, + want: SimpleSliceStream[int]{}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {