1// Copyright 2022 The Bazel Authors. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package respipe 16 17import ( 18 "context" 19 "errors" 20 "testing" 21 22 rdpb "src/tools/ak/res/proto/res_data_go_proto" 23 "src/tools/ak/res/res" 24) 25 26func TestMergePathInfoStreams(t *testing.T) { 27 ctx, cancel := context.WithCancel(context.Background()) 28 defer cancel() 29 sendClose := func(p *res.PathInfo, c chan<- *res.PathInfo) { 30 defer close(c) 31 c <- p 32 } 33 in1 := make(chan *res.PathInfo) 34 in2 := make(chan *res.PathInfo) 35 go sendClose(&res.PathInfo{}, in1) 36 go sendClose(&res.PathInfo{}, in2) 37 mergedC := MergePathInfoStreams(ctx, []<-chan *res.PathInfo{in1, in2}) 38 var rcv []*res.PathInfo 39 for p := range mergedC { 40 rcv = append(rcv, p) 41 } 42 if len(rcv) != 2 { 43 t.Errorf("got: %v on merged stream, wanted only 2 elements", rcv) 44 } 45} 46 47func TestMergeResStreams(t *testing.T) { 48 ctx := context.Background() 49 sendClose := func(r *rdpb.Resource, c chan<- *rdpb.Resource) { 50 defer close(c) 51 c <- r 52 } 53 in1 := make(chan *rdpb.Resource) 54 in2 := make(chan *rdpb.Resource) 55 go sendClose(&rdpb.Resource{}, in1) 56 go sendClose(&rdpb.Resource{}, in2) 57 merged := MergeResStreams(ctx, []<-chan *rdpb.Resource{in1, in2}) 58 var rcv []*rdpb.Resource 59 for r := range merged { 60 rcv = append(rcv, r) 61 } 62 if len(rcv) != 2 { 63 t.Errorf("got: %v on merged stream, wanted only 2 elements", rcv) 64 } 65} 66 67func TestMergeErrStreams(t *testing.T) { 68 ctx := context.Background() 69 sendClose := func(e error, eC chan<- error) { 70 defer close(eC) 71 eC <- e 72 } 73 in1 := make(chan error) 74 in2 := make(chan error) 75 go sendClose(errors.New("hi"), in1) 76 go sendClose(errors.New("hello"), in2) 77 merged := MergeErrStreams(ctx, []<-chan error{in1, in2}) 78 var rcv []error 79 for r := range merged { 80 rcv = append(rcv, r) 81 } 82 if len(rcv) != 2 { 83 t.Errorf("got: %v on merged stream, wanted only 2 elements", rcv) 84 } 85} 86