| package model |
|
|
| import ( |
| "sort" |
| "sync" |
| "time" |
|
|
| "github.com/mudler/LocalAI/pkg/xsysinfo" |
| process "github.com/mudler/go-processmanager" |
| "github.com/mudler/xlog" |
| ) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| type WatchDog struct { |
| sync.Mutex |
| busyTime map[string]time.Time |
| idleTime map[string]time.Time |
| lastUsed map[string]time.Time |
| timeout, idletimeout time.Duration |
| addressMap map[string]*process.Process |
| addressModelMap map[string]string |
| pm ProcessManager |
| stop chan bool |
|
|
| busyCheck, idleCheck bool |
| lruLimit int |
|
|
| |
| memoryReclaimerEnabled bool |
| memoryReclaimerThreshold float64 |
| watchdogInterval time.Duration |
|
|
| |
| forceEvictionWhenBusy bool |
| } |
|
|
// ProcessManager is the dependency the WatchDog uses to stop a model's
// backend once the model has been selected for eviction.
type ProcessManager interface {
	// ShutdownModel stops the backend serving modelName.
	ShutdownModel(modelName string) error
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| func NewWatchDog(opts ...WatchDogOption) *WatchDog { |
| o := NewWatchDogOptions(opts...) |
|
|
| return &WatchDog{ |
| timeout: o.busyTimeout, |
| idletimeout: o.idleTimeout, |
| pm: o.processManager, |
| busyTime: make(map[string]time.Time), |
| idleTime: make(map[string]time.Time), |
| lastUsed: make(map[string]time.Time), |
| addressMap: make(map[string]*process.Process), |
| busyCheck: o.busyCheck, |
| idleCheck: o.idleCheck, |
| lruLimit: o.lruLimit, |
| addressModelMap: make(map[string]string), |
| stop: make(chan bool, 1), |
| memoryReclaimerEnabled: o.memoryReclaimerEnabled, |
| memoryReclaimerThreshold: o.memoryReclaimerThreshold, |
| watchdogInterval: o.watchdogInterval, |
| forceEvictionWhenBusy: o.forceEvictionWhenBusy, |
| } |
| } |
|
|
| |
| func (wd *WatchDog) SetLRULimit(limit int) { |
| wd.Lock() |
| defer wd.Unlock() |
| wd.lruLimit = limit |
| } |
|
|
| |
| func (wd *WatchDog) GetLRULimit() int { |
| wd.Lock() |
| defer wd.Unlock() |
| return wd.lruLimit |
| } |
|
|
| |
| func (wd *WatchDog) SetMemoryReclaimer(enabled bool, threshold float64) { |
| wd.Lock() |
| defer wd.Unlock() |
| wd.memoryReclaimerEnabled = enabled |
| wd.memoryReclaimerThreshold = threshold |
| } |
|
|
| |
| func (wd *WatchDog) GetMemoryReclaimerSettings() (enabled bool, threshold float64) { |
| wd.Lock() |
| defer wd.Unlock() |
| return wd.memoryReclaimerEnabled, wd.memoryReclaimerThreshold |
| } |
|
|
| |
| func (wd *WatchDog) SetForceEvictionWhenBusy(force bool) { |
| wd.Lock() |
| defer wd.Unlock() |
| wd.forceEvictionWhenBusy = force |
| } |
|
|
| func (wd *WatchDog) Shutdown() { |
| wd.Lock() |
| defer wd.Unlock() |
| xlog.Info("[WatchDog] Shutting down watchdog") |
| wd.stop <- true |
| } |
|
|
| func (wd *WatchDog) AddAddressModelMap(address string, model string) { |
| wd.Lock() |
| defer wd.Unlock() |
| wd.addressModelMap[address] = model |
|
|
| } |
| func (wd *WatchDog) Add(address string, p *process.Process) { |
| wd.Lock() |
| defer wd.Unlock() |
| wd.addressMap[address] = p |
| } |
|
|
| func (wd *WatchDog) Mark(address string) { |
| wd.Lock() |
| defer wd.Unlock() |
| now := time.Now() |
| wd.busyTime[address] = now |
| wd.lastUsed[address] = now |
| delete(wd.idleTime, address) |
| } |
|
|
| func (wd *WatchDog) UnMark(ModelAddress string) { |
| wd.Lock() |
| defer wd.Unlock() |
| now := time.Now() |
| delete(wd.busyTime, ModelAddress) |
| wd.idleTime[ModelAddress] = now |
| wd.lastUsed[ModelAddress] = now |
| } |
|
|
| |
| |
| func (wd *WatchDog) UpdateLastUsed(address string) { |
| wd.Lock() |
| defer wd.Unlock() |
| wd.lastUsed[address] = time.Now() |
| } |
|
|
| |
| func (wd *WatchDog) GetLoadedModelCount() int { |
| wd.Lock() |
| defer wd.Unlock() |
| return len(wd.addressModelMap) |
| } |
|
|
| |
// modelUsageInfo is a snapshot of one loaded model, used to order eviction
// candidates by recency.
type modelUsageInfo struct {
	address string // backend address the model is served on
	model   string // model name passed to ShutdownModel

	lastUsed time.Time // zero when the address has no recorded use
}
|
|
| |
// EnforceLRULimitResult summarises a single EnforceLRULimit call.
type EnforceLRULimitResult struct {
	// EvictedCount is the number of models scheduled for shutdown.
	EvictedCount int
	// NeedMore is true when fewer models than required were evicted because
	// some candidates were busy.
	NeedMore bool
}
|
|
| |
| |
| |
| |
| func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult { |
| if wd.lruLimit <= 0 { |
| return EnforceLRULimitResult{EvictedCount: 0, NeedMore: false} |
| } |
|
|
| wd.Lock() |
|
|
| currentCount := len(wd.addressModelMap) |
| |
| |
| |
| |
| modelsToEvict := currentCount - wd.lruLimit + pendingLoads + 1 |
| forceEvictionWhenBusy := wd.forceEvictionWhenBusy |
| if modelsToEvict <= 0 { |
| wd.Unlock() |
| return EnforceLRULimitResult{EvictedCount: 0, NeedMore: false} |
| } |
|
|
| xlog.Debug("[WatchDog] LRU enforcement triggered", "current", currentCount, "pendingLoads", pendingLoads, "limit", wd.lruLimit, "toEvict", modelsToEvict) |
|
|
| |
| var models []modelUsageInfo |
| for address, model := range wd.addressModelMap { |
| lastUsed := wd.lastUsed[address] |
| if lastUsed.IsZero() { |
| |
| lastUsed = time.Time{} |
| } |
| models = append(models, modelUsageInfo{ |
| address: address, |
| model: model, |
| lastUsed: lastUsed, |
| }) |
| } |
|
|
| |
| sort.Slice(models, func(i, j int) bool { |
| return models[i].lastUsed.Before(models[j].lastUsed) |
| }) |
|
|
| |
| var modelsToShutdown []string |
| evictedCount := 0 |
| skippedBusyCount := 0 |
| for i := 0; evictedCount < modelsToEvict && i < len(models); i++ { |
| m := models[i] |
| |
| _, isBusy := wd.busyTime[m.address] |
| if isBusy && !forceEvictionWhenBusy { |
| |
| xlog.Warn("[WatchDog] Skipping LRU eviction for busy model", "model", m.model, "reason", "model has active API calls") |
| skippedBusyCount++ |
| continue |
| } |
| xlog.Info("[WatchDog] LRU evicting model", "model", m.model, "lastUsed", m.lastUsed, "busy", isBusy) |
| modelsToShutdown = append(modelsToShutdown, m.model) |
| |
| wd.untrack(m.address) |
| evictedCount++ |
| } |
| needMore := evictedCount < modelsToEvict && skippedBusyCount > 0 |
| wd.Unlock() |
|
|
| |
| for _, model := range modelsToShutdown { |
| if err := wd.pm.ShutdownModel(model); err != nil { |
| xlog.Error("[WatchDog] error shutting down model during LRU eviction", "error", err, "model", model) |
| } |
| xlog.Debug("[WatchDog] LRU eviction complete", "model", model) |
| } |
|
|
| if needMore { |
| xlog.Warn("[WatchDog] LRU eviction incomplete", "evicted", evictedCount, "needed", modelsToEvict, "skippedBusy", skippedBusyCount, "reason", "some models are busy with active API calls") |
| } |
|
|
| return EnforceLRULimitResult{ |
| EvictedCount: len(modelsToShutdown), |
| NeedMore: needMore, |
| } |
| } |
|
|
| func (wd *WatchDog) Run() { |
| xlog.Info("[WatchDog] starting watchdog") |
|
|
| for { |
| select { |
| case <-wd.stop: |
| xlog.Info("[WatchDog] Stopping watchdog") |
| return |
| case <-time.After(wd.watchdogInterval): |
| |
| wd.Lock() |
| busyCheck := wd.busyCheck |
| idleCheck := wd.idleCheck |
| memoryCheck := wd.memoryReclaimerEnabled |
| wd.Unlock() |
|
|
| if !busyCheck && !idleCheck && !memoryCheck { |
| xlog.Info("[WatchDog] No checks enabled, stopping watchdog") |
| return |
| } |
| if busyCheck { |
| wd.checkBusy() |
| } |
| if idleCheck { |
| wd.checkIdle() |
| } |
| if memoryCheck { |
| wd.checkMemory() |
| } |
| } |
| } |
| } |
|
|
| func (wd *WatchDog) checkIdle() { |
| wd.Lock() |
| xlog.Debug("[WatchDog] Watchdog checks for idle connections") |
|
|
| |
| var modelsToShutdown []string |
| for address, t := range wd.idleTime { |
| xlog.Debug("[WatchDog] idle connection", "address", address) |
| if time.Since(t) > wd.idletimeout { |
| xlog.Warn("[WatchDog] Address is idle for too long, killing it", "address", address) |
| model, ok := wd.addressModelMap[address] |
| if ok { |
| modelsToShutdown = append(modelsToShutdown, model) |
| } else { |
| xlog.Warn("[WatchDog] Address unresolvable", "address", address) |
| } |
| wd.untrack(address) |
| } |
| } |
| wd.Unlock() |
|
|
| |
| for _, model := range modelsToShutdown { |
| if err := wd.pm.ShutdownModel(model); err != nil { |
| xlog.Error("[watchdog] error shutting down model", "error", err, "model", model) |
| } |
| xlog.Debug("[WatchDog] model shut down", "model", model) |
| } |
| } |
|
|
| func (wd *WatchDog) checkBusy() { |
| wd.Lock() |
| xlog.Debug("[WatchDog] Watchdog checks for busy connections") |
|
|
| |
| var modelsToShutdown []string |
| for address, t := range wd.busyTime { |
| xlog.Debug("[WatchDog] active connection", "address", address) |
|
|
| if time.Since(t) > wd.timeout { |
| model, ok := wd.addressModelMap[address] |
| if ok { |
| xlog.Warn("[WatchDog] Model is busy for too long, killing it", "model", model) |
| modelsToShutdown = append(modelsToShutdown, model) |
| } else { |
| xlog.Warn("[WatchDog] Address unresolvable", "address", address) |
| } |
| wd.untrack(address) |
| } |
| } |
| wd.Unlock() |
|
|
| |
| for _, model := range modelsToShutdown { |
| if err := wd.pm.ShutdownModel(model); err != nil { |
| xlog.Error("[watchdog] error shutting down model", "error", err, "model", model) |
| } |
| xlog.Debug("[WatchDog] model shut down", "model", model) |
| } |
| } |
|
|
| |
| func (wd *WatchDog) checkMemory() { |
| wd.Lock() |
| threshold := wd.memoryReclaimerThreshold |
| enabled := wd.memoryReclaimerEnabled |
| modelCount := len(wd.addressModelMap) |
| wd.Unlock() |
|
|
| if !enabled || threshold <= 0 || modelCount == 0 { |
| return |
| } |
|
|
| |
| aggregate := xsysinfo.GetResourceAggregateInfo() |
| if aggregate.TotalMemory == 0 { |
| xlog.Debug("[WatchDog] No memory information available for memory reclaimer") |
| return |
| } |
|
|
| |
| thresholdPercent := threshold * 100 |
|
|
| memoryType := "GPU" |
| if aggregate.GPUCount == 0 { |
| memoryType = "RAM" |
| } |
|
|
| xlog.Debug("[WatchDog] Memory check", "type", memoryType, "usage_percent", aggregate.UsagePercent, "threshold_percent", thresholdPercent, "loaded_models", modelCount) |
|
|
| |
| if aggregate.UsagePercent > thresholdPercent { |
| xlog.Warn("[WatchDog] Memory usage exceeds threshold, evicting LRU backend", "type", memoryType, "usage_percent", aggregate.UsagePercent, "threshold_percent", thresholdPercent) |
|
|
| |
| wd.evictLRUModel() |
| } |
| } |
|
|
| |
| func (wd *WatchDog) evictLRUModel() { |
| wd.Lock() |
|
|
| if len(wd.addressModelMap) == 0 { |
| wd.Unlock() |
| return |
| } |
|
|
| forceEvictionWhenBusy := wd.forceEvictionWhenBusy |
|
|
| |
| var models []modelUsageInfo |
| for address, model := range wd.addressModelMap { |
| lastUsed := wd.lastUsed[address] |
| if lastUsed.IsZero() { |
| lastUsed = time.Time{} |
| } |
| models = append(models, modelUsageInfo{ |
| address: address, |
| model: model, |
| lastUsed: lastUsed, |
| }) |
| } |
|
|
| if len(models) == 0 { |
| wd.Unlock() |
| return |
| } |
|
|
| |
| sort.Slice(models, func(i, j int) bool { |
| return models[i].lastUsed.Before(models[j].lastUsed) |
| }) |
|
|
| |
| var lruModel *modelUsageInfo |
| for i := 0; i < len(models); i++ { |
| m := models[i] |
| _, isBusy := wd.busyTime[m.address] |
| if isBusy && !forceEvictionWhenBusy { |
| |
| xlog.Warn("[WatchDog] Skipping memory reclaimer eviction for busy model", "model", m.model, "reason", "model has active API calls") |
| continue |
| } |
| lruModel = &m |
| break |
| } |
|
|
| if lruModel == nil { |
| |
| wd.Unlock() |
| xlog.Warn("[WatchDog] Memory reclaimer cannot evict: all models are busy with active API calls") |
| return |
| } |
|
|
| xlog.Info("[WatchDog] Memory reclaimer evicting LRU model", "model", lruModel.model, "lastUsed", lruModel.lastUsed) |
|
|
| |
| wd.untrack(lruModel.address) |
| wd.Unlock() |
|
|
| |
| if err := wd.pm.ShutdownModel(lruModel.model); err != nil { |
| xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model) |
| } else { |
| xlog.Info("[WatchDog] Memory reclaimer eviction complete", "model", lruModel.model) |
| } |
| } |
|
|
| func (wd *WatchDog) untrack(address string) { |
| delete(wd.busyTime, address) |
| delete(wd.idleTime, address) |
| delete(wd.lastUsed, address) |
| delete(wd.addressModelMap, address) |
| delete(wd.addressMap, address) |
| } |
|
|