Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions models/tongyi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ Tongyi Qwen, developed by Alibaba Cloud, is a sophisticated series of LLMs. It i
After installation, you need to get API keys from [Alibaba Cloud](https://bailian.console.aliyun.com/?apiKey=1#/api-key) and set them up in Settings -> Model Provider.

![](_assets/tongyi_latest.png)

# Speech-to-text runtime note

Tongyi speech-to-text patches DashScope's async-to-sync websocket bridge when it runs under Dify's gevent-based plugin runtime. This avoids sustained high CPU after STT requests while keeping the normal in-process recognition path.

If you need stronger isolation for STT recognition, set `TONGYI_STT_SUBPROCESS=1` in the plugin daemon environment. `TONGYI_STT_RECOGNITION_TIMEOUT` can be used to tune the subprocess timeout.
4 changes: 2 additions & 2 deletions models/tongyi/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ resource:
model:
enabled: false
type: plugin
version: 0.2.0
created_at: "2026-06-04T10:13:50.29298939+08:00"
version: 0.2.1
created_at: "2026-06-04T10:13:50.29298939+08:00"
108 changes: 108 additions & 0 deletions models/tongyi/models/speech2text/speech2text.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
import os
import subprocess
import sys
Expand All @@ -14,6 +15,8 @@

from ..constant import BURY_POINT_HEADER

logger = logging.getLogger(__name__)

_AUDIO_MAGIC = [
(0, b"fLaC", "flac"),
(0, b"ID3", "mp3"),
Expand Down Expand Up @@ -47,13 +50,117 @@
_SUBPROCESS_TRUE_VALUES = {"1", "true", "yes", "on"}
_RECOGNITION_TIMEOUT_ENV = "TONGYI_STT_RECOGNITION_TIMEOUT"
_DEFAULT_RECOGNITION_TIMEOUT = 120
_DASHSCOPE_ASYNC_BRIDGE_PATCHED = False


def _is_subprocess_enabled() -> bool:
    """Report whether the subprocess STT path is switched on via the environment.

    Reads the variable named by ``_SUBPROCESS_ENV`` and returns ``True`` only
    when it is set and, after stripping whitespace and lower-casing, matches
    one of ``_SUBPROCESS_TRUE_VALUES``.
    """
    raw = os.getenv(_SUBPROCESS_ENV)
    if raw is None:
        # Variable not set at all: the subprocess path stays off.
        return False
    normalized = raw.strip().lower()
    return normalized in _SUBPROCESS_TRUE_VALUES


def _install_dashscope_native_bridge() -> None:
    """Replace ``dashscope.common.utils.iter_over_async`` when gevent patched threading.

    Returns without changing anything when gevent cannot be imported, when
    gevent has not monkey-patched ``threading``, or when the DashScope
    internals needed by the replacement cannot be imported.
    """
    try:
        from gevent import monkey
    except ImportError:
        # gevent is not installed: nothing to work around.
        return
    except Exception as exc:
        logger.warning("DashScope async bridge patch skipped: failed to import gevent: %s", exc)
        return

    if not monkey.is_module_patched("threading"):
        return

    import asyncio
    import queue
    import threading

    try:
        import dashscope.common.utils as sdk_utils
        from dashscope.api_entities.dashscope_response import DashScopeAPIResponse
        from dashscope.common.logging import logger as sdk_logger
    except ImportError as exc:
        logger.warning("DashScope async bridge patch skipped: incompatible DashScope SDK: %s", exc)
        return
    except Exception as exc:
        logger.warning(
            "DashScope async bridge patch skipped: failed to import DashScope internals: %s", exc
        )
        return

    try:
        # Ask gevent for the Thread/Queue objects it saved before monkey-patching.
        thread_cls = monkey.get_original("threading", "Thread")
        queue_cls = monkey.get_original("queue", "Queue")
    except Exception as exc:
        logger.warning(
            "DashScope async bridge patch falling back to current thread primitives: %s", exc
        )
        thread_cls, queue_cls = threading.Thread, queue.Queue

    def iter_over_async_with_native_bridge(ait):
        """Yield the items of async iterable *ait* from synchronous code.

        A worker thread drives a dedicated event loop and hands every item
        back through a queue as a ``(finished, error, obj)`` tuple.
        """
        loop = asyncio.new_event_loop()
        source = ait.__aiter__()

        async def fetch_next():
            # Returns (exhausted, item); item is None once the source is exhausted.
            try:
                return False, await source.__anext__()
            except StopAsyncIteration:
                return True, None

        def pump(event_loop, outbox):
            asyncio.set_event_loop(event_loop)
            try:
                exhausted = False
                while not exhausted:
                    try:
                        exhausted, item = event_loop.run_until_complete(fetch_next())
                        # item is None when exhausted, so this tuple doubles as
                        # the end-of-stream marker.
                        outbox.put((exhausted, None, item))
                    except BaseException as exc:  # noqa: E722
                        sdk_logger.exception(exc)
                        outbox.put((True, exc, None))
                        break
            finally:
                event_loop.close()

        outbox = queue_cls()
        worker = thread_cls(
            target=pump,
            args=(loop, outbox),
            name="dashscope_iter_async_thread",
            daemon=True,
        )
        worker.start()

        while True:
            finished, error, obj = outbox.get()
            if not finished:
                yield obj
                continue
            if error is not None:
                # Surface the worker failure as a response object instead of raising.
                yield DashScopeAPIResponse(
                    -1, "", "Unknown", message=f"Error type: {type(error)}, message: {error}"
                )
            return

    sdk_utils.iter_over_async = iter_over_async_with_native_bridge


def _patch_dashscope_async_bridge_for_gevent() -> None:
    """Patch DashScope's global async bridge to use native threads under gevent.

    The replacement is assigned to ``dashscope.common.utils.iter_over_async``
    for the current process, so later lookups of that module attribute get the
    patched helper. The attempt is made at most once: the module-level flag is
    set after the first call that returns normally, whether or not the patch
    was actually installed.
    """
    global _DASHSCOPE_ASYNC_BRIDGE_PATCHED
    if not _DASHSCOPE_ASYNC_BRIDGE_PATCHED:
        _install_dashscope_native_bridge()
        _DASHSCOPE_ASYNC_BRIDGE_PATCHED = True


def _get_recognition_timeout() -> int:
value = os.getenv(_RECOGNITION_TIMEOUT_ENV)
if not value:
Expand Down Expand Up @@ -173,6 +280,7 @@ def _invoke(
raise ValueError(data or "Unknown error in STT worker subprocess")
sentence_list = data
else:
_patch_dashscope_async_bridge_for_gevent()
recognition = Recognition(
model=str(model),
format=str(audio_format),
Expand Down
2 changes: 1 addition & 1 deletion models/tongyi/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ requires-python = ">=3.12"
dependencies = [
"dify_plugin>=0.9.0",
"numpy>=2.4.6",
"dashscope>=1.25.18",
"dashscope>=1.25.18,<1.26.0",
"openai>=2.38.0",
"pydub>=0.25.1",
"pyyaml>=6.0.3",
Expand Down
120 changes: 77 additions & 43 deletions models/tongyi/tests/test_speech2text.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import sys
from io import BytesIO, StringIO
from collections.abc import AsyncIterator
from unittest.mock import MagicMock, patch

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
Expand Down Expand Up @@ -47,6 +48,10 @@ def _worker_payload() -> dict:
}


def _without_subprocess_env() -> dict:
return {key: value for key, value in os.environ.items() if key != "TONGYI_STT_SUBPROCESS"}


def test_get_audio_type_prefers_magic_bytes_without_decoding() -> None:
file_obj = _named_bytes(b"RIFF\x24\x00\x00\x00WAVE" + b"\x00" * 16, "temp.mp3")
file_obj.seek(5)
Expand Down Expand Up @@ -79,16 +84,19 @@ def test_invoke_reuses_detected_audio_format_for_decoding() -> None:
recognition = MagicMock()
recognition.call.return_value = result

with patch("models.speech2text.speech2text.AudioSegment.from_file", return_value=audio) as decode_mock:
with patch("models.speech2text.speech2text.Recognition", return_value=recognition):
assert (
_model()._invoke(
model="paraformer-realtime-v1",
credentials={"dashscope_api_key": "test-key"},
file=file_obj,
with patch.dict(os.environ, {"TONGYI_STT_SUBPROCESS": "false"}):
with patch(
"models.speech2text.speech2text.AudioSegment.from_file", return_value=audio
) as decode_mock:
with patch("models.speech2text.speech2text.Recognition", return_value=recognition):
assert (
_model()._invoke(
model="paraformer-realtime-v1",
credentials={"dashscope_api_key": "test-key"},
file=file_obj,
)
== "hello"
)
== "hello"
)

decode_mock.assert_called_once()
assert decode_mock.call_args.kwargs["format"] == "wav"
Expand All @@ -102,22 +110,72 @@ def test_invoke_normalizes_non_dict_sentences() -> None:
recognition = MagicMock()
recognition.call.return_value = result

with patch("models.speech2text.speech2text.AudioSegment.from_file", return_value=audio):
with patch("models.speech2text.speech2text.Recognition", return_value=recognition):
assert (
_model()._invoke(
model="paraformer-realtime-v1",
credentials={"dashscope_api_key": "test-key"},
file=file_obj,
with patch.dict(os.environ, {"TONGYI_STT_SUBPROCESS": "false"}):
with patch("models.speech2text.speech2text.AudioSegment.from_file", return_value=audio):
with patch("models.speech2text.speech2text.Recognition", return_value=recognition):
assert (
_model()._invoke(
model="paraformer-realtime-v1",
credentials={"dashscope_api_key": "test-key"},
file=file_obj,
)
== "hello\nworld"
)
== "hello\nworld"
)


def test_invoke_uses_subprocess_when_env_set() -> None:
def test_invoke_patches_dashscope_async_bridge_by_default() -> None:
    """With the subprocess variable unset, ``_invoke`` patches the bridge and stays in-process."""
    decoded_audio = MagicMock(frame_rate=16000)
    recognition_result = MagicMock()
    recognition_result.get_sentence.return_value = [{"text": "hello"}]
    recognition = MagicMock()
    recognition.call.return_value = recognition_result

    with (
        patch.dict(os.environ, _without_subprocess_env(), clear=True),
        patch("models.speech2text.speech2text.AudioSegment.from_file", return_value=decoded_audio),
        patch("models.speech2text.speech2text.Recognition", return_value=recognition),
        patch(
            "models.speech2text.speech2text._patch_dashscope_async_bridge_for_gevent"
        ) as patch_mock,
        patch(
            "models.speech2text.speech2text._run_recognition_in_subprocess",
            side_effect=AssertionError("subprocess path should stay disabled"),
        ) as run_mock,
    ):
        transcript = _model()._invoke(
            model="paraformer-realtime-v1",
            credentials={"dashscope_api_key": "test-key"},
            file=_wav_file(),
        )

    assert transcript == "hello"
    patch_mock.assert_called_once()
    run_mock.assert_not_called()


def test_dashscope_async_bridge_patch_yields_results() -> None:
    """After the patch call, ``iter_over_async`` still drains an async stream in order."""
    import dashscope.common.utils as dashscope_utils
    from models.speech2text import speech2text as st2

    async def stream() -> AsyncIterator[str]:
        for word in ("hello", "world"):
            yield word

    # patch.object puts both attributes back on exit, even if the assertion
    # fails: the SDK helper gets its pre-test value and the guard flag its
    # previous state.
    with (
        patch.object(dashscope_utils, "iter_over_async", dashscope_utils.iter_over_async),
        patch.object(st2, "_DASHSCOPE_ASYNC_BRIDGE_PATCHED", False),
    ):
        st2._patch_dashscope_async_bridge_for_gevent()

        collected = list(dashscope_utils.iter_over_async(stream()))
        assert collected == ["hello", "world"]


def test_invoke_uses_subprocess_when_env_is_true() -> None:
from models.speech2text import speech2text as st2

audio = MagicMock(frame_rate=16000)

with patch.dict(os.environ, {"TONGYI_STT_SUBPROCESS": "1"}):
with patch("models.speech2text.speech2text.AudioSegment.from_file", return_value=audio):
with patch(
Expand All @@ -140,30 +198,6 @@ def test_invoke_uses_subprocess_when_env_set() -> None:
assert headers is not st2.BURY_POINT_HEADER


def test_invoke_does_not_use_subprocess_when_env_is_zero() -> None:
    """``TONGYI_STT_SUBPROCESS=0`` must keep recognition on the in-process path."""
    decoded_audio = MagicMock(frame_rate=16000)
    recognition_result = MagicMock()
    recognition_result.get_sentence.return_value = [{"text": "hello"}]
    recognition = MagicMock()
    recognition.call.return_value = recognition_result

    with (
        patch.dict(os.environ, {"TONGYI_STT_SUBPROCESS": "0"}),
        patch("models.speech2text.speech2text.AudioSegment.from_file", return_value=decoded_audio),
        patch("models.speech2text.speech2text.Recognition", return_value=recognition),
        patch(
            "models.speech2text.speech2text._run_recognition_in_subprocess",
            side_effect=AssertionError("subprocess path should stay disabled"),
        ) as run_mock,
    ):
        transcript = _model()._invoke(
            model="paraformer-realtime-v1",
            credentials={"dashscope_api_key": "test-key"},
            file=_wav_file(),
        )

    assert transcript == "hello"
    run_mock.assert_not_called()


def test_invoke_raises_when_subprocess_returns_error() -> None:
audio = MagicMock(frame_rate=16000)
with patch.dict(os.environ, {"TONGYI_STT_SUBPROCESS": "1"}):
Expand Down
2 changes: 1 addition & 1 deletion models/tongyi/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading