[GH-ISSUE #97] [Question]: How can I make DeepClaude send non-streaming requests to Gemini? #69

Closed
opened 2026-02-27 01:56:04 +03:00 by kerem · 2 comments
Owner

Originally created by @FFei-Zhou on GitHub (Mar 15, 2025).
Original GitHub issue: https://github.com/ErlichLiu/DeepClaude/issues/97

Pre-submission checklist

  • I have read the project documentation and did not find an answer
  • I have searched the existing issues and confirmed there is no similar question

Question category

Usage

Your question

Thanks for open-sourcing this project!

As the title says:
When using SillyTavern, I found that turning off streaming only disables streaming in this project; the request actually sent to Gemini is still a streaming request.

Gemini moderates streaming and non-streaming requests differently: with streaming, generation gets cut off midway; with non-streaming, a blocked response contains no content at all. With streaming turned off in SillyTavern, the content I get back is cut off midway, so for now I assume the project does not disable streaming when it sends the request.

I'm a beginner and not sure whether this is correct; any pointers would be much appreciated.
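For context on the mechanism: in OpenAI-compatible chat APIs, streaming is chosen per request by the `stream` field of the JSON body, so a proxy can accept a non-streaming request from its client yet still send `stream: true` upstream. A minimal illustrative sketch (`build_payload` is a made-up helper, not part of the project):

```python
# The "stream" field of an OpenAI-compatible request body decides whether the
# upstream call is streaming, independently of what the end client asked for.
# build_payload is a made-up helper for illustration.

def build_payload(messages, model, stream):
    return {"model": model, "messages": messages, "stream": stream}

# What the client asked the proxy for:
client_request = build_payload([{"role": "user", "content": "hi"}], "gemini-pro", stream=False)

# A proxy that hard-codes streaming upstream ignores that choice:
upstream_request = {**client_request, "stream": True}

print(client_request["stream"], upstream_request["stream"])  # False True
```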

Background

Latest version

Code example (if applicable)


Other information

No response

kerem 2026-02-27 01:56:04 +03:00
  • closed this issue
  • added the
    question
    label
Author
Owner

@FFei-Zhou commented on GitHub (Mar 15, 2025):

Finally solved in three replies with DeepSeek and Gemini working together, which is wild.
Below is the solution they produced.
Change the code of openai_composite.py to:

"""OpenAI 兼容的组合模型服务,用于协调 DeepSeek 和其他 OpenAI 兼容模型的调用"""

import asyncio
import json
import time
from typing import AsyncGenerator, Dict, Any, List

from app.clients import DeepSeekClient
from app.clients.openai_compatible_client import OpenAICompatibleClient
from app.utils.logger import logger


class OpenAICompatibleComposite:
    """处理 DeepSeek 和其他 OpenAI 兼容模型的流式输出衔接"""

    def __init__(
        self,
        deepseek_api_key: str,
        openai_api_key: str,
        deepseek_api_url: str = "https://api.deepseek.com/v1/chat/completions",
        openai_api_url: str = "",  # 将由具体实现提供
        is_origin_reasoning: bool = True,
        proxy: str = None,
    ):
        """初始化 API 客户端

        Args:
            deepseek_api_key: DeepSeek API密钥
            openai_api_key: OpenAI 兼容服务的 API密钥
            deepseek_api_url: DeepSeek API地址
            openai_api_url: OpenAI 兼容服务的 API地址
            is_origin_reasoning: 是否使用原始推理过程
            proxy: 代理服务器地址
        """
        self.deepseek_client = DeepSeekClient(deepseek_api_key, deepseek_api_url, proxy=proxy)
        self.openai_client = OpenAICompatibleClient(openai_api_key, openai_api_url, proxy=proxy)
        self.is_origin_reasoning = is_origin_reasoning

    async def chat_completions_with_stream(
        self,
        messages: List[Dict[str, str]],
        model_arg: tuple[float, float, float, float],
        deepseek_model: str = "deepseek-reasoner",
        target_model: str = "",
    ) -> AsyncGenerator[bytes, None]:
        """处理完整的流式输出过程 (unchanged - still handles streaming for both)"""
        # 生成唯一的会话ID和时间戳
        chat_id = f"chatcmpl-{hex(int(time.time() * 1000))[2:]}"
        created_time = int(time.time())

        # 创建队列,用于收集输出数据
        output_queue = asyncio.Queue()
        # 队列,用于传递 DeepSeek 推理内容
        reasoning_queue = asyncio.Queue()

        # 用于存储 DeepSeek 的推理累积内容
        reasoning_content = []

        async def process_deepseek():
            logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}")
            try:
                async for content_type, content in self.deepseek_client.stream_chat(
                    messages, deepseek_model, self.is_origin_reasoning
                ):
                    if content_type == "reasoning":
                        reasoning_content.append(content)
                        response = {
                            "id": chat_id,
                            "object": "chat.completion.chunk",
                            "created": created_time,
                            "model": deepseek_model,
                            "choices": [
                                {
                                    "index": 0,
                                    "delta": {
                                        "role": "assistant",
                                        "reasoning_content": content,
                                        "content": "",
                                    },
                                }
                            ],
                        }
                        await output_queue.put(
                            f"data: {json.dumps(response)}\n\n".encode("utf-8")
                        )
                    elif content_type == "content":
                        # 当收到 content 类型时,将完整的推理内容发送到 reasoning_queue
                        logger.info(
                            f"DeepSeek 推理完成,收集到的推理内容长度:{len(''.join(reasoning_content))}"
                        )
                        await reasoning_queue.put("".join(reasoning_content))
                        break
            except Exception as e:
                logger.error(f"处理 DeepSeek 流时发生错误: {e}")
                await reasoning_queue.put("")
            # 标记 DeepSeek 任务结束
            logger.info("DeepSeek 任务处理完成,标记结束")
            await output_queue.put(None)

        async def process_openai():
            try:
                logger.info("等待获取 DeepSeek 的推理内容...")
                reasoning = await reasoning_queue.get()
                logger.debug(
                    f"获取到推理内容,内容长度:{len(reasoning) if reasoning else 0}"
                )
                if not reasoning:
                    logger.warning("未能获取到有效的推理内容,将使用默认提示继续")
                    reasoning = "获取推理内容失败"

                # 构造 OpenAI 的输入消息
                openai_messages = messages.copy()
                combined_content = f"""
                Here's my another model's reasoning process:\n{reasoning}\n\n
                Based on this reasoning, provide your response directly to me:"""

                # 检查过滤后的消息列表是否为空
                if not openai_messages:
                    raise ValueError("消息列表为空,无法处理请求")

                # 获取最后一个消息并检查其角色
                last_message = openai_messages[-1]
                if last_message.get("role", "") != "user":
                    raise ValueError("最后一个消息的角色不是用户,无法处理请求")

                # 修改最后一个消息的内容
                original_content = last_message["content"]
                fixed_content = f"Here's my original input:\n{original_content}\n\n{combined_content}"
                last_message["content"] = fixed_content

                logger.info(f"开始处理 OpenAI 兼容流,使用模型: {target_model}")

                async for role, content in self.openai_client.stream_chat(
                    messages=openai_messages,
                    model=target_model,
                ):
                    response = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created_time,
                        "model": target_model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": role, "content": content},
                            }
                        ],
                    }
                    await output_queue.put(
                        f"data: {json.dumps(response)}\n\n".encode("utf-8")
                    )
            except Exception as e:
                logger.error(f"处理 OpenAI 兼容流时发生错误: {e}")
            # 标记 OpenAI 任务结束
            logger.info("OpenAI 兼容任务处理完成,标记结束")
            await output_queue.put(None)

        # 创建并发任务
        asyncio.create_task(process_deepseek())
        asyncio.create_task(process_openai())

        # 等待两个任务完成
        finished_tasks = 0
        while finished_tasks < 2:
            item = await output_queue.get()
            if item is None:
                finished_tasks += 1
                continue
            yield item

        # 发送结束标记
        yield b"data: [DONE]\n\n"

    async def chat_completions_without_stream(
        self,
        messages: List[Dict[str, str]],
        model_arg: tuple[float, float, float, float],
        deepseek_model: str = "deepseek-reasoner",
        target_model: str = "",
    ) -> Dict[str, Any]:
        """处理非流式输出请求 (Gemini non-streaming, DeepSeek streaming)"""
        chat_id = f"chatcmpl-{hex(int(time.time() * 1000))[2:]}"
        created_time = int(time.time())

        try:
            # 1.  Get DeepSeek's reasoning *using streaming* (but collect all chunks).
            reasoning_chunks = []
            async for content_type, content in self.deepseek_client.stream_chat(
                messages, deepseek_model, self.is_origin_reasoning
            ):
                if content_type == "reasoning":
                    reasoning_chunks.append(content)
            reasoning = "".join(reasoning_chunks)


            # 2. Construct OpenAI's input messages.
            openai_messages = messages.copy()
            combined_content = (
                f"Here's my another model's reasoning process:\n{reasoning}\n\n"
                f"Based on this reasoning, provide your response directly to me:"
            )
            last_message = openai_messages[-1]
            original_content = last_message["content"]
            fixed_content = (
                f"Here's my original input:\n{original_content}\n\n{combined_content}"
            )
            last_message["content"] = fixed_content

            # 3. Call OpenAI's *non-streaming* method.
            openai_response = await self.openai_client.chat(
                messages=openai_messages, model=target_model
            )

            # 4. Construct the final response.
            full_response = {
                "id": chat_id,
                "object": "chat.completion",
                "created": created_time,
                "model": target_model,
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": openai_response["choices"][0]["message"]["content"],
                            "reasoning_content": reasoning,  # Include DeepSeek's reasoning
                        },
                        "finish_reason": "stop",
                    }
                ],
                "usage": openai_response.get("usage", {}),  # Populate with actual usage data
            }
            return full_response

        except Exception as e:
            logger.error(f"处理非流式请求时发生错误: {e}")
            raise
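As a sanity check on the new flow, here is a self-contained sketch with fakes standing in for `DeepSeekClient.stream_chat` and `OpenAICompatibleClient.chat` (the real clients make network calls): DeepSeek's reasoning is streamed and collected, then the target model is called exactly once, non-streaming.

```python
import asyncio

# Illustrative stand-ins for DeepSeekClient.stream_chat and OpenAICompatibleClient.chat;
# the shapes mirror the patched code above, but these clients are fakes.

async def fake_stream_chat(messages, model, is_origin_reasoning):
    # The DeepSeek side stays streaming: yields (content_type, content) chunks.
    for chunk in ("think ", "step ", "by step"):
        yield ("reasoning", chunk)
    yield ("content", "")  # first content chunk ends reasoning collection

async def fake_chat(messages, model):
    # The Gemini side is one non-streaming call returning a full completion.
    return {"choices": [{"message": {"role": "assistant", "content": "final answer"}}]}

async def main():
    # 1. Stream DeepSeek's reasoning and collect every chunk.
    reasoning_chunks = []
    async for content_type, content in fake_stream_chat([], "deepseek-reasoner", True):
        if content_type == "reasoning":
            reasoning_chunks.append(content)
        else:
            break
    reasoning = "".join(reasoning_chunks)
    # 2. One explicit non-streaming call to the target model.
    response = await fake_chat([], "gemini-pro")
    return reasoning, response["choices"][0]["message"]["content"]

print(asyncio.run(main()))  # ('think step by step', 'final answer')
```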

I don't know programming, so I can't say whether there are problems, but so far it has worked without any issues.

The code immediately got about 20 lines shorter, which worried me, so I double-checked; the DeepSeek+Gemini answer was:

1. Why the line count dropped

In the previous version, the chat_completions_without_stream method used a "roundabout" strategy to obtain a non-streaming result:

  • It first called chat_completions_with_stream, which always uses streaming.
  • It then extracted the content and reasoning_content parts from the streamed chunks produced by chat_completions_with_stream and concatenated them.
  • Finally, it assembled the concatenated result into a non-streaming response.

This does yield a non-streaming result in the end, but it has two drawbacks:

  • Inefficiency: a full streaming pass is performed even when streaming is not needed, adding unnecessary overhead.
  • It can trigger Gemini's streaming moderation: even though the final response is assembled as non-streaming, Gemini still processes the request as a stream, which can trigger a moderation mechanism different from that of a non-streaming request.

The new version addresses both problems directly:

  • DeepSeek stays streaming: self.deepseek_client.stream_chat is used to obtain DeepSeek's reasoning and collect all reasoning chunks, matching the requirement that DeepSeek always remains streaming.
  • Gemini goes non-streaming: self.openai_client.chat is called directly, an explicitly non-streaming request, which avoids Gemini's streaming moderation.
  • Simpler code: the redundant code of the old "roundabout" strategy is gone; for example, the output_queue and process_openai logic tied to streaming is no longer needed in this path. That is the main reason for the drop in line count.
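For reference, the queue-and-sentinel merge that the old roundabout path relied on (and that the non-streaming path no longer needs) looks roughly like this; the names here are illustrative, not the project's:

```python
import asyncio

async def merged(producers):
    """Merge several async producers through one queue; each signals completion
    with a None sentinel, mirroring the output_queue logic in the streaming path."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(producer):
        async for item in producer:
            await queue.put(item)
        await queue.put(None)  # sentinel: this producer is done

    for producer in producers:
        asyncio.create_task(pump(producer))

    finished, out = 0, []
    while finished < len(producers):
        item = await queue.get()
        if item is None:
            finished += 1
            continue
        out.append(item)
    return out

async def gen(items):
    for item in items:
        yield item

print(sorted(asyncio.run(merged([gen([1, 2]), gen([3])]))))  # [1, 2, 3]
```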
Author
Owner

@FFei-Zhou commented on GitHub (Mar 15, 2025):

Hope this helps anyone who needs it.
