From 3db327608205a4c6380344c281fe7c7a7c50fd15 Mon Sep 17 00:00:00 2001 From: xing Date: Mon, 16 Jan 2023 17:59:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=20Stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream/simpleStream.go | 41 ++++++++++ stream/simpleStream_test.go | 147 +++++++++++++++++++++++++++++++++++- 2 files changed, 187 insertions(+), 1 deletion(-) diff --git a/stream/simpleStream.go b/stream/simpleStream.go index 101b66d..d780f2f 100644 --- a/stream/simpleStream.go +++ b/stream/simpleStream.go @@ -68,6 +68,43 @@ func (r SimpleSliceStream[T]) ParallelMap(fn func(T) T, c int) SimpleSliceStream p.Wait() return SimpleSliceStream[T]{rr.Load()} } +func SimpleParallelFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool), c int) SimpleSliceStream[R] { + p := taskPools.NewPools(c) + var x []R + rr := safety.NewSlice(x) + for _, t := range a.arr { + t := t + p.Execute(func() { + y, ok := fn(t) + if ok { + rr.Append(y) + } + }) + } + p.Wait() + return SimpleSliceStream[R]{rr.Load()} +} + +func SimpleStreamFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool)) SimpleSliceStream[R] { + return NewSimpleSliceStream(helper.SliceFilterAndMap(a.arr, fn)) +} + +func SimpleParallelMap[R, T any](a SimpleSliceStream[T], fn func(T) R, c int) SimpleSliceStream[R] { + p := taskPools.NewPools(c) + var x []R + rr := safety.NewSlice(x) + for _, t := range a.arr { + t := t + p.Execute(func() { + rr.Append(fn(t)) + }) + } + p.Wait() + return SimpleSliceStream[R]{rr.Load()} +} +func SimpleStreamMap[R, T any](a SimpleSliceStream[T], fn func(T) R) SimpleSliceStream[R] { + return NewSimpleSliceStream(helper.SliceMap(a.arr, fn)) +} func (r SimpleSliceStream[T]) Map(fn func(T) T) SimpleSliceStream[T] { r.arr = helper.SliceMap(r.arr, fn) return r @@ -78,6 +115,10 @@ func (r SimpleSliceStream[T]) Sort(fn func(i, j T) bool) SimpleSliceStream[T] { return r } +func (r SimpleSliceStream[T]) Len() int { + return len(r.arr) +} + func (r SimpleSliceStream[T]) Limit(limit, offset int) SimpleSliceStream[T] { return SimpleSliceStream[T]{r.arr[offset : offset+limit]} } diff --git a/stream/simpleStream_test.go b/stream/simpleStream_test.go index 6b4b692..3c8d92a 100644 --- a/stream/simpleStream_test.go +++ b/stream/simpleStream_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github/fthvgb1/wp-go/helper" "reflect" + "strconv" "testing" ) @@ -278,7 +279,7 @@ func TestSimpleSliceStream_ParallelMap(t *testing.T) { if got := tt.r.ParallelMap(tt.args.fn, tt.args.c).Sort(func(i, j int) bool { return i < j }); !reflect.DeepEqual(got, tt.want) { - t.Errorf("ParallelMap() = %v, want %v", got, tt.want) + t.Errorf("SimpleParallelMap() = %v, want %v", got, tt.want) } }) } @@ -337,3 +338,147 @@ func TestSimpleSliceStream_Reverse(t *testing.T) { }) } } + +var x = helper.RangeSlice(1, 100000, 1) + +func TestSimpleStreamMap(t *testing.T) { + type args[T int, R string] struct { + a SimpleSliceStream[T] + fn func(T) R + } + type testCase[T int, R string] struct { + name string + args args[T, R] + want SimpleSliceStream[R] + } + tests := []testCase[int, string]{ + { + name: "t1", + args: args[int, string]{ + a: NewSimpleSliceStream(x), + fn: strconv.Itoa, + }, + want: SimpleSliceStream[string]{ + helper.SliceMap(x, strconv.Itoa), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SimpleStreamMap(tt.args.a, tt.args.fn); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SimpleStreamMap() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSimpleParallelMap(t *testing.T) { + type args[T string, R int] struct { + a SimpleSliceStream[string] + fn func(T) R + c int + } + type testCase[T string, R int] struct { + name string + args args[T, R] + want SimpleSliceStream[R] + } + + tests := []testCase[string, int]{ + { + name: "t1", + args: args[string, int]{ + a: NewSimpleSliceStream(helper.SliceMap(x, strconv.Itoa)), + fn: func(s string) int { + i, _ := strconv.Atoi(s) + return i + }, + c: 6, + }, + want: NewSimpleSliceStream(x), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SimpleParallelMap(tt.args.a, tt.args.fn, tt.args.c).Sort(func(i, j int) bool { + return i < j + }); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SimpleParallelMap() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSimpleParallelFilterAndMap(t *testing.T) { + type args[T string, R int] struct { + a SimpleSliceStream[string] + fn func(T) (R, bool) + c int + } + type testCase[T string, R int] struct { + name string + args args[T, R] + want SimpleSliceStream[R] + } + tests := []testCase[string, int]{ + { + name: "t1", + args: args[string, int]{ + a: NewSimpleSliceStream(helper.SliceMap(x, strconv.Itoa)), + fn: func(s string) (int, bool) { + i, _ := strconv.Atoi(s) + if i > 50000 { + return i, true + } + return 0, false + }, + c: 6, + }, + want: NewSimpleSliceStream(x[50000:]), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SimpleParallelFilterAndMap(tt.args.a, tt.args.fn, tt.args.c).Sort(func(i, j int) bool { + return i < j + }); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SimpleParallelFilterAndMap() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSimpleStreamFilterAndMap(t *testing.T) { + type args[T string, R int] struct { + a SimpleSliceStream[T] + fn func(T) (R, bool) + } + type testCase[T any, R any] struct { + name string + args args[string, int] + want SimpleSliceStream[R] + } + tests := []testCase[string, int]{ + { + name: "t1", + args: args[string, int]{ + a: NewSimpleSliceStream(helper.SliceMap(x, strconv.Itoa)), + fn: func(s string) (int, bool) { + i, _ := strconv.Atoi(s) + if i > 50000 { + return i, true + } + return 0, false + }, + }, + want: NewSimpleSliceStream(x[50000:]), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SimpleStreamFilterAndMap(tt.args.a, tt.args.fn); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SimpleStreamFilterAndMap() = %v, want %v", got, tt.want) + } + }) + } +}