-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcodec.go
More file actions
142 lines (127 loc) · 3.06 KB
/
codec.go
File metadata and controls
142 lines (127 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package psycho
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"strings"
)
type ServerCodec struct {
reader *bufio.Reader
writer io.Writer
}
func NewServerCodec(reader io.Reader, writer io.Writer) *ServerCodec {
c := &ServerCodec{
reader: bufio.NewReader(reader),
writer: writer,
}
return c
}
func (c *ServerCodec) HandleInfo(info map[string]interface{}) {
m, err := json.Marshal(info)
if err != nil {
c.writer.Write([]byte(`INFO {"error": "error marshalling info"}\n`))
return
}
fmt.Fprintf(c.writer, "INFO %s\n", m)
}
func (c *ServerCodec) HandleMsg(subject string, payload []byte) {
buf := bytes.NewBuffer(make([]byte, 0, len(payload)+len(subject)+16))
fmt.Fprintf(buf, "MSG %s %d\n", subject, len(payload))
buf.Write(payload)
buf.WriteRune('\n')
c.writer.Write(buf.Bytes())
}
func (c *ServerCodec) ServeClientOpsTo(server Server) {
for {
op, subject, payload, err := c.readMsg()
if err != nil {
if err == io.EOF {
return
}
fmt.Fprintf(c.writer, "-ERR %q\n", err.Error())
continue
}
switch op {
case pub:
server.Pub(subject, payload)
case sub:
server.Sub(subject)
case unsub:
server.Unsub(subject)
}
c.writer.Write([]byte("+OK\n"))
}
}
type opname string
const (
pub opname = "PUB"
sub opname = "SUB"
unsub opname = "UNSUB"
)
func (c *ServerCodec) readLine() ([]byte, error) {
line, err := c.reader.ReadBytes('\n')
if err != nil {
return nil, err
}
line = line[:len(line)-1]
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
return line, nil
}
func (c *ServerCodec) readMsg() (opname, string, []byte, error) {
line, err := c.readLine()
if err != nil {
return "", "", nil, err
}
if len(line) == 0 {
return "", "", nil, errors.New("empty line")
}
tokens := strings.Split(string(line), " ")
switch opname(tokens[0]) {
case pub:
if len(tokens) != 3 {
return "", "", nil, fmt.Errorf("PUB op expects exactly 2 arguments, found %d", len(tokens)-1)
}
payload, err := readPayload(c.reader, tokens[2])
return pub, tokens[1], payload, err
case sub:
if len(tokens) != 2 {
return "", "", nil, fmt.Errorf("SUB op expects exactly 1 argument, found %d", len(tokens)-1)
}
return sub, tokens[1], nil, nil
case unsub:
if len(tokens) != 2 {
return "", "", nil, fmt.Errorf("UNSUB op expects exactly 1 argument, found %d", len(tokens)-1)
}
return unsub, tokens[1], nil, nil
default:
return "", "", nil, errors.New("unknown op name")
}
}
func readPayload(reader *bufio.Reader, nbytes string) ([]byte, error) {
n, err := strconv.ParseUint(nbytes, 10, 64)
if err != nil {
return nil, errors.New("parsing number of bytes")
}
var payload = make([]byte, n)
nread, err := reader.Read(payload)
if nread < int(n) {
return nil, fmt.Errorf("expected to read %d bytes, but only read %d", n, nread)
}
if err != nil {
return nil, err
}
delim, err := reader.ReadString('\n')
if !(delim == "\n" || delim == "\r\n") {
return nil, fmt.Errorf("payload did not end with a new line")
}
if err != nil {
return nil, err
}
return payload, nil
}