Source file
test/fixedbugs/issue33355.go
1
2
3
4
5
6
7
8
9
10 package server
11
12 import (
13 "bytes"
14 "sync"
15 )
16
17 type client struct {
18 junk [4]int
19 mu sync.Mutex
20 srv *Server
21 gw *gateway
22 msgb [100]byte
23 }
24
25 type gateway struct {
26 cfg *gatewayCfg
27 outsim *sync.Map
28 }
29
30 type gatewayCfg struct {
31 replyPfx []byte
32 }
33
34 type Account struct {
35 Name string
36 }
37
38 type Server struct {
39 gateway *srvGateway
40 }
41
42 type srvGateway struct {
43 outo []*client
44 }
45
46 type subscription struct {
47 queue []byte
48 client *client
49 }
50
51 type outsie struct {
52 ni map[string]struct{}
53 sl *Sublist
54 qsubs int
55 }
56
57 type Sublist struct {
58 }
59
60 type SublistResult struct {
61 psubs []*subscription
62 qsubs [][]*subscription
63 }
64
65 var subPool = &sync.Pool{}
66
67 func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) {
68 var gws []*client
69 gw := c.srv.gateway
70 for i := 0; i < len(gw.outo); i++ {
71 gws = append(gws, gw.outo[i])
72 }
73 var (
74 subj = string(subject)
75 queuesa = [512]byte{}
76 queues = queuesa[:0]
77 mreply []byte
78 dstPfx []byte
79 checkReply = len(reply) > 0
80 )
81
82 sub := subPool.Get().(*subscription)
83
84 if subjectStartsWithGatewayReplyPrefix(subject) {
85 dstPfx = subject[:8]
86 }
87 for i := 0; i < len(gws); i++ {
88 gwc := gws[i]
89 if dstPfx != nil {
90 gwc.mu.Lock()
91 ok := bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx)
92 gwc.mu.Unlock()
93 if !ok {
94 continue
95 }
96 } else {
97 qr := gwc.gatewayInterest(acc.Name, subj)
98 queues = queuesa[:0]
99 for i := 0; i < len(qr.qsubs); i++ {
100 qsubs := qr.qsubs[i]
101 queue := qsubs[0].queue
102 add := true
103 for _, qn := range qgroups {
104 if bytes.Equal(queue, qn) {
105 add = false
106 break
107 }
108 }
109 if add {
110 qgroups = append(qgroups, queue)
111 }
112 }
113 if len(queues) == 0 {
114 continue
115 }
116 }
117 if checkReply {
118 checkReply = false
119 mreply = reply
120 }
121 mh := c.msgb[:10]
122 mh = append(mh, subject...)
123 if len(queues) > 0 {
124 mh = append(mh, mreply...)
125 mh = append(mh, queues...)
126 }
127 sub.client = gwc
128 }
129 subPool.Put(sub)
130 }
131
132 func subjectStartsWithGatewayReplyPrefix(subj []byte) bool {
133 return len(subj) > 8 && string(subj[:4]) == "foob"
134 }
135
136 func (c *client) gatewayInterest(acc, subj string) *SublistResult {
137 ei, _ := c.gw.outsim.Load(acc)
138 var r *SublistResult
139 e := ei.(*outsie)
140 r = e.sl.Match(subj)
141 return r
142 }
143
144 func (s *Sublist) Match(subject string) *SublistResult {
145 return nil
146 }
147
148
View as plain text