From 0eb365f44cbaae399aca70d5ac7e5cc703760eab Mon Sep 17 00:00:00 2001 From: Ilya Sharov Date: Fri, 11 Apr 2025 00:33:49 +0300 Subject: [PATCH] Fixed json format and endstream --- main.go | 130 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 93 insertions(+), 37 deletions(-) diff --git a/main.go b/main.go index dcd8ea1..6e9f5bc 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "bufio" "encoding/json" "errors" + "fmt" "io" "log/slog" "net/http" @@ -136,6 +137,7 @@ func main() { var request struct { Model string `json:"model"` Messages []openai.ChatCompletionMessage `json:"messages"` + Stream *bool `json:"stream"` // Добавим поле Stream } // Parse the JSON request @@ -144,10 +146,28 @@ func main() { return } + // Определяем, нужен ли стриминг (по умолчанию true, если не указано для /api/chat) + // ВАЖНО: Open WebUI может НЕ передавать "stream": true для /api/chat, подразумевая это. + // Нужно проверить, какой запрос шлет Open WebUI. Если не шлет, ставим true. + streamRequested := true + if request.Stream != nil { + streamRequested = *request.Stream + } + + // Если стриминг не запрошен, нужно будет реализовать отдельную логику + // для сбора полного ответа и отправки его одним JSON. + // Пока реализуем только стриминг. + if !streamRequested { + // TODO: Реализовать не-потоковый ответ, если нужно + c.JSON(http.StatusNotImplemented, gin.H{"error": "Non-streaming response not implemented yet"}) + return + } + fullModelName, err := provider.GetFullModelName(request.Model) if err != nil { slog.Error("Error getting full model name", "Error", err) - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + // Ollama возвращает 404 на неправильное имя модели + c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return } @@ -160,72 +180,108 @@ func main() { } defer stream.Close() // Ensure stream closure - // Set headers for streaming response - c.Writer.Header().Set("Content-Type", "application/json") - c.Writer.Header().Set("Transfer-Encoding", "chunked") - c.Status(http.StatusOK) + // --- ИСПРАВЛЕНИЯ для NDJSON (Ollama-style) --- + + // Set headers CORRECTLY for Newline Delimited JSON + c.Writer.Header().Set("Content-Type", "application/x-ndjson") // <--- КЛЮЧЕВОЕ ИЗМЕНЕНИЕ + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + // Transfer-Encoding: chunked устанавливается Gin автоматически + + w := c.Writer // Получаем ResponseWriter + flusher, ok := w.(http.Flusher) + if !ok { + slog.Error("Expected http.ResponseWriter to be an http.Flusher") + // Отправить ошибку клиенту уже сложно, т.к. заголовки могли уйти + return + } + + var lastFinishReason string // Stream responses back to the client for { response, err := stream.Recv() if errors.Is(err, io.EOF) { - // End of stream + // End of stream from the backend provider break } if err != nil { - // Handle errors - slog.Error("Stream error", "Error", err) - c.Status(http.StatusInternalServerError) - c.Writer.Write([]byte("Error streaming: " + err.Error() + "\n")) - c.Writer.Flush() + slog.Error("Backend stream error", "Error", err) + // Попытка отправить ошибку в формате NDJSON + // Ollama обычно просто обрывает соединение или шлет 500 перед этим + errorMsg := map[string]string{"error": "Stream error: " + err.Error()} + errorJson, _ := json.Marshal(errorMsg) + fmt.Fprintf(w, "%s\n", string(errorJson)) // Отправляем ошибку + \n + flusher.Flush() return } - // Build JSON response structure + // Сохраняем причину остановки, если она есть в чанке + if len(response.Choices) > 0 && response.Choices[0].FinishReason != "" { + lastFinishReason = string(response.Choices[0].FinishReason) + } + + // Build JSON response structure for intermediate chunks (Ollama chat format) responseJSON := map[string]interface{}{ "model": fullModelName, "created_at": time.Now().Format(time.RFC3339), "message": map[string]string{ "role": "assistant", - "content": response.Choices[0].Delta.Content, + "content": response.Choices[0].Delta.Content, // Может быть "" }, - "done": false, - "total_duration": 0, - "load_duration": 0, - "prompt_eval_count": nil, // Replace with actual prompt tokens if available - "eval_count": nil, // Replace with actual completion tokens if available - "eval_duration": 0, + "done": false, // Всегда false для промежуточных чанков } - // Marshal and send the JSON response - if err := json.NewEncoder(c.Writer).Encode(responseJSON); err != nil { - slog.Error("Error encoding response", "Error", err) - c.Status(http.StatusInternalServerError) - return + // Marshal JSON + jsonData, err := json.Marshal(responseJSON) + if err != nil { + slog.Error("Error marshaling intermediate response JSON", "Error", err) + return // Прерываем, так как не можем отправить данные } + // Send JSON object followed by a newline + fmt.Fprintf(w, "%s\n", string(jsonData)) // <--- ИЗМЕНЕНО: Формат NDJSON (JSON + \n) + // Flush data to send it immediately - c.Writer.Flush() + flusher.Flush() } - // Final response indicating the stream has ended - endResponse := map[string]interface{}{ - "model": fullModelName, - "created_at": time.Now().Format(time.RFC3339), - "message": map[string]string{ - "role": "assistant", - "content": "", - }, + // --- Отправка финального сообщения (done: true) в стиле Ollama --- + + // Определяем причину остановки (если бэкенд не дал, ставим 'stop') + // Ollama использует 'stop', 'length', 'content_filter', 'tool_calls' + if lastFinishReason == "" { + lastFinishReason = "stop" + } + + // ВАЖНО: Замените nil на 0 для числовых полей статистики + finalResponse := map[string]interface{}{ + "model": fullModelName, + "created_at": time.Now().Format(time.RFC3339), "done": true, + "finish_reason": lastFinishReason, // Необязательно для /api/chat Ollama, но не вредит "total_duration": 0, "load_duration": 0, - "prompt_eval_count": nil, - "eval_count": nil, + "prompt_eval_count": 0, // <--- ИЗМЕНЕНО: nil заменен на 0 + "eval_count": 0, // <--- ИЗМЕНЕНО: nil заменен на 0 "eval_duration": 0, } - if err := json.NewEncoder(c.Writer).Encode(endResponse); err != nil { - slog.Error("Error encoding end response", "Error", err) + + finalJsonData, err := json.Marshal(finalResponse) + if err != nil { + slog.Error("Error marshaling final response JSON", "Error", err) + return } + + // Отправляем финальный JSON-объект + newline + fmt.Fprintf(w, "%s\n", string(finalJsonData)) // <--- ИЗМЕНЕНО: Формат NDJSON + flusher.Flush() + + // ВАЖНО: Для NDJSON НЕТ 'data: [DONE]' маркера. + // Клиент понимает конец потока по получению объекта с "done": true + // и/или по закрытию соединения сервером (что Gin сделает автоматически после выхода из хендлера). + + // --- Конец исправлений --- }) r.Run(":11434")