diff --git a/stream/simpleMapStream_test.go b/stream/simpleMapStream_test.go index efaf9c8..5f3bc76 100644 --- a/stream/simpleMapStream_test.go +++ b/stream/simpleMapStream_test.go @@ -68,7 +68,7 @@ func TestSimpleMapFilterAndMapToSlice(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := SimpleMapFilterAndMapToSlice(tt.args.mm, tt.args.fn, tt.args.c).Sort(func(i, j int) bool { + if got := SimpleMapFilterAndMapToSlice(tt.args.mm, tt.args.fn).Sort(func(i, j int) bool { return i < j }); !reflect.DeepEqual(got, tt.want) { t.Errorf("SimpleMapFilterAndMapToSlice() = %v, want %v", got, tt.want) diff --git a/stream/stream.go b/stream/stream.go index 24ef73f..0fb30c7 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -50,14 +50,6 @@ func FilterAndMapNewStream[R, T any](a Stream[T], fn func(T) (R, bool)) Stream[R return NewStream(slice.FilterAndMap(a.arr, fn)) } -func ParallelMap[R, T any](a Stream[T], fn func(T) R, c int) Stream[R] { - var x []R - rr := safety.NewSlice(x) - a.ParallelForEach(func(t T) { - rr.Append(fn(t)) - }, c) - return Stream[R]{rr.Load()} -} func MapNewStream[R, T any](a Stream[T], fn func(T) R) Stream[R] { return NewStream(slice.Map(a.arr, fn)) } @@ -91,31 +83,24 @@ func (r Stream[T]) ParallelForEach(fn func(T), c int) { p.Wait() } -func (r Stream[T]) ParallelFilter(fn func(T) bool, c int) Stream[T] { +func (r Stream[T]) ParallelFilterAndMap(fn func(T) (T, bool), c int) Stream[T] { rr := safety.NewSlice([]T{}) r.ParallelForEach(func(t T) { - if fn(t) { - rr.Append(t) + v, ok := fn(t) + if ok { + rr.Append(v) } }, c) return Stream[T]{rr.Load()} } -func (r Stream[T]) Filter(fn func(T) bool) Stream[T] { - r.arr = slice.Filter(r.arr, fn) + +func (r Stream[T]) FilterAndMap(fn func(T) (T, bool)) Stream[T] { + r.arr = slice.FilterAndMap(r.arr, fn) return r } -func (r Stream[T]) ParallelMap(fn func(T) T, c int) Stream[T] { - rr := safety.NewSlice([]T{}) - r.ParallelForEach(func(t T) { - rr.Append(fn(t)) - }, c) - return Stream[T]{rr.Load()} -} - -func (r Stream[T]) Map(fn func(T) T) Stream[T] { - r.arr = slice.Map(r.arr, fn) - return r +func (r Stream[T]) Reduce(fn func(v, r T) T, init T) T { + return slice.Reduce[T, T](r.arr, fn, init) } func (r Stream[T]) Sort(fn func(i, j T) bool) Stream[T] { diff --git a/stream/stream_test.go b/stream/stream_test.go index 954d7a6..b14fbd7 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -11,40 +11,6 @@ import ( var s = NewStream(number.Range(1, 10, 1)) -func TestSimpleSliceStream_Filter(t *testing.T) { - type args[T int] struct { - fn func(T) bool - } - type testCase[T int] struct { - name string - r Stream[T] - args args[T] - want Stream[T] - } - tests := []testCase[int]{ - { - name: "t1", - r: s, - args: args[int]{ - func(t int) (r bool) { - if t > 5 { - r = true - } - return - }, - }, - want: Stream[int]{number.Range(6, 10, 1)}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.r.Filter(tt.args.fn); !reflect.DeepEqual(got, tt.want) { - t.Errorf("Filter() = %v, want %v", got, tt.want) - } - }) - } -} - func TestSimpleSliceStream_ForEach(t *testing.T) { type args[T int] struct { fn func(T) @@ -121,37 +87,6 @@ func TestSimpleSliceStream_Limit(t *testing.T) { } } -func TestSimpleSliceStream_Map(t *testing.T) { - type args[T int] struct { - fn func(T) T - } - type testCase[T int] struct { - name string - r Stream[T] - args args[T] - want Stream[T] - } - tests := []testCase[int]{ - { - name: "t1", - r: s, - args: args[int]{ - func(t int) (r int) { - return t * 2 - }, - }, - want: Stream[int]{number.Range(2, 20, 2)}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.r.Map(tt.args.fn); !reflect.DeepEqual(got, tt.want) { - t.Errorf("MapNewStream() = %v, want %v", got, tt.want) - } - }) - } -} - func TestSimpleSliceStream_Result(t *testing.T) { type testCase[T int] struct { name string @@ -234,76 +169,6 @@ func TestSimpleSliceStream_parallelForEach(t *testing.T) { } } -func TestSimpleSliceStream_ParallelFilter(t *testing.T) { - type args[T int] struct { - fn func(T) bool - c int - } - type testCase[T int] struct { - name string - r Stream[T] - args args[T] - want Stream[T] - } - tests := []testCase[int]{ - { - name: "t1", - r: s, - args: args[int]{ - fn: func(t int) bool { - return t > 3 - }, - c: 6, - }, - want: Stream[int]{number.Range(4, 10, 1)}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.r.ParallelFilter(tt.args.fn, tt.args.c).Sort(func(i, j int) bool { - return i < j - }); !reflect.DeepEqual(got, tt.want) { - t.Errorf("ParallelFilter() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestSimpleSliceStream_ParallelMap(t *testing.T) { - type args[T int] struct { - fn func(T) T - c int - } - type testCase[T int] struct { - name string - r Stream[T] - args args[T] - want Stream[T] - } - tests := []testCase[int]{ - { - name: "t1", - r: s, - args: args[int]{ - fn: func(t int) int { - return t * 2 - }, - c: 6, - }, - want: Stream[int]{number.Range(2, 20, 2)}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := tt.r.ParallelMap(tt.args.fn, tt.args.c).Sort(func(i, j int) bool { - return i < j - }); !reflect.DeepEqual(got, tt.want) { - t.Errorf("ParallelMap() = %v, want %v", got, tt.want) - } - }) - } -} - func TestReduce(t *testing.T) { type args[S, T int] struct { s Stream[S] @@ -391,43 +256,6 @@ func TestSimpleStreamMap(t *testing.T) { } } -func TestSimpleParallelMap(t *testing.T) { - type args[T string, R int] struct { - a Stream[string] - fn func(T) R - c int - } - type testCase[T string, R int] struct { - name string - args args[T, R] - want Stream[R] - } - - tests := []testCase[string, int]{ - { - name: "t1", - args: args[string, int]{ - a: NewStream(slice.Map(x, strconv.Itoa)), - fn: func(s string) int { - i, _ := strconv.Atoi(s) - return i - }, - c: 6, - }, - want: NewStream(x), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := ParallelMap(tt.args.a, tt.args.fn, tt.args.c).Sort(func(i, j int) bool { - return i < j - }); !reflect.DeepEqual(got, tt.want) { - t.Errorf("ParallelMap() = %v, want %v", got, tt.want) - } - }) - } -} - func TestSimpleParallelFilterAndMap(t *testing.T) { type args[T string, R int] struct { a Stream[string] @@ -598,3 +426,80 @@ func TestSimpleSliceFilterAndMapToMap(t *testing.T) { }) } } + +func TestStream_ParallelFilterAndMap(t *testing.T) { + type xy struct { + args int + res string + } + type args[T any] struct { + fn func(T) (T, bool) + c int + } + type testCase[T xy] struct { + name string + r Stream[T] + args args[T] + want Stream[T] + } + tests := []testCase[xy]{ + { + name: "t1", + r: NewStream(slice.Map(number.Range(1, 10, 1), func(t int) xy { + return xy{args: t} + })), + args: args[xy]{func(v xy) (xy, bool) { + v.res = strconv.Itoa(v.args) + return v, true + }, 6}, + want: NewStream(slice.Map(number.Range(1, 10, 1), func(t int) xy { + return xy{args: t, res: strconv.Itoa(t)} + })), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.r.ParallelFilterAndMap(tt.args.fn, tt.args.c).Sort(func(i, j xy) bool { + return i.args < j.args + }); !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParallelFilterAndMap() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestStream_Reduce(t *testing.T) { + type aa struct { + args int + res int + } + type args[T any] struct { + fn func(v, r T) T + init T + } + type testCase[T any] struct { + name string + r Stream[T] + args args[T] + want T + } + tests := []testCase[aa]{ + { + name: "t1", + r: NewStream(slice.Map(number.Range(1, 10, 1), func(t int) aa { + return aa{args: t} + })), + args: args[aa]{func(v, r aa) aa { + return aa{res: v.args + r.res} + }, aa{}}, + want: aa{res: 55}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.r.Reduce(tt.args.fn, tt.args.init); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Reduce() = %v, want %v", got, tt.want) + } + }) + } +}