Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ANONYMOUS_LOGIN=true
# MAX_CONCURRENT_RECORDINGS=5
# MAX_RECORDING_HOURS=10
# MAX_RECOVERY_ATTEMPTS=5
# MAX_RECORDING_FILE_SIZE_BYTES=0
# OUTPUT_DIR=./records
# SECRET_DIR=./secrets
# DATABASE_DIR=./database
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ENV ANONYMOUS_LOGIN=false \
MAX_CONCURRENT_RECORDINGS=3 \
MAX_RECORDING_HOURS=5 \
MAX_RECOVERY_ATTEMPTS=5 \
MAX_RECORDING_FILE_SIZE_BYTES=0 \
OUTPUT_DIR=records \
DATABASE_DIR=database \
CONVERT_FLV_TO_MP4=false \
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ docker run -d --name bilirec -p 8080:8080 eric1008818/bilirec:latest
| `MAX_CONCURRENT_RECORDINGS` | 最大同时录制数 | `3` |
| `MAX_RECORDING_HOURS` | 单次录制最长时间(小时) | `5` |
| `MAX_RECOVERY_ATTEMPTS` | 单次录制的最大重连尝试次数 | `5` |
| `MAX_RECORDING_FILE_SIZE_BYTES` | 单个录制文件最大大小(字节,`0` 表示禁用) | `0` |
| `OUTPUT_DIR` | 录制文件保存目录 | `records` |
| `SECRET_DIR` | Cookie 和 Token 保存目录 | `secrets` |
| `CONVERT_FLV_TO_MP4` | 在下载时是否将 FLV 转为 MP4 | `false` |
Expand Down Expand Up @@ -103,6 +104,7 @@ export PORT=8080
export MAX_CONCURRENT_RECORDINGS=5
export MAX_RECORDING_HOURS=10
export MAX_RECOVERY_ATTEMPTS=5
export MAX_RECORDING_FILE_SIZE_BYTES=0
export OUTPUT_DIR=/path/to/records
export SECRET_DIR=/path/to/secrets
export DATABASE_DIR=/path/to/database
Expand Down
48 changes: 25 additions & 23 deletions internal/modules/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type Config struct {
AnonymousLogin bool
Port string

MaxConcurrentRecordings int
MaxRecordingHours int
MaxRecoveryAttempts int
MaxConcurrentRecordings int
MaxRecordingHours int
MaxRecoveryAttempts int
MaxRecordingFileSizeBytes int64

OutputDir string
SecretDir string
Expand Down Expand Up @@ -82,26 +83,27 @@ func provider() (*Config, error) {
}

c := &Config{
AnonymousLogin: os.Getenv("ANONYMOUS_LOGIN") == "true",
Port: utils.EmptyOrElse(os.Getenv("PORT"), "8080"),
MaxConcurrentRecordings: utils.MustAtoi(utils.EmptyOrElse(os.Getenv("MAX_CONCURRENT_RECORDINGS"), "3")),
MaxRecordingHours: utils.MustAtoi(utils.EmptyOrElse(os.Getenv("MAX_RECORDING_HOURS"), "5")),
MaxRecoveryAttempts: utils.MustAtoi(utils.EmptyOrElse(os.Getenv("MAX_RECOVERY_ATTEMPTS"), "5")),
OutputDir: utils.EmptyOrElse(os.Getenv("OUTPUT_DIR"), "records"),
SecretDir: utils.EmptyOrElse(os.Getenv("SECRET_DIR"), "secrets"),
DatabaseDir: utils.EmptyOrElse(os.Getenv("DATABASE_DIR"), "database"),
CloudConvertThreshold: utils.MustAtoi64(utils.EmptyOrElse(os.Getenv("CLOUDCONVERT_THRESHOLD"), "1073741824")), // 1 GB
CloudConvertApiKey: os.Getenv("CLOUDCONVERT_API_KEY"), // empty to disable
ConvertFLVToMp4: os.Getenv("CONVERT_FLV_TO_MP4") == "true",
DeleteFlvAfterConvert: os.Getenv("DELETE_FLV_AFTER_CONVERT") == "true",
FrontendURL: url,
BackendHost: utils.EmptyOrElse(os.Getenv("BACKEND_HOST"), "localhost:8080"),
Username: username,
PasswordHash: string(passwordHash),
JwtSecret: utils.EmptyOrElse(os.Getenv("JWT_SECRET"), "bilirec_secret"),
Debug: debug,
ProductionMode: os.Getenv("PRODUCTION_MODE") == "true",
MinDiskSpaceBytes: utils.MustAtoi64(utils.EmptyOrElse(os.Getenv("MIN_DISK_SPACE_BYTES"), "5368709120")), // 5GB
AnonymousLogin: os.Getenv("ANONYMOUS_LOGIN") == "true",
Port: utils.EmptyOrElse(os.Getenv("PORT"), "8080"),
MaxConcurrentRecordings: utils.MustAtoi(utils.EmptyOrElse(os.Getenv("MAX_CONCURRENT_RECORDINGS"), "3")),
MaxRecordingHours: utils.MustAtoi(utils.EmptyOrElse(os.Getenv("MAX_RECORDING_HOURS"), "5")),
MaxRecoveryAttempts: utils.MustAtoi(utils.EmptyOrElse(os.Getenv("MAX_RECOVERY_ATTEMPTS"), "5")),
MaxRecordingFileSizeBytes: utils.MustAtoi64(utils.EmptyOrElse(os.Getenv("MAX_RECORDING_FILE_SIZE_BYTES"), "0")),
OutputDir: utils.EmptyOrElse(os.Getenv("OUTPUT_DIR"), "records"),
SecretDir: utils.EmptyOrElse(os.Getenv("SECRET_DIR"), "secrets"),
DatabaseDir: utils.EmptyOrElse(os.Getenv("DATABASE_DIR"), "database"),
CloudConvertThreshold: utils.MustAtoi64(utils.EmptyOrElse(os.Getenv("CLOUDCONVERT_THRESHOLD"), "1073741824")), // 1 GB
CloudConvertApiKey: os.Getenv("CLOUDCONVERT_API_KEY"), // empty to disable
ConvertFLVToMp4: os.Getenv("CONVERT_FLV_TO_MP4") == "true",
DeleteFlvAfterConvert: os.Getenv("DELETE_FLV_AFTER_CONVERT") == "true",
FrontendURL: url,
BackendHost: utils.EmptyOrElse(os.Getenv("BACKEND_HOST"), "localhost:8080"),
Username: username,
PasswordHash: string(passwordHash),
JwtSecret: utils.EmptyOrElse(os.Getenv("JWT_SECRET"), "bilirec_secret"),
Debug: debug,
ProductionMode: os.Getenv("PRODUCTION_MODE") == "true",
MinDiskSpaceBytes: utils.MustAtoi64(utils.EmptyOrElse(os.Getenv("MIN_DISK_SPACE_BYTES"), "5368709120")), // 5GB

// global performance configs
uploadBufferSize: utils.MustAtoi(utils.EmptyOrElse(os.Getenv("UPLOAD_BUFFER_SIZE"), "5242880")), // default 5MB
Expand Down
145 changes: 121 additions & 24 deletions internal/services/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"path/filepath"
"runtime"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -31,6 +33,8 @@ type RecordStatus string
const Recording RecordStatus = "recording"
const Recovering RecordStatus = "recovering"
const Idle RecordStatus = "idle"
// maxSegmentNameCollisionAttempts bounds file name retries during segment rotation.
const maxSegmentNameCollisionAttempts = 1000

// var idlePtr *RecordStatus = utils.Ptr(Idle)
var recordingPtr *RecordStatus = utils.Ptr(Recording)
Expand All @@ -46,14 +50,28 @@ var ErrRoomEncrypted = errors.New("the room is encrypted")
var ErrInsufficientDiskSpace = errors.New("insufficient disk space")

type Recorder struct {
status atomic.Pointer[RecordStatus]
bytesRead atomic.Uint64
startTime time.Time
outputPath string
status atomic.Pointer[RecordStatus]
bytesRead atomic.Uint64
segmentBytes atomic.Uint64
startTime time.Time
outputPath string
mu sync.RWMutex

cancel context.CancelFunc
}

// GetOutputPath returns the recorder's current output file path.
// The read is guarded by r.mu because segment rotation may swap the
// path from another goroutine.
func (r *Recorder) GetOutputPath() string {
	r.mu.RLock()
	path := r.outputPath
	r.mu.RUnlock()
	return path
}

// SetOutputPath replaces the recorder's output file path under r.mu.
// It is called when segment rotation switches writing to a new file.
func (r *Recorder) SetOutputPath(path string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.outputPath = path
}

type Service struct {
st *stream.Service
cv *convert.Service
Expand Down Expand Up @@ -91,7 +109,7 @@ func NewService(

go s.backgroundMaintenance(ctx)
go initOutputDir(cfg)

lc.Append(fx.StopHook(cancel))
return s
}
Expand Down Expand Up @@ -197,40 +215,50 @@ func (r *Service) Stop(roomId int) bool {

func (r *Service) prepare(roomId int, ch <-chan []byte, ctx context.Context, info *Recorder) error {

pipe, err := r.newPipe(info.GetOutputPath(), ctx)
if err != nil {
return err
}

r.recording.Store(roomId, info)
r.pipes.Store(roomId, pipe)

go r.rev(roomId, ch, ctx, info, pipe)
go r.checkRecordingDurationPeriodically(roomId, ctx)
return nil
}

func (r *Service) newPipe(outputPath string, ctx context.Context) (*pipeline.Pipe[[]byte], error) {
pipe := pipeline.New(
// fix FLV stream
processors.NewFlvStreamFixer(),
// write to file with buffered writer
// flushes every 5 seconds then writes to disk
processors.NewBufferedStreamWriter(info.outputPath, config.ReadOnly.LiveStreamWriterBufferSize()),
processors.NewBufferedStreamWriter(outputPath, config.ReadOnly.LiveStreamWriterBufferSize()),
)

startCtx, startCancel := context.WithTimeout(ctx, 10*time.Second)
if err := pipe.Open(startCtx); err != nil {
startCancel()
return fmt.Errorf("cannot open pipeline: %v", err)
return nil, fmt.Errorf("cannot open pipeline: %v", err)
}
startCancel()

r.recording.Store(roomId, info)
r.pipes.Store(roomId, pipe)

go r.rev(roomId, ch, info, pipe)
go r.checkRecordingDurationPeriodically(roomId, ctx)
return nil
return pipe, nil
}

func (r *Service) rev(roomId int, ch <-chan []byte, info *Recorder, pipe *pipeline.Pipe[[]byte]) {
func (r *Service) rev(roomId int, ch <-chan []byte, ctx context.Context, info *Recorder, pipe *pipeline.Pipe[[]byte]) {
l := logger.WithField("room", roomId)
defer r.recover(roomId)
currentPipe := pipe
defer func() {
pipe.Close()
currentPipe.Close()
go r.finalize(roomId, info)
}()
for data := range ch {

info.bytesRead.Add(uint64(len(data)))
result, err := pipe.Process(r.ctx, data)
info.segmentBytes.Add(uint64(len(data)))
result, err := currentPipe.Process(r.ctx, data)
r.st.Flush(data)
r.st.Flush(result)

Expand All @@ -249,9 +277,76 @@ func (r *Service) rev(roomId int, ch <-chan []byte, info *Recorder, pipe *pipeli
}
return
}

if r.shouldRotateSegment(info) {
if err := r.rotateSegment(roomId, info, ctx, &currentPipe); err != nil {
l.Errorf("failed to rotate segment by size: %v", err)
}
}
}
}

// shouldRotateSegment reports whether the current segment has reached the
// configured per-file size cap. A cap of zero (or negative) disables
// size-based rotation entirely.
func (r *Service) shouldRotateSegment(info *Recorder) bool {
	limit := r.cfg.MaxRecordingFileSizeBytes
	if limit <= 0 {
		return false
	}
	return info.segmentBytes.Load() >= uint64(limit)
}

// nextSegmentOutputPath derives the file path for the next recording segment
// from the current one: same directory and extension, the title prefix kept,
// and the timestamp portion replaced with now. If the preferred name is taken
// (being written or already on disk), a numeric suffix is tried up to
// maxSegmentNameCollisionAttempts; past that, a UnixNano-based fallback name
// is used.
func (r *Service) nextSegmentOutputPath(currentPath string, now time.Time) string {
	dir := filepath.Dir(currentPath)
	ext := filepath.Ext(currentPath)
	base := strings.TrimSuffix(filepath.Base(currentPath), ext)

	// Drop everything after the last dash (the previous timestamp) so that
	// rotated segments share the same title prefix.
	prefix := base
	if dash := strings.LastIndex(base, "-"); dash >= 0 {
		prefix = base[:dash]
	}

	stamp := now.Format("20060102_150405")

	// available reports whether a candidate path is free to use: it is not
	// currently being written and does not already exist on disk.
	available := func(candidate string) bool {
		if r.writtingFiles.Contains(filepath.Base(candidate)) {
			return false
		}
		_, err := os.Stat(candidate)
		return errors.Is(err, os.ErrNotExist)
	}

	first := filepath.Join(dir, fmt.Sprintf("%s-%s%s", prefix, stamp, ext))
	if first != currentPath && available(first) {
		return first
	}

	for n := 1; n <= maxSegmentNameCollisionAttempts; n++ {
		candidate := filepath.Join(dir, fmt.Sprintf("%s-%s-%d%s", prefix, stamp, n, ext))
		if available(candidate) {
			return candidate
		}
	}

	fallbackPath := filepath.Join(dir, fmt.Sprintf("%s-%d%s", prefix, now.UnixNano(), ext))
	logger.Warnf("segment filename collision attempts exceeded (%d), using fallback path: %s", maxSegmentNameCollisionAttempts, fallbackPath)
	return fallbackPath
}

// rotateSegment closes the current segment and begins writing to a fresh
// file. It picks the next segment path, opens a new pipeline for it, swaps
// the recorder's output path and pipe registration, then closes the old pipe
// and finalizes the old file asynchronously. On failure the recorder keeps
// writing to the current file and the caller may retry on the next chunk.
func (r *Service) rotateSegment(roomId int, info *Recorder, ctx context.Context, pipe **pipeline.Pipe[[]byte]) error {
	oldPipe := *pipe
	oldPath := info.GetOutputPath()
	nextPath := r.nextSegmentOutputPath(oldPath, time.Now())

	// Reserve the new file name BEFORE opening the pipe, so that a concurrent
	// path pick or finalize scan cannot grab the same name while the file is
	// being created.
	r.writtingFiles.Add(filepath.Base(nextPath))

	nextPipe, err := r.newPipe(nextPath, ctx)
	if err != nil {
		// Release the reservation; the old segment stays active.
		r.writtingFiles.Remove(filepath.Base(nextPath))
		return err
	}

	info.SetOutputPath(nextPath)
	info.segmentBytes.Store(0)

	r.pipes.Store(roomId, nextPipe)
	*pipe = nextPipe

	oldPipe.Close()
	go r.finalizeOutputPath(roomId, oldPath)
	return nil
}

func (r *Service) checkRecordingDurationPeriodically(roomId int, ctx context.Context) {
log := logger.WithField("room", roomId)
ticker := time.NewTicker(1 * time.Minute)
Expand Down Expand Up @@ -345,17 +440,20 @@ func (r *Service) finalize(roomId int, info *Recorder) {
logger.Warnf("skipping finalize for room %d: no recording info", roomId)
return
}
r.finalizeOutputPath(roomId, info.GetOutputPath())
}

defer r.writtingFiles.Remove(filepath.Base(info.outputPath))
func (r *Service) finalizeOutputPath(roomId int, outputPath string) {
defer r.writtingFiles.Remove(filepath.Base(outputPath))

fileInfo, err := os.Stat(info.outputPath)
fileInfo, err := os.Stat(outputPath)
if err != nil {
logger.Errorf("failed to stat recorded file for room %d: %v", roomId, err)
return
} else if fileInfo.Size() < 1024 { // less than 1KB
logger.Warnf("recorded file for room %d is too small (%d bytes), skipping finallization and removing file", roomId, fileInfo.Size())
if err := os.Remove(info.outputPath); err != nil {
logger.Errorf("failed to remove empty file %s: %v", info.outputPath, err)
if err := os.Remove(outputPath); err != nil {
logger.Errorf("failed to remove empty file %s: %v", outputPath, err)
}
return
}
Expand All @@ -366,7 +464,7 @@ func (r *Service) finalize(roomId int, info *Recorder) {
}

// process finalization via convert service
if queue, err := r.cv.Enqueue(info.outputPath, "mp4", r.cfg.DeleteFlvAfterConvert); err != nil {
if queue, err := r.cv.Enqueue(outputPath, "mp4", r.cfg.DeleteFlvAfterConvert); err != nil {
logger.Errorf("failed to enqueue conversion for room %d: %v", roomId, err)
logger.Warnf("you may need to convert mp4 manually for room: %d", roomId)
} else {
Expand Down Expand Up @@ -425,9 +523,8 @@ func (r *Service) prepareFilePath(info *bilibili.LiveRoomInfoDetail, start time.
return fmt.Sprintf("%s/%s-%s.flv", dirPath, safeTitle, start.Format("20060102_150405")), nil
}


// initOutputDir ensures the configured recording output directory exists,
// terminating the process if it cannot be created.
func initOutputDir(cfg *config.Config) {
	err := os.MkdirAll(cfg.OutputDir, 0755)
	if err != nil {
		logger.Fatalf("cannot create output directory %s: %v", cfg.OutputDir, err)
	}
}
}
59 changes: 59 additions & 0 deletions internal/services/recorder/rotation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package recorder

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/eric2788/bilirec/internal/modules/config"
"github.com/eric2788/bilirec/pkg/ds"
)

// TestShouldRotateSegment covers the three size-cap cases: segment below the
// limit, segment at the limit, and rotation disabled via a zero cap.
func TestShouldRotateSegment(t *testing.T) {
	svc := &Service{
		cfg: &config.Config{MaxRecordingFileSizeBytes: 100},
	}
	rec := &Recorder{}

	rec.segmentBytes.Store(99)
	if svc.shouldRotateSegment(rec) {
		t.Fatal("expected no rotation when segment bytes below max")
	}

	rec.segmentBytes.Store(100)
	if !svc.shouldRotateSegment(rec) {
		t.Fatal("expected rotation when segment bytes reaches max")
	}

	svc.cfg.MaxRecordingFileSizeBytes = 0
	if svc.shouldRotateSegment(rec) {
		t.Fatal("expected no rotation when max size is disabled")
	}
}

// TestNextSegmentOutputPath checks timestamp-based segment naming and the
// numeric suffix applied when the preferred name already exists on disk.
func TestNextSegmentOutputPath(t *testing.T) {
	dir := t.TempDir()
	svc := &Service{
		writtingFiles: ds.NewSyncedSet[string](),
	}

	current := filepath.Join(dir, "title-with-dash-20260101_090000.flv")
	at := time.Date(2026, 1, 1, 9, 30, 0, 0, time.UTC)

	got := svc.nextSegmentOutputPath(current, at)
	want := filepath.Join(dir, "title-with-dash-20260101_093000.flv")
	if got != want {
		t.Fatalf("unexpected next segment path, got %s, want %s", got, want)
	}

	if err := os.WriteFile(want, []byte("existing"), 0644); err != nil {
		t.Fatalf("failed to create existing file: %v", err)
	}

	gotSuffixed := svc.nextSegmentOutputPath(current, at)
	wantSuffixed := filepath.Join(dir, "title-with-dash-20260101_093000-1.flv")
	if gotSuffixed != wantSuffixed {
		t.Fatalf("unexpected collision-resolved path, got %s, want %s", gotSuffixed, wantSuffixed)
	}
}
Loading