1
2
3
4
5 package cache
6
7 import (
8 "bufio"
9 "cmd/go/internal/base"
10 "cmd/internal/quoted"
11 "context"
12 "crypto/sha256"
13 "encoding/base64"
14 "encoding/json"
15 "errors"
16 "fmt"
17 "io"
18 "log"
19 "os"
20 "os/exec"
21 "sync"
22 "sync/atomic"
23 "time"
24 )
25
26
27
28
29
30
31 type ProgCache struct {
32 cmd *exec.Cmd
33 stdout io.ReadCloser
34 stdin io.WriteCloser
35 bw *bufio.Writer
36 jenc *json.Encoder
37
38
39
40 can map[ProgCmd]bool
41
42
43
44
45
46
47
48 fuzzDirCache Cache
49
50 closing atomic.Bool
51 ctx context.Context
52 ctxCancel context.CancelFunc
53 readLoopDone chan struct{}
54
55 mu sync.Mutex
56 nextID int64
57 inFlight map[int64]chan<- *ProgResponse
58 outputFile map[OutputID]string
59
60
61
62 writeMu sync.Mutex
63 }
64
65
66
67
68
// ProgCmd names a command exchanged with the child process: it is the
// Command of a ProgRequest and the element type of
// ProgResponse.KnownCommands.
type ProgCmd string

// Commands issued by ProgCache.
const (
	cmdGet   ProgCmd = "get"
	cmdPut   ProgCmd = "put"
	cmdClose ProgCmd = "close"
)
76
77
78
79
80
81 type ProgRequest struct {
82
83
84 ID int64
85
86
87
88
89 Command ProgCmd
90
91
92 ActionID []byte `json:",omitempty"`
93
94
95 ObjectID []byte `json:",omitempty"`
96
97
98
99
100
101
102
103 Body io.Reader `json:"-"`
104
105
106 BodySize int64 `json:",omitempty"`
107 }
108
109
110
111
112
113
114
115
116
117 type ProgResponse struct {
118 ID int64
119 Err string `json:",omitempty"`
120
121
122
123
124
125
126
127
128 KnownCommands []ProgCmd `json:",omitempty"`
129
130
131
132 Miss bool `json:",omitempty"`
133 OutputID []byte `json:",omitempty"`
134 Size int64 `json:",omitempty"`
135 Time *time.Time `json:",omitempty"`
136
137
138
139
140 DiskPath string `json:",omitempty"`
141 }
142
143
144
145
146
147
148 func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
149 if fuzzDirCache == nil {
150 panic("missing fuzzDirCache")
151 }
152 args, err := quoted.Split(progAndArgs)
153 if err != nil {
154 base.Fatalf("GOCACHEPROG args: %v", err)
155 }
156 var prog string
157 if len(args) > 0 {
158 prog = args[0]
159 args = args[1:]
160 }
161
162 ctx, ctxCancel := context.WithCancel(context.Background())
163
164 cmd := exec.CommandContext(ctx, prog, args...)
165 out, err := cmd.StdoutPipe()
166 if err != nil {
167 base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
168 }
169 in, err := cmd.StdinPipe()
170 if err != nil {
171 base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
172 }
173 cmd.Stderr = os.Stderr
174 cmd.Cancel = in.Close
175
176 if err := cmd.Start(); err != nil {
177 base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
178 }
179
180 pc := &ProgCache{
181 ctx: ctx,
182 ctxCancel: ctxCancel,
183 fuzzDirCache: fuzzDirCache,
184 cmd: cmd,
185 stdout: out,
186 stdin: in,
187 bw: bufio.NewWriter(in),
188 inFlight: make(map[int64]chan<- *ProgResponse),
189 outputFile: make(map[OutputID]string),
190 readLoopDone: make(chan struct{}),
191 }
192
193
194
195 capResc := make(chan *ProgResponse, 1)
196 pc.inFlight[0] = capResc
197
198 pc.jenc = json.NewEncoder(pc.bw)
199 go pc.readLoop(pc.readLoopDone)
200
201
202
203 timer := time.NewTicker(5 * time.Second)
204 defer timer.Stop()
205 for {
206 select {
207 case <-timer.C:
208 log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
209 case capRes := <-capResc:
210 can := map[ProgCmd]bool{}
211 for _, cmd := range capRes.KnownCommands {
212 can[cmd] = true
213 }
214 if len(can) == 0 {
215 base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
216 }
217 pc.can = can
218 return pc
219 }
220 }
221 }
222
223 func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
224 defer close(readLoopDone)
225 jd := json.NewDecoder(c.stdout)
226 for {
227 res := new(ProgResponse)
228 if err := jd.Decode(res); err != nil {
229 if c.closing.Load() {
230 return
231 }
232 if err == io.EOF {
233 c.mu.Lock()
234 inFlight := len(c.inFlight)
235 c.mu.Unlock()
236 base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
237 }
238 base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
239 }
240 c.mu.Lock()
241 ch, ok := c.inFlight[res.ID]
242 delete(c.inFlight, res.ID)
243 c.mu.Unlock()
244 if ok {
245 ch <- res
246 } else {
247 base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
248 }
249 }
250 }
251
252 func (c *ProgCache) send(ctx context.Context, req *ProgRequest) (*ProgResponse, error) {
253 resc := make(chan *ProgResponse, 1)
254 if err := c.writeToChild(req, resc); err != nil {
255 return nil, err
256 }
257 select {
258 case res := <-resc:
259 if res.Err != "" {
260 return nil, errors.New(res.Err)
261 }
262 return res, nil
263 case <-ctx.Done():
264 return nil, ctx.Err()
265 }
266 }
267
268 func (c *ProgCache) writeToChild(req *ProgRequest, resc chan<- *ProgResponse) (err error) {
269 c.mu.Lock()
270 c.nextID++
271 req.ID = c.nextID
272 c.inFlight[req.ID] = resc
273 c.mu.Unlock()
274
275 defer func() {
276 if err != nil {
277 c.mu.Lock()
278 delete(c.inFlight, req.ID)
279 c.mu.Unlock()
280 }
281 }()
282
283 c.writeMu.Lock()
284 defer c.writeMu.Unlock()
285
286 if err := c.jenc.Encode(req); err != nil {
287 return err
288 }
289 if err := c.bw.WriteByte('\n'); err != nil {
290 return err
291 }
292 if req.Body != nil && req.BodySize > 0 {
293 if err := c.bw.WriteByte('"'); err != nil {
294 return err
295 }
296 e := base64.NewEncoder(base64.StdEncoding, c.bw)
297 wrote, err := io.Copy(e, req.Body)
298 if err != nil {
299 return err
300 }
301 if err := e.Close(); err != nil {
302 return nil
303 }
304 if wrote != req.BodySize {
305 return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, object %x: wrote %v; expected %v",
306 req.ActionID, req.ObjectID, wrote, req.BodySize)
307 }
308 if _, err := c.bw.WriteString("\"\n"); err != nil {
309 return err
310 }
311 }
312 if err := c.bw.Flush(); err != nil {
313 return err
314 }
315 return nil
316 }
317
318 func (c *ProgCache) Get(a ActionID) (Entry, error) {
319 if !c.can[cmdGet] {
320
321
322
323
324
325
326
327 return Entry{}, &entryNotFoundError{}
328 }
329 res, err := c.send(c.ctx, &ProgRequest{
330 Command: cmdGet,
331 ActionID: a[:],
332 })
333 if err != nil {
334 return Entry{}, err
335 }
336 if res.Miss {
337 return Entry{}, &entryNotFoundError{}
338 }
339 e := Entry{
340 Size: res.Size,
341 }
342 if res.Time != nil {
343 e.Time = *res.Time
344 } else {
345 e.Time = time.Now()
346 }
347 if res.DiskPath == "" {
348 return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
349 }
350 if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
351 return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
352 }
353 c.noteOutputFile(e.OutputID, res.DiskPath)
354 return e, nil
355 }
356
357 func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
358 c.mu.Lock()
359 defer c.mu.Unlock()
360 c.outputFile[o] = diskPath
361 }
362
363 func (c *ProgCache) OutputFile(o OutputID) string {
364 c.mu.Lock()
365 defer c.mu.Unlock()
366 return c.outputFile[o]
367 }
368
369 func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
370
371 h := sha256.New()
372 if _, err := file.Seek(0, 0); err != nil {
373 return OutputID{}, 0, err
374 }
375 size, err := io.Copy(h, file)
376 if err != nil {
377 return OutputID{}, 0, err
378 }
379 var out OutputID
380 h.Sum(out[:0])
381
382 if _, err := file.Seek(0, 0); err != nil {
383 return OutputID{}, 0, err
384 }
385
386 if !c.can[cmdPut] {
387
388 return out, size, nil
389 }
390
391 res, err := c.send(c.ctx, &ProgRequest{
392 Command: cmdPut,
393 ActionID: a[:],
394 ObjectID: out[:],
395 Body: file,
396 BodySize: size,
397 })
398 if err != nil {
399 return OutputID{}, 0, err
400 }
401 if res.DiskPath == "" {
402 return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
403 }
404 c.noteOutputFile(out, res.DiskPath)
405 return out, size, err
406 }
407
408 func (c *ProgCache) Close() error {
409 c.closing.Store(true)
410 var err error
411
412
413
414
415 if c.can[cmdClose] {
416 _, err = c.send(c.ctx, &ProgRequest{Command: cmdClose})
417 }
418 c.ctxCancel()
419 <-c.readLoopDone
420 return err
421 }
422
423 func (c *ProgCache) FuzzDir() string {
424
425
426 return c.fuzzDirCache.FuzzDir()
427 }
428
View as plain text