llama.cpp/tools/server/server-models.cpp
Xuan-Son Nguyen ec18edfcba
server: introduce API for serving / loading / unloading multiple models (#17470)
* server: add model management and proxy

* fix compile error

* does this fix windows?

* fix windows build

* use subprocess.h, better logging

* add test

* fix windows

* feat: Model/Router server architecture WIP

* more stable

* fix unsafe pointer

* also allow terminate loading model

* add is_active()

* refactor: Architecture improvements

* tmp apply upstream fix

* address most problems

* address thread safety issue

* address review comment

* add docs (first version)

* address review comment

* feat: Improved UX for model information, modality interactions etc

* chore: update webui build output

* refactor: Use only the message data `model` property for displaying model used info

* chore: update webui build output

* add --models-dir param

* feat: New Model Selection UX WIP

* chore: update webui build output

* feat: Add auto-mic setting

* feat: Attachments UX improvements

* implement LRU

* remove default model path

* better --models-dir

* add env for args

* address review comments

* fix compile

* refactor: Chat Form Submit component

* add endpoint docs

* Merge remote-tracking branch 'webui/allozaur/server_model_management_v1_2' into xsn/server_model_maagement_v1_2

Co-authored-by: Aleksander <aleksander.grygier@gmail.com>

* feat: Add copy to clipboard to model name in model info dialog

* feat: Model unavailable UI state for model selector

* feat: Chat Form Actions UI logic improvements

* feat: Auto-select model from last assistant response

* chore: update webui build output

* expose args and exit_code in API

* add note

* support extra_args on loading model

* allow reusing args if auto_load

* typo docs

* oai-compat /models endpoint

* cleaner

* address review comments

* feat: Use `model` property for displaying the `repo/model-name` naming format

* refactor: Attachments data

* chore: update webui build output

* refactor: Enum imports

* feat: Improve Model Selector responsiveness

* chore: update webui build output

* refactor: Cleanup

* refactor: Cleanup

* refactor: Formatters

* chore: update webui build output

* refactor: Copy To Clipboard Icon component

* chore: update webui build output

* refactor: Cleanup

* chore: update webui build output

* refactor: UI badges

* chore: update webui build output

* refactor: Cleanup

* refactor: Cleanup

* chore: update webui build output

* add --models-allow-extra-args for security

* nits

* add stdin_file

* fix merge

* fix: Retrieve lost setting after resolving merge conflict

* refactor: DatabaseStore -> DatabaseService

* refactor: Database, Conversations & Chat services + stores architecture improvements (WIP)

* refactor: Remove redundant settings

* refactor: Multi-model business logic WIP

* chore: update webui build output

* feat: Switching models logic for ChatForm or when regenerating messages + modality detection logic

* chore: update webui build output

* fix: Add `untrack` inside chat processing info data logic to prevent infinite effect

* fix: Regenerate

* feat: Remove redundant settings + rearrange

* fix: Audio attachments

* refactor: Icons

* chore: update webui build output

* feat: Model management and selection features WIP

* chore: update webui build output

* refactor: Improve server properties management

* refactor: Icons

* chore: update webui build output

* feat: Improve model loading/unloading status updates

* chore: update webui build output

* refactor: Improve API header management via utility functions

* remove support for extra args

* set hf_repo/docker_repo as model alias when possible

* refactor: Remove ConversationsService

* refactor: Chat requests abort handling

* refactor: Server store

* tmp webui build

* refactor: Model modality handling

* chore: update webui build output

* refactor: Processing state reactivity

* fix: UI

* refactor: Services/Stores syntax + logic improvements

Refactors components to access stores directly instead of using exported getter functions.

This change centralizes store access and logic, simplifying component code and improving maintainability by reducing the number of exported functions and promoting direct store interaction.

Removes exported getter functions from `chat.svelte.ts`, `conversations.svelte.ts`, `models.svelte.ts` and `settings.svelte.ts`.

* refactor: Architecture cleanup

* feat: Improve statistic badges

* feat: Condition available models based on modality + better model loading strategy & UX

* docs: Architecture documentation

* feat: Update logic for PDF as Image

* add TODO for http client

* refactor: Enhance model info and attachment handling

* chore: update webui build output

* refactor: Components naming

* chore: update webui build output

* refactor: Cleanup

* refactor: DRY `getAttachmentDisplayItems` function + fix UI

* chore: update webui build output

* fix: Modality detection improvement for text-based PDF attachments

* refactor: Cleanup

* docs: Add info comment

* refactor: Cleanup

* re

* refactor: Cleanup

* refactor: Cleanup

* feat: Attachment logic & UI improvements

* refactor: Constants

* feat: Improve UI sidebar background color

* chore: update webui build output

* refactor: Utils imports + move types to `app.d.ts`

* test: Fix Storybook mocks

* chore: update webui build output

* test: Update Chat Form UI tests

* refactor: Tooltip Provider from core layout

* refactor: Tests to separate location

* decouple server_models from server_routes

* test: Move demo test to tests/server

* refactor: Remove redundant method

* chore: update webui build output

* also route anthropic endpoints

* fix duplicated arg

* fix invalid ptr to shutdown_handler

* server : minor

* rm unused fn

* add ?autoload=true|false query param

* refactor: Remove redundant code

* docs: Update README documentations + architecture & data flow diagrams

* fix: Disable autoload on calling server props for the model

* chore: update webui build output

* fix ubuntu build

* fix: Model status reactivity

* fix: Modality detection for MODEL mode

* chore: update webui build output

---------

Co-authored-by: Aleksander Grygier <aleksander.grygier@gmail.com>
Co-authored-by: Georgi Gerganov <ggerganov@gmail.com>
2025-12-01 19:41:04 +01:00


#include "server-common.h"
#include "server-models.h"
#include "download.h"

#include <cpp-httplib/httplib.h> // TODO: remove this once we use HTTP client from download.h
#include <sheredom/subprocess.h>

#include <functional>
#include <filesystem>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cstring>
#include <atomic>
#include <chrono>
#include <queue>

#ifdef _WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#endif

#define CMD_EXIT "exit"

struct local_model {
    std::string name;
    std::string path;
    std::string path_mmproj;
};
static std::vector<local_model> list_local_models(const std::string & dir) {
    if (!std::filesystem::exists(dir) || !std::filesystem::is_directory(dir)) {
        throw std::runtime_error(string_format("error: '%s' does not exist or is not a directory\n", dir.c_str()));
    }

    std::vector<local_model> models;

    auto scan_subdir = [&models](const std::string & subdir_path, const std::string & name) {
        auto files = fs_list(subdir_path, false);
        common_file_info model_file;
        common_file_info first_shard_file;
        common_file_info mmproj_file;
        for (const auto & file : files) {
            if (string_ends_with(file.name, ".gguf")) {
                if (file.name.find("mmproj") != std::string::npos) {
                    mmproj_file = file;
                } else if (file.name.find("-00001-of-") != std::string::npos) {
                    first_shard_file = file;
                } else {
                    model_file = file;
                }
            }
        }
        // prefer the first shard if present, otherwise the single-file model
        local_model model{
            /* name        */ name,
            /* path        */ first_shard_file.path.empty() ? model_file.path : first_shard_file.path,
            /* path_mmproj */ mmproj_file.path // can be empty
        };
        if (!model.path.empty()) {
            models.push_back(model);
        }
    };

    auto files = fs_list(dir, true);
    for (const auto & file : files) {
        if (file.is_dir) {
            scan_subdir(file.path, file.name);
        } else if (string_ends_with(file.name, ".gguf")) {
            // single file model
            std::string name = file.name;
            string_replace_all(name, ".gguf", "");
            local_model model{
                /* name        */ name,
                /* path        */ file.path,
                /* path_mmproj */ ""
            };
            models.push_back(model);
        }
    }
    return models;
}
//
// server_models
//

server_models::server_models(
        const common_params & params,
        int argc,
        char ** argv,
        char ** envp) : base_params(params) {
    for (int i = 0; i < argc; i++) {
        base_args.push_back(std::string(argv[i]));
    }
    for (char ** env = envp; *env != nullptr; env++) {
        base_env.push_back(std::string(*env));
    }

    // TODO: allow refreshing cached model list
    // add cached models
    auto cached_models = common_list_cached_models();
    for (const auto & model : cached_models) {
        server_model_meta meta{
            /* name        */ model.to_string(),
            /* path        */ model.manifest_path,
            /* path_mmproj */ "", // auto-detected when loading
            /* in_cache    */ true,
            /* port        */ 0,
            /* status      */ SERVER_MODEL_STATUS_UNLOADED,
            /* last_used   */ 0,
            /* args        */ std::vector<std::string>(),
            /* exit_code   */ 0
        };
        mapping[meta.name] = instance_t{
            /* subproc */ std::make_shared<subprocess_s>(),
            /* th      */ std::thread(),
            /* meta    */ meta
        };
    }

    // add local models specified via --models-dir
    if (!params.models_dir.empty()) {
        auto local_models = list_local_models(params.models_dir);
        for (const auto & model : local_models) {
            if (mapping.find(model.name) != mapping.end()) {
                // already exists in cached models, skip
                continue;
            }
            server_model_meta meta{
                /* name        */ model.name,
                /* path        */ model.path,
                /* path_mmproj */ model.path_mmproj,
                /* in_cache    */ false,
                /* port        */ 0,
                /* status      */ SERVER_MODEL_STATUS_UNLOADED,
                /* last_used   */ 0,
                /* args        */ std::vector<std::string>(),
                /* exit_code   */ 0
            };
            mapping[meta.name] = instance_t{
                /* subproc */ std::make_shared<subprocess_s>(),
                /* th      */ std::thread(),
                /* meta    */ meta
            };
        }
    }
}
void server_models::update_meta(const std::string & name, const server_model_meta & meta) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        it->second.meta = meta;
    }
    cv.notify_all(); // notify wait_until_loaded
}

bool server_models::has_model(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    return mapping.find(name) != mapping.end();
}

std::optional<server_model_meta> server_models::get_meta(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        return it->second.meta;
    }
    return std::nullopt;
}
static int get_free_port() {
#ifdef _WIN32
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        return -1;
    }
    typedef SOCKET native_socket_t;
#define INVALID_SOCKET_VAL INVALID_SOCKET
#define CLOSE_SOCKET(s) closesocket(s)
#else
    typedef int native_socket_t;
#define INVALID_SOCKET_VAL -1
#define CLOSE_SOCKET(s) close(s)
#endif
    native_socket_t sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET_VAL) {
#ifdef _WIN32
        WSACleanup();
#endif
        return -1;
    }
    struct sockaddr_in serv_addr;
    std::memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family      = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port        = htons(0);
    if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) != 0) {
        CLOSE_SOCKET(sock);
#ifdef _WIN32
        WSACleanup();
#endif
        return -1;
    }
#ifdef _WIN32
    int namelen = sizeof(serv_addr);
#else
    socklen_t namelen = sizeof(serv_addr);
#endif
    if (getsockname(sock, (struct sockaddr *) &serv_addr, &namelen) != 0) {
        CLOSE_SOCKET(sock);
#ifdef _WIN32
        WSACleanup();
#endif
        return -1;
    }
    int port = ntohs(serv_addr.sin_port);
    CLOSE_SOCKET(sock);
#ifdef _WIN32
    WSACleanup();
#endif
    return port;
}
// helper to convert vector<string> to char **
// pointers are only valid as long as the original vector is valid
static std::vector<char *> to_char_ptr_array(const std::vector<std::string> & vec) {
    std::vector<char *> result;
    result.reserve(vec.size() + 1);
    for (const auto & s : vec) {
        result.push_back(const_cast<char *>(s.c_str()));
    }
    result.push_back(nullptr);
    return result;
}
std::vector<server_model_meta> server_models::get_all_meta() {
    std::lock_guard<std::mutex> lk(mutex);
    std::vector<server_model_meta> result;
    result.reserve(mapping.size());
    for (const auto & [name, inst] : mapping) {
        result.push_back(inst.meta);
    }
    return result;
}

void server_models::unload_lru() {
    if (base_params.models_max <= 0) {
        return; // no limit
    }
    // remove one of the servers if we passed the models_max (least recently used - LRU)
    std::string lru_model_name = "";
    int64_t lru_last_used = ggml_time_ms();
    size_t count_active = 0;
    {
        std::lock_guard<std::mutex> lk(mutex);
        for (const auto & m : mapping) {
            if (m.second.meta.is_active()) {
                count_active++;
                if (m.second.meta.last_used < lru_last_used) {
                    lru_model_name = m.first;
                    lru_last_used  = m.second.meta.last_used;
                }
            }
        }
    }
    if (!lru_model_name.empty() && count_active >= (size_t) base_params.models_max) {
        SRV_INF("models_max limit reached, removing LRU name=%s\n", lru_model_name.c_str());
        unload(lru_model_name);
    }
}

static void add_or_replace_arg(std::vector<std::string> & args, const std::string & key, const std::string & value) {
    for (size_t i = 0; i < args.size(); i++) {
        if (args[i] == key && i + 1 < args.size()) {
            args[i + 1] = value;
            return;
        }
    }
    // not found, append
    args.push_back(key);
    args.push_back(value);
}
void server_models::load(const std::string & name, bool auto_load) {
    if (!has_model(name)) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    unload_lru();
    std::lock_guard<std::mutex> lk(mutex);
    auto meta = mapping[name].meta;
    if (meta.status != SERVER_MODEL_STATUS_UNLOADED) {
        SRV_INF("model %s is not ready\n", name.c_str());
        return;
    }
    // prepare new instance info
    instance_t inst;
    inst.meta           = meta;
    inst.meta.port      = get_free_port();
    inst.meta.status    = SERVER_MODEL_STATUS_LOADING;
    inst.meta.last_used = ggml_time_ms();
    if (inst.meta.port <= 0) {
        throw std::runtime_error("failed to get a port number");
    }
    inst.subproc = std::make_shared<subprocess_s>();
    {
        SRV_INF("spawning server instance with name=%s on port %d\n", inst.meta.name.c_str(), inst.meta.port);
        std::vector<std::string> child_args;
        if (auto_load && !meta.args.empty()) {
            child_args = meta.args; // copy previous args
        } else {
            child_args = base_args; // copy
            if (inst.meta.in_cache) {
                add_or_replace_arg(child_args, "-hf", inst.meta.name);
            } else {
                add_or_replace_arg(child_args, "-m", inst.meta.path);
                if (!inst.meta.path_mmproj.empty()) {
                    add_or_replace_arg(child_args, "--mmproj", inst.meta.path_mmproj);
                }
            }
        }
        // set model args
        add_or_replace_arg(child_args, "--port", std::to_string(inst.meta.port));
        add_or_replace_arg(child_args, "--alias", inst.meta.name);
        std::vector<std::string> child_env = base_env; // copy
        child_env.push_back("LLAMA_SERVER_ROUTER_PORT=" + std::to_string(base_params.port));
        SRV_INF("%s", "spawning server instance with args:\n");
        for (const auto & arg : child_args) {
            SRV_INF("  %s\n", arg.c_str());
        }
        inst.meta.args = child_args; // save for debugging
        std::vector<char *> argv = to_char_ptr_array(child_args);
        std::vector<char *> envp = to_char_ptr_array(child_env);
        int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
        int result  = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
        if (result != 0) {
            throw std::runtime_error("failed to spawn server instance");
        }
        inst.stdin_file = subprocess_stdin(inst.subproc.get());
    }
    // start a thread to manage the child process
    // captured variables are guaranteed to be destroyed only after the thread is joined
    inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
        // read stdout/stderr and forward to main server log
        FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
        if (p_stdout_stderr) {
            char buffer[4096];
            while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
                LOG("[%5d] %s", port, buffer);
            }
        } else {
            SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
        }
        // we reach here when the child process exits
        int exit_code = 0;
        subprocess_join(child_proc.get(), &exit_code);
        subprocess_destroy(child_proc.get());
        // update PID and status
        {
            std::lock_guard<std::mutex> lk(mutex);
            auto it = mapping.find(name);
            if (it != mapping.end()) {
                auto & meta = it->second.meta;
                meta.exit_code = exit_code;
                meta.status    = SERVER_MODEL_STATUS_UNLOADED;
            }
            cv.notify_all();
        }
        SRV_INF("instance name=%s exited with status %d\n", name.c_str(), exit_code);
    });
    // clean up old process/thread if exists
    {
        auto & old_instance = mapping[name];
        // old process should have exited already, but just in case, we clean it up here
        if (subprocess_alive(old_instance.subproc.get())) {
            SRV_WRN("old process for model name=%s is still alive, this is unexpected\n", name.c_str());
            subprocess_terminate(old_instance.subproc.get()); // force kill
        }
        if (old_instance.th.joinable()) {
            old_instance.th.join();
        }
    }
    mapping[name] = std::move(inst);
    cv.notify_all();
}

static void interrupt_subprocess(FILE * stdin_file) {
    // because subprocess.h does not provide a way to send SIGINT,
    // we will send a command to the child process to exit gracefully
    if (stdin_file) {
        fprintf(stdin_file, "%s\n", CMD_EXIT);
        fflush(stdin_file);
    }
}
void server_models::unload(const std::string & name) {
    std::lock_guard<std::mutex> lk(mutex);
    auto it = mapping.find(name);
    if (it != mapping.end()) {
        if (it->second.meta.is_active()) {
            SRV_INF("unloading model instance name=%s\n", name.c_str());
            interrupt_subprocess(it->second.stdin_file);
            // status change will be handled by the managing thread
        } else {
            SRV_WRN("model instance name=%s is not loaded\n", name.c_str());
        }
    }
}

void server_models::unload_all() {
    std::vector<std::thread> to_join;
    {
        std::lock_guard<std::mutex> lk(mutex);
        for (auto & [name, inst] : mapping) {
            if (inst.meta.is_active()) {
                SRV_INF("unloading model instance name=%s\n", name.c_str());
                interrupt_subprocess(inst.stdin_file);
                // status change will be handled by the managing thread
            }
            // moving the thread to join list to avoid deadlock
            to_join.push_back(std::move(inst.th));
        }
    }
    for (auto & th : to_join) {
        if (th.joinable()) {
            th.join();
        }
    }
}

void server_models::update_status(const std::string & name, server_model_status status) {
    // for now, we only allow updating to LOADED status
    if (status != SERVER_MODEL_STATUS_LOADED) {
        throw std::runtime_error("invalid status value");
    }
    auto meta = get_meta(name);
    if (meta.has_value()) {
        meta->status = status;
        update_meta(name, meta.value());
    }
}

void server_models::wait_until_loaded(const std::string & name) {
    std::unique_lock<std::mutex> lk(mutex);
    cv.wait(lk, [this, &name]() {
        auto it = mapping.find(name);
        if (it != mapping.end()) {
            return it->second.meta.status != SERVER_MODEL_STATUS_LOADING;
        }
        return false;
    });
}
bool server_models::ensure_model_loaded(const std::string & name) {
    auto meta = get_meta(name);
    if (!meta.has_value()) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    if (meta->status == SERVER_MODEL_STATUS_LOADED) {
        return false; // already loaded
    }
    if (meta->status == SERVER_MODEL_STATUS_UNLOADED) {
        SRV_INF("model name=%s is not loaded, loading...\n", name.c_str());
        load(name, true);
    }
    SRV_INF("waiting until model name=%s is fully loaded...\n", name.c_str());
    wait_until_loaded(name);
    // check final status
    meta = get_meta(name);
    if (!meta.has_value() || meta->is_failed()) {
        throw std::runtime_error("model name=" + name + " failed to load");
    }
    return true;
}

server_http_res_ptr server_models::proxy_request(const server_http_req & req, const std::string & method, const std::string & name, bool update_last_used) {
    auto meta = get_meta(name);
    if (!meta.has_value()) {
        throw std::runtime_error("model name=" + name + " is not found");
    }
    if (meta->status != SERVER_MODEL_STATUS_LOADED) {
        throw std::invalid_argument("model name=" + name + " is not loaded");
    }
    if (update_last_used) {
        std::unique_lock<std::mutex> lk(mutex);
        mapping[name].meta.last_used = ggml_time_ms();
    }
    SRV_INF("proxying request to model %s on port %d\n", name.c_str(), meta->port);
    auto proxy = std::make_unique<server_http_proxy>(
        method,
        base_params.hostname,
        meta->port,
        req.path,
        req.headers,
        req.body,
        req.should_stop);
    return proxy;
}
std::thread server_models::setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
    // send a notification to the router server that a model instance is ready
    // TODO @ngxson : use HTTP client from libcommon
    httplib::Client cli(base_params.hostname, router_port);
    cli.set_connection_timeout(0, 200000); // 200 milliseconds
    httplib::Request req;
    req.method = "POST";
    req.path   = "/models/status";
    req.set_header("Content-Type", "application/json");
    if (!base_params.api_keys.empty()) {
        req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
    }
    json body;
    body["model"] = name;
    body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
    req.body = body.dump();
    SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
    auto result = cli.send(std::move(req));
    if (result.error() != httplib::Error::Success) {
        auto err_str = httplib::to_string(result.error());
        SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
        exit(1); // force exit
    }
    // setup thread for monitoring stdin
    return std::thread([shutdown_handler]() {
        // wait for EOF on stdin
        SRV_INF("%s", "child server monitoring thread started, waiting for EOF on stdin...\n");
        bool eof = false;
        while (true) {
            std::string line;
            if (!std::getline(std::cin, line)) {
                // EOF detected, which means the router server unexpectedly exited or was killed
                eof = true;
                break;
            }
            if (line.find(CMD_EXIT) != std::string::npos) {
                SRV_INF("%s", "exit command received, exiting...\n");
                shutdown_handler(0);
                break;
            }
        }
        if (eof) {
            SRV_INF("%s", "EOF on stdin detected, forcing shutdown...\n");
            exit(1);
        }
    });
}
//
// server_models_routes
//

static void res_ok(std::unique_ptr<server_http_res> & res, const json & response_data) {
    res->status = 200;
    res->data   = safe_json_to_str(response_data);
}

static void res_error(std::unique_ptr<server_http_res> & res, const json & error_data) {
    res->status = json_value(error_data, "code", 500);
    res->data   = safe_json_to_str({{ "error", error_data }});
}

static bool router_validate_model(const std::string & name, server_models & models, bool models_autoload, std::unique_ptr<server_http_res> & res) {
    if (name.empty()) {
        res_error(res, format_error_response("model name is missing from the request", ERROR_TYPE_INVALID_REQUEST));
        return false;
    }
    auto meta = models.get_meta(name);
    if (!meta.has_value()) {
        res_error(res, format_error_response("model not found", ERROR_TYPE_INVALID_REQUEST));
        return false;
    }
    if (models_autoload) {
        models.ensure_model_loaded(name);
    } else {
        if (meta->status != SERVER_MODEL_STATUS_LOADED) {
            res_error(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
            return false;
        }
    }
    return true;
}

static bool is_autoload(const common_params & params, const server_http_req & req) {
    std::string autoload = req.get_param("autoload");
    if (autoload.empty()) {
        return params.models_autoload;
    } else {
        return autoload == "true" || autoload == "1";
    }
}
void server_models_routes::init_routes() {
    this->get_router_props = [this](const server_http_req & req) {
        std::string name = req.get_param("model");
        if (name.empty()) {
            // main instance
            auto res = std::make_unique<server_http_res>();
            res_ok(res, {
                // TODO: add support for this on web UI
                {"role", "router"},
                {"max_instances", 4}, // dummy value for testing
                // this is a dummy response to make sure webui doesn't break
                {"model_alias", "llama-server"},
                {"model_path", "none"},
                {"default_generation_settings", {
                    {"params", json{}},
                    {"n_ctx", 0},
                }},
            });
            return res;
        }
        return proxy_get(req);
    };

    this->proxy_get = [this](const server_http_req & req) {
        std::string method = "GET";
        std::string name   = req.get_param("model");
        bool autoload      = is_autoload(params, req);
        auto error_res = std::make_unique<server_http_res>();
        if (!router_validate_model(name, models, autoload, error_res)) {
            return error_res;
        }
        return models.proxy_request(req, method, name, false);
    };

    this->proxy_post = [this](const server_http_req & req) {
        std::string method = "POST";
        json body = json::parse(req.body);
        std::string name = json_value(body, "model", std::string());
        bool autoload    = is_autoload(params, req);
        auto error_res = std::make_unique<server_http_res>();
        if (!router_validate_model(name, models, autoload, error_res)) {
            return error_res;
        }
        return models.proxy_request(req, method, name, true); // update last usage for POST request only
    };

    this->get_router_models = [this](const server_http_req &) {
        auto res = std::make_unique<server_http_res>();
        json models_json = json::array();
        auto all_models = models.get_all_meta();
        std::time_t t = std::time(0);
        for (const auto & meta : all_models) {
            json status {
                {"value", server_model_status_to_string(meta.status)},
                {"args",  meta.args},
            };
            if (meta.is_failed()) {
                status["exit_code"] = meta.exit_code;
                status["failed"]    = true;
            }
            models_json.push_back(json {
                {"id",       meta.name},
                {"object",   "model"},    // for OAI-compat
                {"owned_by", "llamacpp"}, // for OAI-compat
                {"created",  t},          // for OAI-compat
                {"in_cache", meta.in_cache},
                {"path",     meta.path},
                {"status",   status},
                // TODO: add other fields, may require reading GGUF metadata
            });
        }
        res_ok(res, {
            {"data",   models_json},
            {"object", "list"},
        });
        return res;
    };

    this->post_router_models_load = [this](const server_http_req & req) {
        auto res = std::make_unique<server_http_res>();
        json body = json::parse(req.body);
        std::string name = json_value(body, "model", std::string());
        auto model = models.get_meta(name);
        if (!model.has_value()) {
            res_error(res, format_error_response("model is not found", ERROR_TYPE_NOT_FOUND));
            return res;
        }
        if (model->status == SERVER_MODEL_STATUS_LOADED) {
            res_error(res, format_error_response("model is already loaded", ERROR_TYPE_INVALID_REQUEST));
            return res;
        }
        models.load(name, false);
        res_ok(res, {{"success", true}});
        return res;
    };

    // used by child process to notify the router about status change
    // TODO @ngxson : maybe implement authentication for this endpoint in the future
    this->post_router_models_status = [this](const server_http_req & req) {
        auto res = std::make_unique<server_http_res>();
        json body = json::parse(req.body);
        std::string model = json_value(body, "model", std::string());
        std::string value = json_value(body, "value", std::string());
        models.update_status(model, server_model_status_from_string(value));
        res_ok(res, {{"success", true}});
        return res;
    };
    this->post_router_models_unload = [this](const server_http_req & req) {
        auto res = std::make_unique<server_http_res>();
        json body = json::parse(req.body);
        std::string name = json_value(body, "model", std::string());
        auto model = models.get_meta(name);
        if (!model.has_value()) {
            res_error(res, format_error_response("model is not found", ERROR_TYPE_INVALID_REQUEST));
            return res;
        }
        if (model->status != SERVER_MODEL_STATUS_LOADED) {
            res_error(res, format_error_response("model is not loaded", ERROR_TYPE_INVALID_REQUEST));
            return res;
        }
        models.unload(name);
        res_ok(res, {{"success", true}});
        return res;
    };
}
//
// server_http_proxy
//

// simple implementation of a pipe
// used for streaming data between threads
template<typename T>
struct pipe_t {
    std::mutex mutex;
    std::condition_variable cv;
    std::queue<T> queue;
    std::atomic<bool> writer_closed{false};
    std::atomic<bool> reader_closed{false};

    void close_write() {
        writer_closed.store(true, std::memory_order_relaxed);
        cv.notify_all();
    }

    void close_read() {
        reader_closed.store(true, std::memory_order_relaxed);
        cv.notify_all();
    }

    bool read(T & output, const std::function<bool()> & should_stop) {
        std::unique_lock<std::mutex> lk(mutex);
        constexpr auto poll_interval = std::chrono::milliseconds(500);
        while (true) {
            if (!queue.empty()) {
                output = std::move(queue.front());
                queue.pop();
                return true;
            }
            if (writer_closed.load()) {
                return false; // clean EOF
            }
            if (should_stop()) {
                close_read(); // signal broken pipe to writer
                return false; // cancelled / reader no longer alive
            }
            cv.wait_for(lk, poll_interval);
        }
    }

    bool write(T && data) {
        std::lock_guard<std::mutex> lk(mutex);
        if (reader_closed.load()) {
            return false; // broken pipe
        }
        queue.push(std::move(data));
        cv.notify_one();
        return true;
    }
};
server_http_proxy::server_http_proxy(
        const std::string & method,
        const std::string & host,
        int port,
        const std::string & path,
        const std::map<std::string, std::string> & headers,
        const std::string & body,
        const std::function<bool()> should_stop) {
    // shared between reader and writer threads
    auto cli  = std::make_shared<httplib::Client>(host, port);
    auto pipe = std::make_shared<pipe_t<msg_t>>();

    // setup Client
    cli->set_connection_timeout(0, 200000); // 200 milliseconds
    this->status = 500; // to be overwritten upon response
    this->cleanup = [pipe]() {
        pipe->close_read();
        pipe->close_write();
    };

    // wire up the receive end of the pipe
    this->next = [pipe, should_stop](std::string & out) -> bool {
        msg_t msg;
        bool has_next = pipe->read(msg, should_stop);
        if (!msg.data.empty()) {
            out = std::move(msg.data);
        }
        return has_next; // false if EOF or pipe broken
    };

    // wire up the HTTP client
    // note: do NOT capture `this` pointer, as it may be destroyed before the thread ends
    httplib::ResponseHandler response_handler = [pipe, cli](const httplib::Response & response) {
        msg_t msg;
        msg.status = response.status;
        for (const auto & [key, value] : response.headers) {
            msg.headers[key] = value;
        }
        return pipe->write(std::move(msg)); // send headers first
    };
    httplib::ContentReceiverWithProgress content_receiver = [pipe](const char * data, size_t data_length, size_t, size_t) {
        // send data chunks
        // returns false if pipe is closed / broken (signal to stop receiving)
        return pipe->write({{}, 0, std::string(data, data_length)});
    };

    // prepare the request to destination server
    httplib::Request req;
    {
        req.method = method;
        req.path   = path;
        for (const auto & [key, value] : headers) {
            req.set_header(key, value);
        }
        req.body             = body;
        req.response_handler = response_handler;
        req.content_receiver = content_receiver;
    }

    // start the proxy thread
    SRV_DBG("start proxy thread %s %s\n", req.method.c_str(), req.path.c_str());
    this->thread = std::thread([cli, pipe, req]() {
        auto result = cli->send(std::move(req));
        if (result.error() != httplib::Error::Success) {
            auto err_str = httplib::to_string(result.error());
            SRV_ERR("http client error: %s\n", err_str.c_str());
            pipe->write({{}, 500, ""});                      // header
            pipe->write({{}, 0, "proxy error: " + err_str}); // body
        }
        pipe->close_write(); // signal EOF to reader
        SRV_DBG("%s", "client request thread ended\n");
    });
    this->thread.detach();

    // wait for the first chunk (headers)
    msg_t header;
    if (pipe->read(header, should_stop)) {
        SRV_DBG("%s", "received response headers\n");
        this->status  = header.status;
        this->headers = header.headers;
    } else {
        SRV_DBG("%s", "no response headers received (request cancelled?)\n");
    }
}