From 7cc974a33509e0cf3d5f3173c792b5b1fc7c528e Mon Sep 17 00:00:00 2001 From: xing Date: Mon, 16 Jan 2023 21:14:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream/simpleMapStream.go | 92 ++++++++++++ stream/simpleMapStream_test.go | 261 +++++++++++++++++++++++++++++++++ stream/simpleStream.go | 73 ++++----- stream/simpleStream_test.go | 38 +++++ 4 files changed, 423 insertions(+), 41 deletions(-) create mode 100644 stream/simpleMapStream.go create mode 100644 stream/simpleMapStream_test.go diff --git a/stream/simpleMapStream.go b/stream/simpleMapStream.go new file mode 100644 index 0000000..1beddee --- /dev/null +++ b/stream/simpleMapStream.go @@ -0,0 +1,92 @@ +package stream + +import ( + "github/fthvgb1/wp-go/safety" + "github/fthvgb1/wp-go/taskPools" + "sync" +) + +type mapX[K comparable, V any] struct { + m map[K]V + mut sync.Mutex +} + +func (r *mapX[K, V]) set(k K, v V) { + r.mut.Lock() + defer r.mut.Unlock() + r.m[k] = v +} + +func newMapX[K comparable, V any]() mapX[K, V] { + return mapX[K, V]{ + m: map[K]V{}, + mut: sync.Mutex{}, + } +} + +func SimpleMapFilterAndMapToSlice[R any, K comparable, V any](mm SimpleMapStream[K, V], fn func(K, V) (R, bool), c int) SimpleSliceStream[R] { + rr := safety.NewSlice([]R{}) + mm.ParallelForEach(func(k K, v V) { + vv, ok := fn(k, v) + if ok { + rr.Append(vv) + } + }, c) + return NewSimpleSliceStream(rr.Load()) +} + +func SimpleMapParallelFilterAndMapToMap[K comparable, V any, KK comparable, VV any](mm SimpleMapStream[KK, VV], fn func(KK, VV) (K, V, bool), c int) SimpleMapStream[K, V] { + m := newMapX[K, V]() + mm.ParallelForEach(func(kk KK, vv VV) { + k, v, ok := fn(kk, vv) + if ok { + m.set(k, v) + } + }, c) + return SimpleMapStream[K, V]{m.m} +} + +func SimpleMapStreamFilterAndMapToMap[K comparable, V any, KK comparable, VV comparable](a SimpleMapStream[KK, VV], fn func(KK, VV) (K, V, bool)) (r SimpleMapStream[K, V]) { + r = SimpleMapStream[K, V]{make(map[K]V)} + for k, v := range a.m { + kk, vv, ok := fn(k, v) + if ok { + r.m[kk] = vv + } + } + return +} + +func NewSimpleMapStream[K comparable, V any](m map[K]V) SimpleMapStream[K, V] { + return SimpleMapStream[K, V]{m} +} + +type SimpleMapStream[K comparable, V any] struct { + m map[K]V +} + +func (r SimpleMapStream[K, V]) ForEach(fn func(K, V)) { + for k, v := range r.m { + fn(k, v) + } +} + +func (r SimpleMapStream[K, V]) ParallelForEach(fn func(K, V), c int) { + p := taskPools.NewPools(c) + for k, v := range r.m { + k := k + v := v + p.Execute(func() { + fn(k, v) + }) + } + p.Wait() +} + +func (r SimpleMapStream[K, V]) Len() int { + return len(r.m) +} + +func (r SimpleMapStream[K, V]) Result() map[K]V { + return r.m +} diff --git a/stream/simpleMapStream_test.go b/stream/simpleMapStream_test.go new file mode 100644 index 0000000..54e08bd --- /dev/null +++ b/stream/simpleMapStream_test.go @@ -0,0 +1,261 @@ +package stream + +import ( + "fmt" + "github/fthvgb1/wp-go/helper" + "reflect" + "strconv" + "testing" +) + +func TestNewSimpleMapStream(t *testing.T) { + type args[K int, V int] struct { + m map[K]V + } + type testCase[K int, V int] struct { + name string + args args[K, V] + want SimpleMapStream[K, V] + } + tests := []testCase[int, int]{ + { + name: "t1", + args: args[int, int]{make(map[int]int)}, + want: SimpleMapStream[int, int]{make(map[int]int)}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewSimpleMapStream(tt.args.m); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewSimpleMapStream() = %v, want %v", got, tt.want) + } + }) + } +} + +var y = helper.RangeSlice(1, 1000, 1) +var w = helper.SliceToMap(y, func(v int) (int, int) { + return v, v +}, true) + +func TestSimpleMapFilterAndMapToSlice(t *testing.T) { + type args[K int, V int, R int] struct { + mm SimpleMapStream[K, V] + fn func(K, V) (R, bool) + c int + } + type testCase[K int, V int, R int] struct { + name string + args args[K, V, R] + want SimpleSliceStream[R] + } + tests := []testCase[int, int, int]{ + { + name: "t1", + args: args[int, int, int]{ + mm: NewSimpleMapStream(w), + fn: func(k, v int) (int, bool) { + if v > 500 { + return v, true + } + return 0, false + }, + c: 6, + }, + want: NewSimpleSliceStream(y[500:]), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SimpleMapFilterAndMapToSlice(tt.args.mm, tt.args.fn, tt.args.c).Sort(func(i, j int) bool { + return i < j + }); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SimpleMapFilterAndMapToSlice() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSimpleMapParallelFilterAndMapToMap(t *testing.T) { + type args[KK string, VV string, K int, V int] struct { + mm SimpleMapStream[K, V] + fn func(K, V) (KK, VV, bool) + c int + } + type testCase[KK string, VV string, K int, V int] struct { + name string + args args[KK, VV, K, V] + want SimpleMapStream[KK, VV] + } + tests := []testCase[string, string, int, int]{ + { + name: "t1", + args: args[string, string, int, int]{ + mm: NewSimpleMapStream(w), + fn: func(k, v int) (string, string, bool) { + if v > 500 { + t := strconv.Itoa(v) + return t, t, true + } + return "", "", false + }, + c: 6, + }, + want: NewSimpleMapStream(helper.SliceToMap(y[500:], func(v int) (K, T string) { + t := strconv.Itoa(v) + return t, t + }, true)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SimpleMapParallelFilterAndMapToMap(tt.args.mm, tt.args.fn, tt.args.c); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SimpleMapParallelFilterAndMapToMap() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSimpleMapStreamFilterAndMapToMap(t *testing.T) { + type args[KK string, VV string, K int, V int] struct { + a SimpleMapStream[K, V] + fn func(K, V) (KK, VV, bool) + } + type testCase[KK string, VV string, K int, V int] struct { + name string + args args[KK, VV, K, V] + wantR SimpleMapStream[KK, VV] + } + tests := []testCase[string, string, int, int]{ + { + name: "t1", + args: args[string, string, int, int]{ + a: NewSimpleMapStream(w), + fn: func(k, v int) (string, string, bool) { + if v > 500 { + t := strconv.Itoa(v) + return t, t, true + } + return "", "", false + }, + }, + wantR: NewSimpleMapStream(helper.SliceToMap(y[500:], func(v int) (K, T string) { + t := strconv.Itoa(v) + return t, t + }, true)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotR := SimpleMapStreamFilterAndMapToMap(tt.args.a, tt.args.fn); !reflect.DeepEqual(gotR, tt.wantR) { + t.Errorf("SimpleMapStreamFilterAndMapToMap() = %v, want %v", gotR, tt.wantR) + } + }) + } +} + +func TestSimpleMapStream_ForEach(t *testing.T) { + type args[K int, V int] struct { + fn func(K, V) + } + type testCase[K int, V int] struct { + name string + r SimpleMapStream[K, V] + args args[K, V] + } + tests := []testCase[int, int]{ + { + name: "t1", + r: NewSimpleMapStream(helper.SliceToMap(y[0:10], func(v int) (int, int) { + return v, v + }, true)), + args: args[int, int]{ + fn: func(k, v int) { + fmt.Println(k, v) + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.r.ForEach(tt.args.fn) + }) + } +} + +func TestSimpleMapStream_Len(t *testing.T) { + type testCase[K int, V int] struct { + name string + r SimpleMapStream[K, V] + want int + } + tests := []testCase[int, int]{ + { + name: "t1", + r: NewSimpleMapStream(w), + want: len(w), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.r.Len(); got != tt.want { + t.Errorf("Len() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSimpleMapStream_ParallelForEach(t *testing.T) { + type args[K int, V int] struct { + fn func(K, V) + c int + } + type testCase[K int, V int] struct { + name string + r SimpleMapStream[K, V] + args args[K, V] + } + tests := []testCase[int, int]{ + { + name: "t1", + r: NewSimpleMapStream(w), + args: args[int, int]{ + func(k, v int) { + fmt.Println(k, v) + }, + 6, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.r.ParallelForEach(tt.args.fn, tt.args.c) + }) + } +} + +func TestSimpleMapStream_Result(t *testing.T) { + type testCase[K int, V int] struct { + name string + r SimpleMapStream[K, V] + want map[K]V + } + tests := []testCase[int, int]{ + { + name: "t1", + r: NewSimpleMapStream(helper.SliceToMap(y, func(v int) (int, int) { + return v, v + }, true)), + want: helper.SliceToMap(y, func(v int) (int, int) { + return v, v + }, true), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.r.Result(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Result() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/stream/simpleStream.go b/stream/simpleStream.go index c12a6be..b62d42a 100644 --- a/stream/simpleStream.go +++ b/stream/simpleStream.go @@ -7,37 +7,40 @@ import ( ) func SimpleParallelFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool), c int) SimpleSliceStream[R] { - p := taskPools.NewPools(c) var x []R rr := safety.NewSlice(x) - for _, t := range a.arr { - t := t - p.Execute(func() { - y, ok := fn(t) - if ok { - rr.Append(y) - } - }) - } - p.Wait() + a.ParallelForEach(func(t T) { + y, ok := fn(t) + if ok { + rr.Append(y) + } + }, c) return SimpleSliceStream[R]{rr.Load()} } +func SimpleParallelFilterAndMapToMap[K comparable, V any, T any](a SimpleSliceStream[T], fn func(t T) (K, V, bool), c int) (r SimpleMapStream[K, V]) { + m := newMapX[K, V]() + a.ParallelForEach(func(t T) { + k, v, ok := fn(t) + if ok { + m.set(k, v) + } + }, c) + var mm = map[K]V{} + r = NewSimpleMapStream(mm) + return +} + func SimpleStreamFilterAndMap[R, T any](a SimpleSliceStream[T], fn func(T) (R, bool)) SimpleSliceStream[R] { return NewSimpleSliceStream(helper.SliceFilterAndMap(a.arr, fn)) } func SimpleParallelMap[R, T any](a SimpleSliceStream[T], fn func(T) R, c int) SimpleSliceStream[R] { - p := taskPools.NewPools(c) var x []R rr := safety.NewSlice(x) - for _, t := range a.arr { - t := t - p.Execute(func() { - rr.Append(fn(t)) - }) - } - p.Wait() + a.ParallelForEach(func(t T) { + rr.Append(fn(t)) + }, c) return SimpleSliceStream[R]{rr.Load()} } func SimpleStreamMap[R, T any](a SimpleSliceStream[T], fn func(T) R) SimpleSliceStream[R] { @@ -74,18 +77,12 @@ func (r SimpleSliceStream[T]) ParallelForEach(fn func(T), c int) { } func (r SimpleSliceStream[T]) ParallelFilter(fn func(T) bool, c int) SimpleSliceStream[T] { - p := taskPools.NewPools(c) - var x []T - rr := safety.NewSlice(x) - for _, t := range r.arr { - t := t - p.Execute(func() { - if fn(t) { - rr.Append(t) - } - }) - } - p.Wait() + rr := safety.NewSlice([]T{}) + r.ParallelForEach(func(t T) { + if fn(t) { + rr.Append(t) + } + }, c) return SimpleSliceStream[T]{rr.Load()} } func (r SimpleSliceStream[T]) Filter(fn func(T) bool) SimpleSliceStream[T] { @@ -94,16 +91,10 @@ func (r SimpleSliceStream[T]) Filter(fn func(T) bool) SimpleSliceStream[T] { } func (r SimpleSliceStream[T]) ParallelMap(fn func(T) T, c int) SimpleSliceStream[T] { - p := taskPools.NewPools(c) - var x []T - rr := safety.NewSlice(x) - for _, t := range r.arr { - t := t - p.Execute(func() { - rr.Append(fn(t)) - }) - } - p.Wait() + rr := safety.NewSlice([]T{}) + r.ParallelForEach(func(t T) { + rr.Append(fn(t)) + }, c) return SimpleSliceStream[T]{rr.Load()} } diff --git a/stream/simpleStream_test.go b/stream/simpleStream_test.go index 5030433..e761a92 100644 --- a/stream/simpleStream_test.go +++ b/stream/simpleStream_test.go @@ -522,3 +522,41 @@ func TestSimpleSliceStream_Len(t *testing.T) { }) } } + +func TestSimpleParallelFilterAndMapToMap(t *testing.T) { + type args[T int, K int, V int] struct { + a SimpleSliceStream[V] + fn func(t T) (K, V, bool) + c int + } + type testCase[T int, K int, V int] struct { + name string + args args[T, K, V] + wantR SimpleMapStream[K, V] + } + tests := []testCase[int, int, int]{ + { + name: "t1", + args: args[int, int, int]{ + a: NewSimpleSliceStream(x), + fn: func(v int) (int, int, bool) { + if v >= 50000 { + return v, v, true + } + return 0, 0, false + }, + c: 6, + }, + wantR: NewSimpleMapStream(helper.SliceToMap(x[50000:], func(t int) (int, int) { + return t, t + }, true)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotR := SimpleParallelFilterAndMapToMap(tt.args.a, tt.args.fn, tt.args.c); !reflect.DeepEqual(gotR, tt.wantR) { + t.Errorf("SimpleParallelFilterAndMapToMap() = %v, want %v", gotR, tt.wantR) + } + }) + } +}