mirror of
https://github.com/navidrome/navidrome.git
synced 2025-08-13 14:01:14 +03:00
Implemented EnableTranscodingCancellation configuration option to control whether FFmpeg transcoding processes can be interrupted when client requests are cancelled. This addresses resource management issues on low-power hardware where transcoding processes would accumulate and cause CPU spikes. Key changes: - Added EnableTranscodingCancellation bool to configuration (default: false) - Added CLI flag --enabletranscodingcancellation and TOML/env support - Modified FFmpeg package to always use exec.CommandContext for consistency - Implemented conditional context handling in NewTranscodingCache function - When enabled: uses request context directly (allows cancellation) - When disabled: uses background context with request metadata preserved - Added comprehensive tests for both FFmpeg and transcoding layers - Maintained backward compatibility with existing behavior as default The implementation follows proper layered architecture with policy decisions at the media streaming layer and execution utilities remaining focused on their core responsibilities. Signed-off-by: Deluan <deluan@navidrome.org>
228 lines
7.0 KiB
Go
228 lines
7.0 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"mime"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/navidrome/navidrome/conf"
|
|
"github.com/navidrome/navidrome/consts"
|
|
"github.com/navidrome/navidrome/core/ffmpeg"
|
|
"github.com/navidrome/navidrome/log"
|
|
"github.com/navidrome/navidrome/model"
|
|
"github.com/navidrome/navidrome/model/request"
|
|
"github.com/navidrome/navidrome/utils/cache"
|
|
)
|
|
|
|
type MediaStreamer interface {
|
|
NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int, offset int) (*Stream, error)
|
|
DoStream(ctx context.Context, mf *model.MediaFile, reqFormat string, reqBitRate int, reqOffset int) (*Stream, error)
|
|
}
|
|
|
|
// TranscodingCache is a distinct named type over cache.FileCache, used for the
// cache that serves transcoded output (see NewTranscodingCache).
// NOTE(review): presumably declared as its own type so it can be told apart
// from other file caches when wiring dependencies — confirm.
type TranscodingCache cache.FileCache
|
|
|
|
func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache) MediaStreamer {
|
|
return &mediaStreamer{ds: ds, transcoder: t, cache: cache}
|
|
}
|
|
|
|
type mediaStreamer struct {
|
|
ds model.DataStore
|
|
transcoder ffmpeg.FFmpeg
|
|
cache cache.FileCache
|
|
}
|
|
|
|
type streamJob struct {
|
|
ms *mediaStreamer
|
|
mf *model.MediaFile
|
|
filePath string
|
|
format string
|
|
bitRate int
|
|
offset int
|
|
}
|
|
|
|
func (j *streamJob) Key() string {
|
|
return fmt.Sprintf("%s.%s.%d.%s.%d", j.mf.ID, j.mf.UpdatedAt.Format(time.RFC3339Nano), j.bitRate, j.format, j.offset)
|
|
}
|
|
|
|
func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int, reqOffset int) (*Stream, error) {
|
|
mf, err := ms.ds.MediaFile(ctx).Get(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ms.DoStream(ctx, mf, reqFormat, reqBitRate, reqOffset)
|
|
}
|
|
|
|
func (ms *mediaStreamer) DoStream(ctx context.Context, mf *model.MediaFile, reqFormat string, reqBitRate int, reqOffset int) (*Stream, error) {
|
|
var format string
|
|
var bitRate int
|
|
var cached bool
|
|
defer func() {
|
|
log.Info(ctx, "Streaming file", "title", mf.Title, "artist", mf.Artist, "format", format, "cached", cached,
|
|
"bitRate", bitRate, "user", userName(ctx), "transcoding", format != "raw",
|
|
"originalFormat", mf.Suffix, "originalBitRate", mf.BitRate)
|
|
}()
|
|
|
|
format, bitRate = selectTranscodingOptions(ctx, ms.ds, mf, reqFormat, reqBitRate)
|
|
s := &Stream{ctx: ctx, mf: mf, format: format, bitRate: bitRate}
|
|
filePath := mf.AbsolutePath()
|
|
|
|
if format == "raw" {
|
|
log.Debug(ctx, "Streaming RAW file", "id", mf.ID, "path", filePath,
|
|
"requestBitrate", reqBitRate, "requestFormat", reqFormat, "requestOffset", reqOffset,
|
|
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
|
|
"selectedBitrate", bitRate, "selectedFormat", format)
|
|
f, err := os.Open(filePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.ReadCloser = f
|
|
s.Seeker = f
|
|
s.format = mf.Suffix
|
|
return s, nil
|
|
}
|
|
|
|
job := &streamJob{
|
|
ms: ms,
|
|
mf: mf,
|
|
filePath: filePath,
|
|
format: format,
|
|
bitRate: bitRate,
|
|
offset: reqOffset,
|
|
}
|
|
r, err := ms.cache.Get(ctx, job)
|
|
if err != nil {
|
|
log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err)
|
|
return nil, err
|
|
}
|
|
cached = r.Cached
|
|
|
|
s.ReadCloser = r
|
|
s.Seeker = r.Seeker
|
|
|
|
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath,
|
|
"requestBitrate", reqBitRate, "requestFormat", reqFormat, "requestOffset", reqOffset,
|
|
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
|
|
"selectedBitrate", bitRate, "selectedFormat", format, "cached", cached, "seekable", s.Seekable())
|
|
|
|
return s, nil
|
|
}
|
|
|
|
type Stream struct {
|
|
ctx context.Context
|
|
mf *model.MediaFile
|
|
bitRate int
|
|
format string
|
|
io.ReadCloser
|
|
io.Seeker
|
|
}
|
|
|
|
func (s *Stream) Seekable() bool { return s.Seeker != nil }
|
|
func (s *Stream) Duration() float32 { return s.mf.Duration }
|
|
func (s *Stream) ContentType() string { return mime.TypeByExtension("." + s.format) }
|
|
func (s *Stream) Name() string { return s.mf.Title + "." + s.format }
|
|
func (s *Stream) ModTime() time.Time { return s.mf.UpdatedAt }
|
|
func (s *Stream) EstimatedContentLength() int {
|
|
return int(s.mf.Duration * float32(s.bitRate) / 8 * 1024)
|
|
}
|
|
|
|
// TODO This function deserves some love (refactoring)
|
|
func selectTranscodingOptions(ctx context.Context, ds model.DataStore, mf *model.MediaFile, reqFormat string, reqBitRate int) (format string, bitRate int) {
|
|
format = "raw"
|
|
if reqFormat == "raw" {
|
|
return format, 0
|
|
}
|
|
if reqFormat == mf.Suffix && reqBitRate == 0 {
|
|
bitRate = mf.BitRate
|
|
return format, bitRate
|
|
}
|
|
trc, hasDefault := request.TranscodingFrom(ctx)
|
|
var cFormat string
|
|
var cBitRate int
|
|
if reqFormat != "" {
|
|
cFormat = reqFormat
|
|
} else {
|
|
if hasDefault {
|
|
cFormat = trc.TargetFormat
|
|
cBitRate = trc.DefaultBitRate
|
|
if p, ok := request.PlayerFrom(ctx); ok {
|
|
cBitRate = p.MaxBitRate
|
|
}
|
|
} else if reqBitRate > 0 && reqBitRate < mf.BitRate && conf.Server.DefaultDownsamplingFormat != "" {
|
|
// If no format is specified and no transcoding associated to the player, but a bitrate is specified,
|
|
// and there is no transcoding set for the player, we use the default downsampling format.
|
|
// But only if the requested bitRate is lower than the original bitRate.
|
|
log.Debug("Default Downsampling", "Using default downsampling format", conf.Server.DefaultDownsamplingFormat)
|
|
cFormat = conf.Server.DefaultDownsamplingFormat
|
|
}
|
|
}
|
|
if reqBitRate > 0 {
|
|
cBitRate = reqBitRate
|
|
}
|
|
if cBitRate == 0 && cFormat == "" {
|
|
return format, bitRate
|
|
}
|
|
t, err := ds.Transcoding(ctx).FindByFormat(cFormat)
|
|
if err == nil {
|
|
format = t.TargetFormat
|
|
if cBitRate != 0 {
|
|
bitRate = cBitRate
|
|
} else {
|
|
bitRate = t.DefaultBitRate
|
|
}
|
|
}
|
|
if format == mf.Suffix && bitRate >= mf.BitRate {
|
|
format = "raw"
|
|
bitRate = 0
|
|
}
|
|
return format, bitRate
|
|
}
|
|
|
|
var (
|
|
onceTranscodingCache sync.Once
|
|
instanceTranscodingCache TranscodingCache
|
|
)
|
|
|
|
func GetTranscodingCache() TranscodingCache {
|
|
onceTranscodingCache.Do(func() {
|
|
instanceTranscodingCache = NewTranscodingCache()
|
|
})
|
|
return instanceTranscodingCache
|
|
}
|
|
|
|
func NewTranscodingCache() TranscodingCache {
|
|
return cache.NewFileCache("Transcoding", conf.Server.TranscodingCacheSize,
|
|
consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems,
|
|
func(ctx context.Context, arg cache.Item) (io.Reader, error) {
|
|
job := arg.(*streamJob)
|
|
t, err := job.ms.ds.Transcoding(ctx).FindByFormat(job.format)
|
|
if err != nil {
|
|
log.Error(ctx, "Error loading transcoding command", "format", job.format, err)
|
|
return nil, os.ErrInvalid
|
|
}
|
|
|
|
// Choose the appropriate context based on EnableTranscodingCancellation configuration.
|
|
// This is where we decide whether transcoding processes should be cancellable or not.
|
|
var transcodingCtx context.Context
|
|
if conf.Server.EnableTranscodingCancellation {
|
|
// Use the request context directly, allowing cancellation when client disconnects
|
|
transcodingCtx = ctx
|
|
} else {
|
|
// Use background context with request values preserved.
|
|
// This prevents cancellation but maintains request metadata (user, client, etc.)
|
|
transcodingCtx = request.AddValues(context.Background(), ctx)
|
|
}
|
|
|
|
out, err := job.ms.transcoder.Transcode(transcodingCtx, t.Command, job.filePath, job.bitRate, job.offset)
|
|
if err != nil {
|
|
log.Error(ctx, "Error starting transcoder", "id", job.mf.ID, err)
|
|
return nil, os.ErrInvalid
|
|
}
|
|
return out, nil
|
|
})
|
|
}
|