forked from rafalmnich/streams
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.go
More file actions
126 lines (99 loc) · 3.15 KB
/
stream.go
File metadata and controls
126 lines (99 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package streams
// StreamBuilder represents a stream builder.
//
// It wraps a TopologyBuilder: Source registers sources on it and Build
// delegates to it to produce the Topology.
type StreamBuilder struct {
// tp is the underlying topology builder that sources are added to.
tp *TopologyBuilder
}
// NewStreamBuilder returns a StreamBuilder backed by a freshly created
// TopologyBuilder.
func NewStreamBuilder() *StreamBuilder {
	builder := new(StreamBuilder)
	builder.tp = NewTopologyBuilder()
	return builder
}
// Source registers source under name on the underlying topology builder and
// returns a Stream whose only parent is the newly added source node.
func (sb *StreamBuilder) Source(name string, source Source) *Stream {
	sourceNode := sb.tp.AddSource(name, source)
	roots := []Node{sourceNode}

	return newStream(sb.tp, roots)
}
// Build delegates to the underlying topology builder and returns the
// Topology and the errors it reports, unchanged.
func (sb *StreamBuilder) Build() (*Topology, []error) {
	topology, errs := sb.tp.Build()
	return topology, errs
}
// Stream represents a stream of data.
//
// Each operation on a Stream adds a processor node to the shared
// TopologyBuilder and yields a new Stream whose parent is that node.
type Stream struct {
// tp is the topology builder that processors added through this stream are registered on.
tp *TopologyBuilder
// parents are the nodes the next processor added through this stream is attached to.
parents []Node
}
// newStream wraps the given topology builder and parent nodes in a Stream.
// The parents slice is stored as-is, without copying.
func newStream(tp *TopologyBuilder, parents []Node) *Stream {
	stream := new(Stream)
	stream.tp = tp
	stream.parents = parents
	return stream
}
// Filter attaches a filter processor built from pred to the stream's current
// parents under the given name, and returns a Stream rooted at the new node.
func (s *Stream) Filter(name string, pred Predicate) *Stream {
	filter := NewFilterProcessor(pred)
	node := s.tp.AddProcessor(name, filter, s.parents)

	children := []Node{node}
	return newStream(s.tp, children)
}
// FilterFunc is the function-typed counterpart of Filter: it passes pred to
// Filter as a Predicate.
func (s *Stream) FilterFunc(name string, pred PredicateFunc) *Stream {
	var p Predicate = pred
	return s.Filter(name, p)
}
// Branch splits the stream on the given predicates.
//
// A single branch processor built from preds is attached to the stream's
// current parents. The result holds one Stream per predicate, and every one
// of them has that branch node as its sole parent.
func (s *Stream) Branch(name string, preds ...Predicate) []*Stream {
	brancher := NewBranchProcessor(preds)
	node := s.tp.AddProcessor(name, brancher, s.parents)

	branches := make([]*Stream, len(preds))
	for i := range branches {
		// Each branch gets its own parent slice rather than sharing one.
		branches[i] = newStream(s.tp, []Node{node})
	}
	return branches
}
// BranchFunc is the function-typed counterpart of Branch: it converts each
// PredicateFunc to a Predicate, preserving order, and delegates to Branch.
func (s *Stream) BranchFunc(name string, preds ...PredicateFunc) []*Stream {
	converted := make([]Predicate, 0, len(preds))
	for _, fn := range preds {
		converted = append(converted, fn)
	}

	return s.Branch(name, converted...)
}
// Map attaches a map processor built from mapper to the stream's current
// parents under the given name, and returns a Stream rooted at the new node.
func (s *Stream) Map(name string, mapper Mapper) *Stream {
	proc := NewMapProcessor(mapper)
	node := s.tp.AddProcessor(name, proc, s.parents)

	return newStream(s.tp, []Node{node})
}
// MapFunc is the function-typed counterpart of Map: it passes mapper to Map
// as a Mapper.
func (s *Stream) MapFunc(name string, mapper MapperFunc) *Stream {
	var m Mapper = mapper
	return s.Map(name, m)
}
// FlatMap attaches a flat-map processor built from mapper to the stream's
// current parents under the given name, and returns a Stream rooted at the
// new node.
func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream {
	flatMapper := NewFlatMapProcessor(mapper)
	node := s.tp.AddProcessor(name, flatMapper, s.parents)

	next := []Node{node}
	return newStream(s.tp, next)
}
// FlatMapFunc is the function-typed counterpart of FlatMap: it passes mapper
// to FlatMap as a FlatMapper.
func (s *Stream) FlatMapFunc(name string, mapper FlatMapperFunc) *Stream {
	var fm FlatMapper = mapper
	return s.FlatMap(name, fm)
}
// Merge merges one or more streams into this stream.
//
// A merge processor is added to the topology under the given name. Its
// parents are the parents of s followed by the parents of every stream in
// streams, in argument order. Nil entries in streams are skipped rather than
// dereferenced. The returned Stream is rooted at the new merge node.
func (s *Stream) Merge(name string, streams ...*Stream) *Stream {
	// Count the parents first so the combined slice is allocated exactly once.
	total := len(s.parents)
	for _, stream := range streams {
		if stream != nil {
			total += len(stream.parents)
		}
	}

	parents := make([]Node, 0, total)
	parents = append(parents, s.parents...)
	for _, stream := range streams {
		if stream == nil {
			// A nil stream contributes no parents; skipping it avoids a
			// nil-pointer dereference.
			continue
		}
		parents = append(parents, stream.parents...)
	}

	p := NewMergeProcessor()
	n := s.tp.AddProcessor(name, p, parents)

	return newStream(s.tp, []Node{n})
}
// Print adds a print processor to the stream under the given name by
// delegating to Process, and returns the resulting Stream.
func (s *Stream) Print(name string) *Stream {
	printer := NewPrintProcessor()
	return s.Process(name, printer)
}
// Process attaches the caller-supplied processor p to the stream's current
// parents under the given name, and returns a Stream rooted at the new node.
func (s *Stream) Process(name string, p Processor) *Stream {
	node := s.tp.AddProcessor(name, p, s.parents)
	children := []Node{node}

	return newStream(s.tp, children)
}