完善 Stream

This commit is contained in:
xing 2023-01-16 17:59:42 +08:00
parent d90060c6a2
commit 3db3276082
2 changed files with 187 additions and 1 deletions

View File

@ -68,6 +68,43 @@ func (r SimpleSliceStream[T]) ParallelMap(fn func(T) T, c int) SimpleSliceStream
p.Wait()
return SimpleSliceStream[T]{rr.Load()}
}
func SimpleParallelFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool), c int) SimpleSliceStream[R] {
p := taskPools.NewPools(c)
var x []R
rr := safety.NewSlice(x)
for _, t := range a.arr {
t := t
p.Execute(func() {
y, ok := fn(t)
if ok {
rr.Append(y)
}
})
}
p.Wait()
return SimpleSliceStream[R]{rr.Load()}
}
func SimpleStreamFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool)) SimpleSliceStream[R] {
return NewSimpleSliceStream(helper.SliceFilterAndMap(a.arr, fn))
}
func SimpleParallelMap[R, T any](a SimpleSliceStream[T], fn func(T) R, c int) SimpleSliceStream[R] {
p := taskPools.NewPools(c)
var x []R
rr := safety.NewSlice(x)
for _, t := range a.arr {
t := t
p.Execute(func() {
rr.Append(fn(t))
})
}
p.Wait()
return SimpleSliceStream[R]{rr.Load()}
}
func SimpleStreamMap[R, T any](a SimpleSliceStream[T], fn func(T) R) SimpleSliceStream[R] {
return NewSimpleSliceStream(helper.SliceMap(a.arr, fn))
}
func (r SimpleSliceStream[T]) Map(fn func(T) T) SimpleSliceStream[T] {
r.arr = helper.SliceMap(r.arr, fn)
return r
@ -78,6 +115,10 @@ func (r SimpleSliceStream[T]) Sort(fn func(i, j T) bool) SimpleSliceStream[T] {
return r
}
func (r SimpleSliceStream[T]) Len() int {
return len(r.arr)
}
func (r SimpleSliceStream[T]) Limit(limit, offset int) SimpleSliceStream[T] {
return SimpleSliceStream[T]{r.arr[offset : offset+limit]}
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github/fthvgb1/wp-go/helper"
"reflect"
"strconv"
"testing"
)
@ -278,7 +279,7 @@ func TestSimpleSliceStream_ParallelMap(t *testing.T) {
if got := tt.r.ParallelMap(tt.args.fn, tt.args.c).Sort(func(i, j int) bool {
return i < j
}); !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParallelMap() = %v, want %v", got, tt.want)
t.Errorf("SimpleParallelMap() = %v, want %v", got, tt.want)
}
})
}
@ -337,3 +338,147 @@ func TestSimpleSliceStream_Reverse(t *testing.T) {
})
}
}
var x = helper.RangeSlice(1, 100000, 1)
func TestSimpleStreamMap(t *testing.T) {
type args[T int, R string] struct {
a SimpleSliceStream[T]
fn func(T) R
}
type testCase[T int, R string] struct {
name string
args args[T, R]
want SimpleSliceStream[R]
}
tests := []testCase[int, string]{
{
name: "t1",
args: args[int, string]{
a: NewSimpleSliceStream(x),
fn: strconv.Itoa,
},
want: SimpleSliceStream[string]{
helper.SliceMap(x, strconv.Itoa),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SimpleStreamMap(tt.args.a, tt.args.fn); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SimpleStreamMap() = %v, want %v", got, tt.want)
}
})
}
}
func TestSimpleParallelMap(t *testing.T) {
type args[T string, R int] struct {
a SimpleSliceStream[string]
fn func(T) R
c int
}
type testCase[T string, R int] struct {
name string
args args[T, R]
want SimpleSliceStream[R]
}
tests := []testCase[string, int]{
{
name: "t1",
args: args[string, int]{
a: NewSimpleSliceStream(helper.SliceMap(x, strconv.Itoa)),
fn: func(s string) int {
i, _ := strconv.Atoi(s)
return i
},
c: 6,
},
want: NewSimpleSliceStream(x),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SimpleParallelMap(tt.args.a, tt.args.fn, tt.args.c).Sort(func(i, j int) bool {
return i < j
}); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SimpleParallelMap() = %v, want %v", got, tt.want)
}
})
}
}
func TestSimpleParallelFilterAndMap(t *testing.T) {
type args[T string, R int] struct {
a SimpleSliceStream[string]
fn func(T) (R, bool)
c int
}
type testCase[T string, R int] struct {
name string
args args[T, R]
want SimpleSliceStream[R]
}
tests := []testCase[string, int]{
{
name: "t1",
args: args[string, int]{
a: NewSimpleSliceStream(helper.SliceMap(x, strconv.Itoa)),
fn: func(s string) (int, bool) {
i, _ := strconv.Atoi(s)
if i > 50000 {
return i, true
}
return 0, false
},
c: 6,
},
want: NewSimpleSliceStream(x[50000:]),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SimpleParallelFilterAndMap(tt.args.a, tt.args.fn, tt.args.c).Sort(func(i, j int) bool {
return i < j
}); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SimpleParallelFilterAndMap() = %v, want %v", got, tt.want)
}
})
}
}
func TestSimpleStreamFilterAndMap(t *testing.T) {
type args[T string, R int] struct {
a SimpleSliceStream[T]
fn func(T) (R, bool)
}
type testCase[T any, R any] struct {
name string
args args[string, int]
want SimpleSliceStream[R]
}
tests := []testCase[string, int]{
{
name: "t1",
args: args[string, int]{
a: NewSimpleSliceStream(helper.SliceMap(x, strconv.Itoa)),
fn: func(s string) (int, bool) {
i, _ := strconv.Atoi(s)
if i > 50000 {
return i, true
}
return 0, false
},
},
want: NewSimpleSliceStream(x[50000:]),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := SimpleStreamFilterAndMap(tt.args.a, tt.args.fn); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SimpleStreamFilterAndMap() = %v, want %v", got, tt.want)
}
})
}
}