
Implementing text-to-speech in Python with edge-tts
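The script below wraps edge-tts with voice-list caching, intelligent text segmentation, concurrent synthesis with retries, and MP3 merging via pydub. For orientation, here is a minimal sketch of the core edge-tts call that everything else builds on (Communicate plus save); the sample sentence, voice name, and output file name are placeholders chosen only for illustration:

import asyncio
import edge_tts

async def demo():
    # Synthesize one sentence with a single voice and write it to an MP3 file
    communicate = edge_tts.Communicate("你好，世界", "zh-CN-XiaoxiaoNeural")
    await communicate.save("demo.mp3")

asyncio.run(demo())

The complete tool follows.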

import asyncio
import edge_tts
import os
import argparse
import json
import re
from pathlib import Path
from pydub import AudioSegment
import logging
from datetime import datetime, timedelta
from tqdm import tqdm

# Configure the logging system
logging.basicConfig(
    level=logging.INFO,  # log level (assumed; the original value was stripped)
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("edge_tts.log", encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Path configuration
CACHE_FILE = Path.home() / ".edge_tts_voices.cache"  # voice cache file (home directory assumed)
DEFAULT_OUTPUT_DIR = Path(r"C:\App\tts\Edge-TTS")
CACHE_EXPIRE_HOURS = 24

# Segmentation parameters
MAX_SEGMENT_LENGTH = 500  # Maximum length of a single segment
MIN_SEGMENT_LENGTH = 50   # Minimum length before merging short segments
DELIMITER_PRIORITY = ['\n', '。', '！', '!', '？', '?', '；', ';', '，', ',']
IGNORE_PATTERNS = [
    r'(?<=\d)\.(?=\d)',       # Decimal points (digits on both sides)
    r'\b[a-zA-Z]\.(?=\s)',    # English abbreviations (e.g. "Mr." followed by a space)
    r'https?://\S+',          # Full URLs
    r'www\.\S+\.\w{2,}'       # URLs starting with www
]

async def get_voices(force_refresh=False) -> list:
    """Dynamic fetch and cache voice list"""
    def should_refresh():
        if force_refresh or not CACHE_FILE.exists():
            return True
        cache_time = datetime.fromtimestamp(CACHE_FILE.stat().st_mtime)
        return datetime.now() > cache_time + timedelta(hours=CACHE_EXPIRE_HOURS)

    if not should_refresh():
        try:
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            (f"Cache read failed:{str(e)}")

    try:
        voices = await edge_tts.list_voices()
        chinese_voices = []

        for v in voices:
            if v['Locale'].lower().startswith('zh'):
                tags = []
                if "liaoning" in v["ShortName"].lower():
                    ("Liaoning Dialect")
                if "shaanxi" in v["ShortName"].lower():
                    ("Shaanxi dialect")
                if "HK" in v["ShortName"]:
                    ("Cantonese")
                if "TW" in v["ShortName"]:
                    ("* accent")
                if "Xiao" in v["ShortName"]:
                    ("Young voice")

                chinese_voices.append({
                    "key": v["ShortName"],
                    "name": ("LocalName") or v["ShortName"],
                    "gender": "male" if v["Gender"] == "Male" else "female",
                    "tags": tags,
                    "locale": v["Locale"]
                })

        # Save the cache
        DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, 'w', encoding='utf-8') as f:
            json.dump(chinese_voices, f, ensure_ascii=False, indent=2)

        return chinese_voices

    except Exception as e:
        (f"Voice acquisition failed:{str(e)}")
        if CACHE_FILE.exists():
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                return (f)
        raise RuntimeError("Unable to get voice list and no cache is available")

def format_voice_list(voices: list) -> str:
    """Format display voice list"""
    output = ["\nSupported Chinese pronunciation model (using -v all to generate all):"]

    categories = {
        "Standard Mandarin": lambda v: not v["tags"],
        "Dialect Features": lambda v: any(t in v["tags"] for t in ["Liaoning Dialect", "Shaanxi dialect"]),
        "regional pronunciation": lambda v: any(t in v["tags"] for t in ["Cantonese", "* accent"]),
        "Special voice": lambda v: "Young voice" in v["tags"]
    }

    for cat, condition in categories.items():
        output.append(f"\n【{cat}】")
        for v in filter(condition, voices):
            tags = " | ".join(v["tags"]) if v["tags"] else "standard"
            (f"{v['key'].ljust(28)} {v['name']} ({v['gender']}) [
  • python
  • edge-tts
  • voice
  • ]") return "\n".join(output) def smart_split_text(text: str) -> list: """Enhanced intelligent segmentation algorithm""" # Preprocess text text = (r'\n{2,}', '\n', ()) # Merge multiple empty lines chunks = [] current_chunk = [] current_length = 0 buffer = [] for char in text: (char) current_length += 1 # Find the split point when the maximum length is reached if current_length >= MAX_SEGMENT_LENGTH: split_pos = None # Find the best split point in reverse for i in range(len(buffer)-1, 0, -1): if buffer[i] in DELIMITER_PRIORITY: if any((p, ''.join(buffer[:i+1])) for p in IGNORE_PATTERNS): Continue continue split_pos = i+1 break if split_pos: (''.join(buffer[:split_pos])) buffer = buffer[split_pos:] current_length = len(buffer) else: # Forced segmentation (''.join(buffer)) buffer = [] current_length = 0 # Process the remaining content if buffer: (''.join(buffer)) # Secondary merge too short paragraph merged = [] temp_buffer = [] for chunk in chunks: chunk = () if not chunk: Continue continue if len(chunk) < MIN_SEGMENT_LENGTH: temp_buffer.append(chunk) if sum(len(c) for c in temp_buffer) >= MAX_SEGMENT_LENGTH: (' '.join(temp_buffer)) temp_buffer = [] else: if temp_buffer: (' '.join(temp_buffer)) temp_buffer = [] (chunk) if temp_buffer: (' '.join(temp_buffer)) Return merged async def convert_text(input_file: Path, voice: str): """Core conversion logic""" output_path = DEFAULT_OUTPUT_DIR / f"{input_file.stem}.{voice}.mp3" output_path.(parents=True, exist_ok=True) if output_path.exists(): (f"Skip existing file: {output_path.name}") Return try: # Read text file with open(input_file, 'r', encoding='utf-8', errors='ignore') as f: text = ().strip() if not text: raise ValueError("Input file is empty") (f"Original text length: {len(text)} characters") # Intelligent segmentation chunks = smart_split_text(text) (f"Generate valid segments: {len(chunks)}") # Segmented configuration semaphore = (5) # Concurrency Limit timeout = 30000 # Single request timeout max_retries = 3 # Maximum number of retries async def process_chunk(index, chunk): async with semaphore: temp_path = output_path.with_name(f"temp_{index:04d}.mp3") for attempt in range(max_retries): try: communicate = edge_tts.Communicate(chunk, voice) await asyncio.wait_for((temp_path), timeout) (f"Segment {index} was generated successfully") return temp_path except Exception as e: (f"Segmentation {index}The {attempt+1} attempt failed: {str(e)}") if attempt == max_retries - 1: (f"Segmentation {index}Finally failed") return None await (1) # Perform parallel conversion tasks = [process_chunk(i, c) for i, c in enumerate(chunks)] temp_files = await (*tasks) # Merge audio files valid_files = [tf for tf in temp_files if tf and ()] if not valid_files: raise RuntimeError("All segment generation failed") combined = () for tf in valid_files: audio = AudioSegment.from_mp3(tf) combined += audio.fade_in(50).fade_out(50) () (output_path, format="mp3", bitrate="192k") (f"Final audio duration: {len(combined)/1000:.2f} seconds") except Exception as e: (f"Conversion failed: {str(e)}") if output_path.exists(): output_path.unlink() Raise async def batch_convert(input_file: Path): """Batch generation of all voice versions""" voices = await get_voices() (f"Start to generate {len(voices)} voice version...") with tqdm(total=len(voices), desc="conversion progress", unit="voice") as pbar: for voice in voices: output_path = DEFAULT_OUTPUT_DIR / f"{input_file.stem}.{voice['key']}.mp3" pbar.set_postfix_str(f"Current: {voice['key']}") if output_path.exists(): (1) Continue 
continue try: await convert_text(input_file, voice['key']) except Exception as e: (f"{voice['key']} Generation failed: {str(e)}") Finally: (1) def main(): """Main entry function""" parser = ( description="Edge-TTS batch generation tool v2.0", formatter_class= ) parser.add_argument("input", nargs='?', help="enter text file path") parser.add_argument("-v", "--voice", help="Specify the voice model (use all to generate all)") parser.add_argument("-l", "--list", action='store_true', help="Show a list of available voices") parser.add_argument("-f", "--force", action='store_true', help="force refresh of voice cache") args = parser.parse_args() if : try: voices = (get_voices()) print(format_voice_list(voices)) except Exception as e: (str(e)) Return if not or not : ("Enter file and voice parameters must be specified") ("Example:") (' python edge_tts.py "C:\\" -v zh-CN-XiaoxiaoNeural') (' python edge_tts.py "C:\\" -v all') Return input_path = Path() if not input_path.exists(): (f"File does not exist: {input_path}") Return try: if () == "all": (batch_convert(input_path)) else: voices = (get_voices()) if not any(v['key'] == for v in voices): ("Invalid voice model, available options: \n" + format_voice_list(voices)) Return (convert_text(input_path, )) except Exception as e: (f"Fatal Error: {str(e)}") if __name__ == "__main__": main()
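Before running the full tool, it is worth confirming the environment: edge-tts, pydub, and tqdm must be importable, and pydub needs ffmpeg on the PATH to decode and export MP3. A quick, self-contained check that lists the Chinese voices the service currently exposes might look like this; the zh locale filter mirrors what get_voices() does above:

import asyncio
import edge_tts

async def preview_chinese_voices():
    # Query the live voice catalogue and print the Chinese entries
    voices = await edge_tts.list_voices()
    for v in voices:
        if v["Locale"].lower().startswith("zh"):
            print(f'{v["ShortName"]:<32} {v["Gender"]}')

asyncio.run(preview_chinese_voices())

If this prints a list of zh-* voices, the command-line examples shown in main() (a single voice, or -v all for every voice) should work unchanged.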