-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathcarstream.go
More file actions
101 lines (89 loc) · 3 KB
/
carstream.go
File metadata and controls
101 lines (89 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package frisbii
import (
"bytes"
"context"
"fmt"
"io"
"sync"
// codecs we care about
_ "github.com/ipld/go-ipld-prime/codec/cbor"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
_ "github.com/ipld/go-ipld-prime/codec/json"
_ "github.com/ipld/go-ipld-prime/codec/raw"
"github.com/ipld/go-trustless-utils/traversal"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/storage"
"github.com/ipld/go-car/v2/storage/deferred"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
trustlessutils "github.com/ipld/go-trustless-utils"
)
var (
// ProbeCID is the special identity CID used for probing the gateway
// bafkqaaa is an identity CID with empty content
ProbeCID = cid.MustParse("bafkqaaa")
// probeCarBytes is the pre-generated CAR response for the probe CID.
// Since this is always the same, we generate it once and reuse it.
probeCarBytes []byte
probeCarOnce sync.Once
)
// getProbeCarBytes generates or returns the cached probe CAR response
func getProbeCarBytes() []byte {
probeCarOnce.Do(func() {
var buf bytes.Buffer
// Create a simple CAR v1 with the probe CID as root
carWriter, err := storage.NewWritable(&buf, []cid.Cid{ProbeCID}, car.WriteAsCarV1(true))
if err != nil {
// This should never happen with valid inputs
panic(fmt.Sprintf("failed to create probe CAR writer: %v", err))
}
// Identity CIDs are not stored by default (no StoreIdentityCIDs option),
// so we just finalize to get a CAR with only the header.
// The spec says identity block MAY be skipped in the data section.
if err = carWriter.Finalize(); err != nil {
panic(fmt.Sprintf("failed to finalize probe CAR: %v", err))
}
probeCarBytes = buf.Bytes()
})
return probeCarBytes
}
// StreamCar streams a DAG in CARv1 format to the given writer, using the given
// selector.
func StreamCar(
ctx context.Context,
requestLsys linking.LinkSystem,
out io.Writer,
request trustlessutils.Request,
) error {
carWriter := deferred.NewDeferredCarWriterForStream(out, []cid.Cid{request.Root}, car.AllowDuplicatePuts(request.Duplicates))
requestLsys.StorageReadOpener = carPipe(requestLsys.StorageReadOpener, carWriter)
cfg := traversal.Config{Root: request.Root, Selector: request.Selector()}
lastPath, err := cfg.Traverse(ctx, requestLsys, nil)
if err != nil {
return err
}
if err := traversal.CheckPath(datamodel.ParsePath(request.Path), lastPath); err != nil {
logger.Warn(err)
}
return nil
}
func carPipe(orig linking.BlockReadOpener, car *deferred.DeferredCarWriter) linking.BlockReadOpener {
return func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
r, err := orig(lc, lnk)
if err != nil {
return nil, err
}
byts, err := io.ReadAll(r)
if err != nil {
return nil, err
}
err = car.Put(lc.Ctx, lnk.(cidlink.Link).Cid.KeyString(), byts)
if err != nil {
return nil, err
}
return bytes.NewReader(byts), nil
}
}