Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 40 additions & 68 deletions manager/ebpf_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@ func (self *EBPFManager) EidMonitored() []events.ID {
self.mu.Lock()
defer self.mu.Unlock()

return self._EidMonitored()
return self.eidMonitored()
}

func (self *EBPFManager) _EidMonitored() []events.ID {

func (self *EBPFManager) eidMonitored() []events.ID {
eid_monitored := make(map[events.ID]bool)
for _, listener := range self.listeners {
for _, eid := range listener.GetEIDs() {
Expand All @@ -106,42 +105,6 @@ func (self *EBPFManager) _EidMonitored() []events.ID {
return res
}

func (self *EBPFManager) Stats() (res Stats) {
self.mu.Lock()
defer self.mu.Unlock()

if self.currently_loading {
res.EBFProgramStatus = "Currently Loading"

} else if self.collection == nil {
res.EBFProgramStatus = "Unloaded"

} else {
res.EBFProgramStatus = "Loaded"
}

res.NumberOfListeners = len(self.listeners)
res.IdleTime = time.Now().Sub(self.idle_time)
res.IdleUnloadTimeout = self.idle_unload_time

for _, listener := range self.listeners {
eid_monitored := make(map[string]int)

for _, k := range listener.GetEIDs() {
desc, pres := CoreEvents[k]
if !pres {
continue
}

eid_monitored[desc.GetName()] = int(k)
}
res.EIDMonitored = append(res.EIDMonitored, eid_monitored)
res.EventCount += listener.GetCount()
res.PrefilterEventCount += listener.GetPrefilterEvents()
}
return res
}

// Read all events from the queue and forward to all listeners.
func (self *EBPFManager) EventLoop(ctx context.Context) {
self.mu.Lock()
Expand Down Expand Up @@ -177,17 +140,16 @@ func (self *EBPFManager) EventLoop(ctx context.Context) {
continue
}

// Make a local copy of interested listeners.
self.mu.Lock()
listeners := self.listeners
self.mu.Unlock()

var interested_listeners []*listener
for _, l := range listeners {
for _, l := range self.listeners {
if !l.Prefilter(record.RawSample) {
continue
}
interested_listeners = append(interested_listeners, l)
}
self.mu.Unlock()

// No listeners - dont bother about it.
if len(interested_listeners) == 0 {
Expand All @@ -200,7 +162,9 @@ func (self *EBPFManager) EventLoop(ctx context.Context) {
}

for _, listener := range interested_listeners {
listener.Feed(eid, event)
if listener.IsTriggered(event.MatchedPolicies) {
listener.Feed(eid, event)
}
}
}
}
Expand Down Expand Up @@ -228,7 +192,7 @@ func (self *EBPFManager) startHousekeeping(ctx context.Context) {

func (self *EBPFManager) getRequiredKsyms() (res []string) {
tmp := make(map[string]bool)
for _, eid := range self._EidMonitored() {
for _, eid := range self.eidMonitored() {
definition, pres := CoreEvents[eid]
if !pres {
continue
Expand All @@ -251,7 +215,7 @@ func (self *EBPFManager) getRequiredKsyms() (res []string) {

func (self *EBPFManager) getProbeHandles() (res []probes.Handle) {
tmp := make(map[probes.Handle]bool)
for _, eid := range self._EidMonitored() {
for _, eid := range self.eidMonitored() {
definition, pres := CoreEvents[eid]
if !pres {
continue
Expand All @@ -274,7 +238,7 @@ func (self *EBPFManager) getProbeHandles() (res []probes.Handle) {
}

func (self *EBPFManager) setTailCalls() error {
for _, eid := range self._EidMonitored() {
for _, eid := range self.eidMonitored() {
err := self.setTailCall(eid, tailCallsAdd)
if err != nil {
return err
Expand Down Expand Up @@ -458,8 +422,9 @@ func (self *EBPFManager) updateEbpfState() (err error) {
func (self *EBPFManager) compilePolicies() error {
var policies []k8s.PolicyInterface

for _, p := range self.listeners {
for idx, p := range self.listeners {
policies = append(policies, p.Policy())
p.SetPolicyId(idx)
}

scope_map, event_map, err := flags.PrepareFilterMapsFromPolicies(policies)
Expand Down Expand Up @@ -492,6 +457,26 @@ func (self *EBPFManager) compilePolicies() error {
return err
}

func (self *EBPFManager) removeListener(new_listener *listener) {
new_listener.Close()

var new_listeners []*listener

for _, d := range self.listeners {
if d == new_listener {
continue
}
new_listeners = append(new_listeners, d)
}

self.listeners = new_listeners

// We are now idle.
if len(new_listeners) == 0 {
self.idle_time = time.Now()
}
}

func (self *EBPFManager) Watch(
ctx context.Context, opts EBPFWatchOptions) (
chan *ordereddict.Dict, func(), error) {
Expand All @@ -513,6 +498,8 @@ func (self *EBPFManager) Watch(
new_listener := NewListner(
self.logger, self.dnscache, ctx, self.ctx, events, p)
new_listener.SetPrefilter(opts.Prefilter)

// Add the new listener to the list.
self.listeners = append(self.listeners, new_listener)

// If the program is not already loaded, start it.
Expand All @@ -524,42 +511,27 @@ func (self *EBPFManager) Watch(
}
}

// Update the policies map
err = self.compilePolicies()
if err != nil {
self.removeListener(new_listener)
return nil, nil, err
}

// Update the ebpf state to reflect the new listene
// Update the ebpf state to reflect the new listeners
err = self.updateEbpfState()
if err != nil {
self.listeners = nil
self.removeListener(new_listener)
return nil, nil, err
}

return new_listener.output_chan,

// Remove the output chan from the listeners.
func() {
self.mu.Lock()
defer self.mu.Unlock()

new_listener.Close()

var new_listeners []*listener

for _, d := range self.listeners {
if d == new_listener {
continue
}
new_listeners = append(new_listeners, d)
}

self.listeners = new_listeners

// We are now idle.
if len(new_listeners) == 0 {
self.idle_time = time.Now()
}
self.removeListener(new_listener)
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions manager/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type eventType struct {

System *ordereddict.Dict
tevent *trace.Event

// A bitmap of the policies that matched this event.
MatchedPolicies uint64
}

func (self *EBPFManager) decodeEvent(dataRaw []byte) (*eventType, events.ID, error) {
Expand Down Expand Up @@ -98,6 +101,7 @@ func (self *EBPFManager) decodeEvent(dataRaw []byte) (*eventType, events.ID, err
ReturnValue: int(eCtx.Retval),
Args: args,
},
MatchedPolicies: eCtx.MatchedPolicies,
}, eCtx.EventID, nil
}

Expand Down
24 changes: 24 additions & 0 deletions manager/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type listener struct {
prefilter func(buf []byte) bool

policy k8s.PolicyInterface

policy_id int
}

func (self *listener) Policy() k8s.PolicyInterface {
Expand Down Expand Up @@ -273,6 +275,28 @@ func (self *listener) feed(
}
}

func (self *listener) IsTriggered(policy_mask uint64) bool {
self.mu.Lock()
defer self.mu.Unlock()

my_policy_mask := uint64(1) << self.policy_id
return (policy_mask & my_policy_mask) > 0
}

func (self *listener) SetPolicyId(policy_idx int) {
self.mu.Lock()
defer self.mu.Unlock()

self.policy_id = policy_idx
}

func (self *listener) GetPolicyId() int {
self.mu.Lock()
defer self.mu.Unlock()

return self.policy_id
}

func (self *listener) GetCount() int {
self.mu.Lock()
defer self.mu.Unlock()
Expand Down
8 changes: 0 additions & 8 deletions manager/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ type EBPFWatchOptions struct {
// applied in the kernel so they are essential for reducing CPU
// use.
//
// NOTE: Currently all watchers that watch the same event id will
// receive all events - regardless if their specific policy
// applies or if another listener selected the same event with
// another policy. Therefore events should be post-filtered again.
//
// Therefore, Policy filters should be considered as a performance
// optimization only.
//
// For example:
//
// metadata:
Expand Down
17 changes: 17 additions & 0 deletions manager/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package manager
import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/Velocidex/tracee_velociraptor/userspace/cmd/flags"
"github.com/Velocidex/tracee_velociraptor/userspace/events"
Expand All @@ -12,6 +15,15 @@ import (
"gopkg.in/yaml.v2"
)

var (
idx uint64 = uint64(time.Now().UnixNano())
)

// Get unique ID
func GetId() uint64 {
return atomic.AddUint64(&idx, 1)
}

func PolicyFromString(in string) (res k8s.PolicyInterface, err error) {
in = strings.TrimSpace(in)
if len(in) == 0 {
Expand All @@ -35,6 +47,11 @@ func PolicyFromString(in string) (res k8s.PolicyInterface, err error) {
p.APIVersion = "tracee.aquasec.com/v1beta1"
p.Kind = "Policy"

// If the policy does not have a name, create a random name.
if p.Metadata.Name == "" {
p.Metadata.Name = fmt.Sprintf("Policy_%d", GetId())
}

err = p.Validate()
if err != nil {
return p, err
Expand Down
65 changes: 62 additions & 3 deletions manager/stats.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
package manager

import "time"
import (
"time"

k8s "github.com/Velocidex/tracee_velociraptor/userspace/k8s/apis/tracee.aquasec.com/v1beta1"
)

type ListenerStats struct {
PolicyID int
Policy k8s.PolicyInterface
EIDMonitored []string

// Events fed to this listener.
EventCount int
}

type Stats struct {
NumberOfListeners int
EIDMonitored []map[string]int
Listeners []ListenerStats

IdleTime time.Duration
IdleUnloadTimeout time.Duration
Expand All @@ -17,3 +29,50 @@ type Stats struct {
// Total events parsed
EventCount int
}

func (self *listener) Stats() ListenerStats {
self.mu.Lock()
defer self.mu.Unlock()

res := ListenerStats{
PolicyID: self.policy_id,
Policy: self.policy,
EventCount: self.count,
}

for k := range self.eid_monitored {
desc, pres := CoreEvents[k]
if !pres {
continue
}

res.EIDMonitored = append(res.EIDMonitored, desc.GetName())
}

return res
}

func (self *EBPFManager) Stats() (res Stats) {
self.mu.Lock()
defer self.mu.Unlock()

if self.currently_loading {
res.EBFProgramStatus = "Currently Loading"

} else if self.collection == nil {
res.EBFProgramStatus = "Unloaded"

} else {
res.EBFProgramStatus = "Loaded"
}

res.IdleTime = time.Now().Sub(self.idle_time)
res.IdleUnloadTimeout = self.idle_unload_time

for _, listener := range self.listeners {
res.Listeners = append(res.Listeners, listener.Stats())
res.EventCount += listener.GetCount()
res.PrefilterEventCount += listener.GetPrefilterEvents()
}
return res
}
Loading