-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathmain.py
More file actions
135 lines (114 loc) · 3.81 KB
/
Copy pathmain.py
File metadata and controls
135 lines (114 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.12"
# dependencies = ["matchlock"]
# ///
"""VFS interception hooks example.
Usage:
uv run examples/python/vfs_hooks/main.py
"""
from __future__ import annotations
import logging
import time
from matchlock import (
VFS_HOOK_ACTION_BLOCK,
VFS_HOOK_OP_CREATE,
VFS_HOOK_OP_WRITE,
VFS_HOOK_PHASE_AFTER,
VFS_HOOK_PHASE_BEFORE,
Client,
Sandbox,
VFSActionRequest,
VFSHookEvent,
VFSHookRule,
VFSInterceptionConfig,
VFSMutateRequest,
)
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)
log = logging.getLogger(__name__)
def after_write_hook(event: VFSHookEvent) -> None:
log.info(
"after hook op=%s path=%s size=%d mode=%o uid=%d gid=%d",
event.op,
event.path,
event.size,
event.mode,
event.uid,
event.gid,
)
def mutate_write_hook(req: VFSMutateRequest) -> bytes:
log.info(
"mutate hook path=%s size=%d mode=%o uid=%d gid=%d",
req.path,
req.size,
req.mode,
req.uid,
req.gid,
)
return (
f"mutated-by-hook size={req.size} "
f"mode={oct(req.mode)} uid={req.uid} gid={req.gid}"
).encode("utf-8")
def block_action_hook(req: VFSActionRequest) -> str:
_ = req
return VFS_HOOK_ACTION_BLOCK
def main() -> None:
sandbox = Sandbox("alpine:latest").with_workspace("/workspace").mount_memory("/workspace").with_vfs_interception(
VFSInterceptionConfig(
rules=[
VFSHookRule(
name="host-block-create",
phase=VFS_HOOK_PHASE_BEFORE,
ops=[VFS_HOOK_OP_CREATE],
path="/workspace/blocked-create.txt",
action=VFS_HOOK_ACTION_BLOCK,
),
VFSHookRule(
name="sdk-block-write",
phase=VFS_HOOK_PHASE_BEFORE,
ops=[VFS_HOOK_OP_WRITE],
path="/workspace/blocked-write.txt",
action_hook=block_action_hook,
),
VFSHookRule(
name="mutate-write",
phase=VFS_HOOK_PHASE_BEFORE,
ops=[VFS_HOOK_OP_WRITE],
path="/workspace/mutated.txt",
mutate_hook=mutate_write_hook,
),
VFSHookRule(
name="audit-after-write",
phase=VFS_HOOK_PHASE_AFTER,
ops=[VFS_HOOK_OP_WRITE],
path="/workspace/*",
timeout_ms=2000,
hook=after_write_hook,
),
],
)
)
with Client() as client:
vm_id = client.launch(sandbox)
log.info("sandbox ready vm=%s", vm_id)
try:
client.write_file("/workspace/blocked-create.txt", "blocked")
log.warning("host create block unexpectedly succeeded")
except Exception as exc: # noqa: BLE001
log.info("host create block rejected as expected: %s", exc)
try:
client.write_file("/workspace/blocked-write.txt", "blocked")
log.warning("local write block unexpectedly succeeded")
except Exception as exc: # noqa: BLE001
log.info("local write block rejected as expected: %s", exc)
client.write_file("/workspace/mutated.txt", "original-content", mode=0o640)
mutated = client.read_file("/workspace/mutated.txt").decode(
"utf-8", errors="replace"
)
print(f"mutated file content: {mutated.strip()!r}")
client.write_file("/workspace/trigger.txt", "trigger", mode=0o600)
time.sleep(0.4)
client.close()
client.remove()
if __name__ == "__main__":
main()