1
2
3
4
5
6
7
8 package main
9
10 import (
11 "context"
12 "fmt"
13 "runtime"
14 "sort"
15 "sync"
16 "time"
17 )
18
19
20
// _SliceEqual reports whether s1 and s2 have the same length and hold
// pairwise matching elements. Two elements match when they compare equal
// with ==, or when neither of them is equal to itself (for example two
// floating-point NaN values).
func _SliceEqual[Elem comparable](s1, s2 []Elem) bool {
	if len(s1) != len(s2) {
		return false
	}
	// differsFromSelf is true only for values for which e != e holds.
	differsFromSelf := func(e Elem) bool { return e != e }
	for i := range s1 {
		a, b := s1[i], s2[i]
		if a == b {
			continue
		}
		if differsFromSelf(a) && differsFromSelf(b) {
			// Both sides are "not equal to themselves"; treat as a match.
			continue
		}
		return false
	}
	return true
}
36
37
38
// _ReadAll collects values received from c into a slice and returns it.
// Collection stops when c is closed or when ctx is done, whichever is
// observed first; the values gathered up to that point are returned.
// The result is nil if nothing was received.
func _ReadAll[Elem any](ctx context.Context, c <-chan Elem) []Elem {
	var collected []Elem
collect:
	for {
		select {
		case elem, open := <-c:
			if !open {
				break collect
			}
			collected = append(collected, elem)
		case <-ctx.Done():
			break collect
		}
	}
	return collected
}
53
54
55
56
// _Merge fans c1 and c2 into a single output channel. Values are forwarded
// in whatever order they become available. The returned channel is closed
// once both inputs have been closed, or as soon as ctx is done.
//
// Forwarding a value also watches ctx, so the background goroutine exits
// (and closes the output) even when the consumer has stopped receiving.
func _Merge[Elem any](ctx context.Context, c1, c2 <-chan Elem) <-chan Elem {
	r := make(chan Elem)
	go func(ctx context.Context, c1, c2 <-chan Elem, r chan<- Elem) {
		defer close(r)
		// forward delivers v to r; it reports false if ctx finished first.
		forward := func(v Elem) bool {
			select {
			case r <- v:
				return true
			case <-ctx.Done():
				return false
			}
		}
		// A closed input is set to nil so its select case blocks forever
		// and the loop ends once both inputs are exhausted.
		for c1 != nil || c2 != nil {
			select {
			case <-ctx.Done():
				return
			case v1, ok := <-c1:
				if !ok {
					c1 = nil
					continue
				}
				if !forward(v1) {
					return
				}
			case v2, ok := <-c2:
				if !ok {
					c2 = nil
					continue
				}
				if !forward(v2) {
					return
				}
			}
		}
	}(ctx, c1, c2, r)
	return r
}
82
83
84
85
86
// _Filter returns a channel that carries only those values from c for which
// f returns true, in the order they were received. The returned channel is
// closed when c is closed or when ctx is done.
//
// Delivering a value also watches ctx, so the background goroutine exits
// (and closes the output) even when the consumer has stopped receiving.
func _Filter[Elem any](ctx context.Context, c <-chan Elem, f func(Elem) bool) <-chan Elem {
	r := make(chan Elem)
	go func(ctx context.Context, c <-chan Elem, f func(Elem) bool, r chan<- Elem) {
		defer close(r)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-c:
				if !ok {
					return
				}
				if !f(v) {
					continue
				}
				// Do not block forever on a consumer that went away.
				select {
				case r <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}(ctx, c, f, r)
	return r
}
107
108
109
110
// _Sink returns a send-only channel whose values are received and thrown
// away by a background goroutine. That goroutine stops when the channel is
// closed or when ctx is done.
func _Sink[Elem any](ctx context.Context) chan<- Elem {
	sink := make(chan Elem)
	discard := func(ctx context.Context, in <-chan Elem) {
		for {
			select {
			case _, open := <-in:
				if !open {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}
	go discard(ctx, sink)
	return sink
}
127
128
129
// _Exclusive guards a single value by keeping it in a one-slot buffered
// channel: whoever receives the value holds it until it is sent back.
type _Exclusive[Val any] struct {
	c chan Val
}

// _MakeExclusive returns an _Exclusive that initially holds initial.
func _MakeExclusive[Val any](initial Val) *_Exclusive[Val] {
	slot := make(chan Val, 1)
	slot <- initial
	return &_Exclusive[Val]{c: slot}
}

// Acquire takes the value out of e, blocking until it is available.
// The caller is expected to hand it back with Release.
func (e *_Exclusive[Val]) Acquire() Val {
	held := <-e.c
	return held
}

// TryAcquire takes the value out of e without blocking. ok is false, and v
// is the zero value, when the value is currently held by someone else.
func (e *_Exclusive[Val]) TryAcquire() (v Val, ok bool) {
	select {
	case v = <-e.c:
		ok = true
	default:
	}
	return v, ok
}

// Release puts v back into e. It panics if e already holds a value, i.e.
// when Release is called without a matching Acquire or TryAcquire.
func (e *_Exclusive[Val]) Release(v Val) {
	select {
	case e.c <- v:
		return
	default:
	}
	panic("_Exclusive Release without Acquire")
}
170
171
172
173
174
175
176
177
178
// _Ranger creates a connected sender/receiver pair. Both ends share an
// unbuffered value channel and a done channel. A finalizer is registered on
// the receiver that closes the done channel, which lets Send return false
// instead of blocking once the receiver has been finalized.
func _Ranger[Elem any]() (*_Sender[Elem], *_Receiver[Elem]) {
	values := make(chan Elem)
	done := make(chan struct{})
	recv := &_Receiver[Elem]{values: values, done: done}
	runtime.SetFinalizer(recv, (*_Receiver[Elem]).finalize)
	return &_Sender[Elem]{values: values, done: done}, recv
}

// _Sender is the sending half of a pair created by _Ranger.
type _Sender[Elem any] struct {
	values chan<- Elem
	done   <-chan struct{}
}

// Send delivers v to the receiver and reports whether it was delivered.
// It returns false if ctx is done or the done channel has been closed
// before the value could be handed over.
func (s *_Sender[Elem]) Send(ctx context.Context, v Elem) bool {
	select {
	case s.values <- v:
		return true
	case <-s.done:
	case <-ctx.Done():
	}
	return false
}

// Close closes the value channel, signalling the receiver that no more
// values will be sent. Send must not be called after Close.
func (s *_Sender[Elem]) Close() {
	close(s.values)
}

// _Receiver is the receiving half of a pair created by _Ranger.
type _Receiver[Elem any] struct {
	values <-chan Elem
	done   chan<- struct{}
}

// Next waits for the next value from the sender. ok is false, and v is the
// zero value, when the sender has closed the value channel or ctx is done.
func (r *_Receiver[Elem]) Next(ctx context.Context) (v Elem, ok bool) {
	select {
	case v, ok = <-r.values:
		return v, ok
	case <-ctx.Done():
		return v, false
	}
}

// finalize is installed as the receiver's finalizer; it closes the done
// channel so that senders stop waiting.
func (r *_Receiver[Elem]) finalize() {
	close(r.done)
}
240
241 func TestReadAll() {
242 c := make(chan int)
243 go func() {
244 c <- 4
245 c <- 2
246 c <- 5
247 close(c)
248 }()
249 got := _ReadAll(context.Background(), c)
250 want := []int{4, 2, 5}
251 if !_SliceEqual(got, want) {
252 panic(fmt.Sprintf("_ReadAll returned %v, want %v", got, want))
253 }
254 }
255
256 func TestMerge() {
257 c1 := make(chan int)
258 c2 := make(chan int)
259 go func() {
260 c1 <- 1
261 c1 <- 3
262 c1 <- 5
263 close(c1)
264 }()
265 go func() {
266 c2 <- 2
267 c2 <- 4
268 c2 <- 6
269 close(c2)
270 }()
271 ctx := context.Background()
272 got := _ReadAll(ctx, _Merge(ctx, c1, c2))
273 sort.Ints(got)
274 want := []int{1, 2, 3, 4, 5, 6}
275 if !_SliceEqual(got, want) {
276 panic(fmt.Sprintf("_Merge returned %v, want %v", got, want))
277 }
278 }
279
280 func TestFilter() {
281 c := make(chan int)
282 go func() {
283 c <- 1
284 c <- 2
285 c <- 3
286 close(c)
287 }()
288 even := func(i int) bool { return i%2 == 0 }
289 ctx := context.Background()
290 got := _ReadAll(ctx, _Filter(ctx, c, even))
291 want := []int{2}
292 if !_SliceEqual(got, want) {
293 panic(fmt.Sprintf("_Filter returned %v, want %v", got, want))
294 }
295 }
296
297 func TestSink() {
298 c := _Sink[int](context.Background())
299 after := time.NewTimer(time.Minute)
300 defer after.Stop()
301 send := func(v int) {
302 select {
303 case c <- v:
304 case <-after.C:
305 panic("timed out sending to _Sink")
306 }
307 }
308 send(1)
309 send(2)
310 send(3)
311 close(c)
312 }
313
314 func TestExclusive() {
315 val := 0
316 ex := _MakeExclusive(&val)
317
318 var wg sync.WaitGroup
319 f := func() {
320 defer wg.Done()
321 for i := 0; i < 10; i++ {
322 p := ex.Acquire()
323 (*p)++
324 ex.Release(p)
325 }
326 }
327
328 wg.Add(2)
329 go f()
330 go f()
331
332 wg.Wait()
333 if val != 20 {
334 panic(fmt.Sprintf("after Acquire/Release loop got %d, want 20", val))
335 }
336 }
337
338 func TestExclusiveTry() {
339 s := ""
340 ex := _MakeExclusive(&s)
341 p, ok := ex.TryAcquire()
342 if !ok {
343 panic("TryAcquire failed")
344 }
345 *p = "a"
346
347 var wg sync.WaitGroup
348 wg.Add(1)
349 go func() {
350 defer wg.Done()
351 _, ok := ex.TryAcquire()
352 if ok {
353 panic(fmt.Sprintf("TryAcquire succeeded unexpectedly"))
354 }
355 }()
356 wg.Wait()
357
358 ex.Release(p)
359
360 p, ok = ex.TryAcquire()
361 if !ok {
362 panic(fmt.Sprintf("TryAcquire failed"))
363 }
364 }
365
366 func TestRanger() {
367 s, r := _Ranger[int]()
368
369 ctx := context.Background()
370 go func() {
371
372 v, ok := r.Next(ctx)
373 if !ok {
374 panic(fmt.Sprintf("did not receive any values"))
375 } else if v != 1 {
376 panic(fmt.Sprintf("received %d, want 1", v))
377 }
378 }()
379
380 c1 := make(chan bool)
381 c2 := make(chan bool)
382 go func() {
383 defer close(c2)
384 if !s.Send(ctx, 1) {
385 panic(fmt.Sprintf("Send failed unexpectedly"))
386 }
387 close(c1)
388 if s.Send(ctx, 2) {
389 panic(fmt.Sprintf("Send succeeded unexpectedly"))
390 }
391 }()
392
393 <-c1
394
395
396 runtime.GC()
397
398 select {
399 case <-c2:
400 case <-time.After(time.Minute):
401 panic("_Ranger Send should have failed, but timed out")
402 }
403 }
404
405 func main() {
406 TestReadAll()
407 TestMerge()
408 TestFilter()
409 TestSink()
410 TestExclusive()
411 TestExclusiveTry()
412 TestRanger()
413 }
414
View as plain text