Compare commits
2 Commits
main
...
brucemacd/
Author | SHA1 | Date | |
---|---|---|---|
|
4ad24c6ca6 | ||
|
de0f833ce3 |
@ -164,6 +164,8 @@ func newDynExtServer(library, model string, adapters, projectors []string, opts
|
|||||||
return llm, nil
|
return llm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrPredictTimeout = fmt.Errorf("timed out waiting for next token")
|
||||||
|
|
||||||
func (llm *dynExtServer) Predict(ctx context.Context, predict PredictOpts, fn func(PredictResult)) error {
|
func (llm *dynExtServer) Predict(ctx context.Context, predict PredictOpts, fn func(PredictResult)) error {
|
||||||
resp := newExtServerResp(128)
|
resp := newExtServerResp(128)
|
||||||
defer freeExtServerResp(resp)
|
defer freeExtServerResp(resp)
|
||||||
@ -237,8 +239,19 @@ func (llm *dynExtServer) Predict(ctx context.Context, predict PredictOpts, fn fu
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return cancelCompletion(llm, resp)
|
return cancelCompletion(llm, resp)
|
||||||
default:
|
default:
|
||||||
|
// this channel is used to communicate the result of each call, while allowing for a timeout
|
||||||
|
resultChan := make(chan C.ext_server_task_result_t)
|
||||||
|
// timeout waiting for a token from this specific call
|
||||||
|
timeout := time.After(30 * time.Second)
|
||||||
|
|
||||||
|
go func() {
|
||||||
var result C.ext_server_task_result_t
|
var result C.ext_server_task_result_t
|
||||||
C.dyn_llama_server_completion_next_result(llm.s, resp.id, &result)
|
C.dyn_llama_server_completion_next_result(llm.s, resp.id, &result)
|
||||||
|
resultChan <- result
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case result := <-resultChan:
|
||||||
json_resp := C.GoString(result.json_resp)
|
json_resp := C.GoString(result.json_resp)
|
||||||
C.dyn_llama_server_release_task_result(llm.s, &result)
|
C.dyn_llama_server_release_task_result(llm.s, &result)
|
||||||
|
|
||||||
@ -288,6 +301,12 @@ func (llm *dynExtServer) Predict(ctx context.Context, predict PredictOpts, fn fu
|
|||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
case <-timeout:
|
||||||
|
if err := cancelCompletion(llm, resp); err != nil {
|
||||||
|
slog.Error("failed to cancel completion on predict timeout: ", err)
|
||||||
|
}
|
||||||
|
return ErrPredictTimeout
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !retryNeeded {
|
if !retryNeeded {
|
||||||
|
@ -1362,6 +1362,15 @@ func ChatHandler(c *gin.Context) {
|
|||||||
Options: opts,
|
Options: opts,
|
||||||
}
|
}
|
||||||
if err := loaded.runner.Predict(c.Request.Context(), predictReq, fn); err != nil {
|
if err := loaded.runner.Predict(c.Request.Context(), predictReq, fn); err != nil {
|
||||||
|
if errors.Is(err, llm.ErrPredictTimeout) {
|
||||||
|
// the loaded runner may be unresponsive, stop it now
|
||||||
|
if loaded.runner != nil {
|
||||||
|
loaded.runner.Close()
|
||||||
|
}
|
||||||
|
loaded.runner = nil
|
||||||
|
loaded.Model = nil
|
||||||
|
loaded.Options = nil
|
||||||
|
}
|
||||||
ch <- gin.H{"error": err.Error()}
|
ch <- gin.H{"error": err.Error()}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user