xref: /aosp_15_r20/external/bazelbuild-rules_android/src/tools/ak/res/respipe/res_io.go (revision 9e965d6fece27a77de5377433c2f7e6999b8cc0b)
1// Copyright 2022 The Bazel Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15
16package respipe
17
18import (
19	"bufio"
20	"encoding/binary"
21	"io"
22
23	"context"
24	rdpb "src/tools/ak/res/proto/res_data_go_proto"
25	"google.golang.org/protobuf/proto"
26)
27
// ResInput is a pipeline source: it feeds every resource proto found in its
// reader into the pipeline.
type ResInput struct {
	// In is the stream the serialized resource protos are read from.
	In io.Reader
}
32
33// Produce returns a channel of resource protos encountered in the input along with a chan of errors encountered while decoding them.
34func (ri ResInput) Produce(ctx context.Context) (<-chan *rdpb.Resource, <-chan error) {
35	resC := make(chan *rdpb.Resource)
36	errC := make(chan error)
37	go func() {
38		defer close(resC)
39		defer close(errC)
40		r := bufio.NewReaderSize(ri.In, 2<<16)
41		var b [4]byte
42		for {
43			if _, err := io.ReadFull(r, b[:]); err != nil {
44				if err != io.EOF {
45					SendErr(ctx, errC, Errorf(ctx, "read len failed: %v", err))
46				}
47				return
48
49			}
50			dlen := binary.LittleEndian.Uint32(b[:])
51			d := make([]byte, dlen)
52			if _, err := io.ReadFull(r, d); err != nil {
53				SendErr(ctx, errC, Errorf(ctx, "read proto failed: %v", err))
54				return
55			}
56			r := &rdpb.Resource{}
57			if err := proto.Unmarshal(d, r); err != nil {
58				SendErr(ctx, errC, Errorf(ctx, "unmarshal proto failed: %v", err))
59				return
60			}
61			if !SendRes(ctx, resC, r) {
62				return
63			}
64
65		}
66
67	}()
68	return resC, errC
69}
70
// ResOutput is a pipeline sink: every resource proto it is handed gets
// written to its writer.
type ResOutput struct {
	// Out is the stream the serialized resource protos are written to.
	Out io.Writer
}
75
76// Consume takes all resource protos from the provided channel and writes them to ResOutput's writer.
77func (ro ResOutput) Consume(ctx context.Context, resChan <-chan *rdpb.Resource) <-chan error {
78
79	errC := make(chan error)
80	go func() {
81		defer close(errC)
82
83		w := bufio.NewWriterSize(ro.Out, 2<<16)
84		defer func() {
85			if err := w.Flush(); err != nil {
86				SendErr(ctx, errC, Errorf(ctx, "flush end of data failed: %v", err))
87			}
88		}()
89		var b [4]byte
90		for r := range resChan {
91			d, err := proto.Marshal(r)
92			if err != nil {
93				SendErr(ctx, errC, Errorf(ctx, "%#v encoding failed: %v", r, err))
94				return
95			}
96			binary.LittleEndian.PutUint32(b[:], uint32(len(d)))
97			if _, err := w.Write(b[:]); err != nil {
98				SendErr(ctx, errC, Errorf(ctx, "write failed: %v", err))
99				return
100			}
101			if _, err := w.Write(d); err != nil {
102				SendErr(ctx, errC, Errorf(ctx, "write failed: %v", err))
103				return
104			}
105		}
106	}()
107
108	return errC
109}
110