import asyncio
import edge_tts
import os
import argparse
import json
import re
from pathlib import Path
from pydub import AudioSegment
import logging
from datetime import datetime, timedelta
from tqdm import tqdm
# --- Logging configuration -------------------------------------------------
# Log to both a UTF-8 file and the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("edge_tts.log", encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# --- Path configuration ----------------------------------------------------
CACHE_FILE = Path.home() / ".edge_tts_voices.cache"  # cached voice list (JSON)
DEFAULT_OUTPUT_DIR = Path(r"C:\App\tts\Edge-TTS")    # where generated MP3s go
CACHE_EXPIRE_HOURS = 24                              # voice-cache lifetime

# --- Segmentation parameters -----------------------------------------------
MAX_SEGMENT_LENGTH = 500  # Maximum single segment length (characters)
MIN_SEGMENT_LENGTH = 50   # Segments shorter than this get merged together
# Split-point candidates, strongest first (newline, then CJK/ASCII punctuation).
DELIMITER_PRIORITY = ['\n', '。', '!', '!', '?', '?', ';', ';', ',', ',']
# Contexts in which a delimiter must NOT be treated as a split point.
IGNORE_PATTERNS = [
    r'(?<=\d)\.(?=\d)',     # decimal point (digits on both sides)
    r'\b[a-zA-Z]\.(?=\s)',  # English abbreviation (such as "Mr." followed by a space)
    r'https?://\S+',        # full URL
    r'www\.\S+\.\w{2,}'     # URL starting with www
]
async def get_voices(force_refresh=False) -> list:
    """Dynamically fetch and cache the Chinese voice list.

    Args:
        force_refresh: bypass the cache and always query the service.

    Returns:
        List of dicts with keys: key, name, gender, tags, locale.

    Raises:
        RuntimeError: service unreachable and no cache available.
    """
    def should_refresh():
        # Refresh when forced, when no cache exists, or when the cache is stale.
        if force_refresh or not CACHE_FILE.exists():
            return True
        cache_time = datetime.fromtimestamp(CACHE_FILE.stat().st_mtime)
        return datetime.now() > cache_time + timedelta(hours=CACHE_EXPIRE_HOURS)

    if not should_refresh():
        try:
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            # Corrupt or unreadable cache: fall through and re-fetch.
            logger.warning(f"Cache read failed:{str(e)}")

    try:
        voices = await edge_tts.list_voices()
        chinese_voices = []
        for v in voices:
            if v['Locale'].lower().startswith('zh'):
                # Tag voices by dialect/accent hints embedded in the ShortName.
                tags = []
                if "liaoning" in v["ShortName"].lower():
                    tags.append("Liaoning Dialect")
                if "shaanxi" in v["ShortName"].lower():
                    tags.append("Shaanxi dialect")
                if "HK" in v["ShortName"]:
                    tags.append("Cantonese")
                if "TW" in v["ShortName"]:
                    tags.append("* accent")
                if "Xiao" in v["ShortName"]:
                    tags.append("Young voice")
                chinese_voices.append({
                    "key": v["ShortName"],
                    "name": v.get("LocalName") or v["ShortName"],
                    "gender": "male" if v["Gender"] == "Male" else "female",
                    "tags": tags,
                    "locale": v["Locale"]
                })
        # Save cache for future runs.
        DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, 'w', encoding='utf-8') as f:
            json.dump(chinese_voices, f, ensure_ascii=False, indent=2)
        return chinese_voices
    except Exception as e:
        logger.error(f"Voice acquisition failed:{str(e)}")
        # Fall back to a stale cache rather than failing outright.
        if CACHE_FILE.exists():
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        raise RuntimeError("Unable to get voice list and no cache is available")
def format_voice_list(voices: list) -> str:
    """Format the voice list for display, grouped by category.

    Args:
        voices: entries as produced by get_voices().

    Returns:
        A multi-line string listing every voice under its category header.
    """
    output = ["\nSupported Chinese pronunciation model (using -v all to generate all):"]
    # Each category is a predicate over a voice's tag list; a voice may
    # appear under more than one category.
    categories = {
        "Standard Mandarin": lambda v: not v["tags"],
        "Dialect Features": lambda v: any(t in v["tags"] for t in ["Liaoning Dialect", "Shaanxi dialect"]),
        "regional pronunciation": lambda v: any(t in v["tags"] for t in ["Cantonese", "* accent"]),
        "Special voice": lambda v: "Young voice" in v["tags"]
    }
    for cat, condition in categories.items():
        output.append(f"\n【{cat}】")
        for v in filter(condition, voices):
            tags = " | ".join(v["tags"]) if v["tags"] else "standard"
            output.append(f"{v['key'].ljust(28)} {v['name']} ({v['gender']}) [{tags}]")
    return "\n".join(output)
def smart_split_text(text: str) -> list:
    """Split text into TTS-friendly segments at punctuation boundaries.

    Builds segments up to MAX_SEGMENT_LENGTH characters, preferring to cut
    at a DELIMITER_PRIORITY character, then merges segments shorter than
    MIN_SEGMENT_LENGTH back together.

    Args:
        text: raw input text.

    Returns:
        List of non-empty text segments.
    """
    # Preprocess: collapse runs of blank lines into a single newline.
    text = re.sub(r'\n{2,}', '\n', text.strip())

    chunks = []
    current_length = 0
    buffer = []
    for char in text:
        buffer.append(char)
        current_length += 1
        # Once the maximum length is reached, look for a split point.
        if current_length >= MAX_SEGMENT_LENGTH:
            split_pos = None
            # Scan backwards for the best (latest) delimiter.
            for i in range(len(buffer) - 1, 0, -1):
                if buffer[i] in DELIMITER_PRIORITY:
                    # Skip delimiters that sit inside a protected context
                    # (decimal point, abbreviation, URL).
                    if any(re.search(p, ''.join(buffer[:i + 1])) for p in IGNORE_PATTERNS):
                        continue
                    split_pos = i + 1
                    break
            if split_pos:
                chunks.append(''.join(buffer[:split_pos]))
                buffer = buffer[split_pos:]
                current_length = len(buffer)
            else:
                # No usable delimiter: force a hard split.
                chunks.append(''.join(buffer))
                buffer = []
                current_length = 0
    # Flush whatever remains in the buffer.
    if buffer:
        chunks.append(''.join(buffer))

    # Second pass: merge segments that are too short.
    merged = []
    temp_buffer = []
    for chunk in chunks:
        chunk = chunk.strip()
        if not chunk:
            continue
        if len(chunk) < MIN_SEGMENT_LENGTH:
            temp_buffer.append(chunk)
            if sum(len(c) for c in temp_buffer) >= MAX_SEGMENT_LENGTH:
                merged.append(' '.join(temp_buffer))
                temp_buffer = []
        else:
            if temp_buffer:
                merged.append(' '.join(temp_buffer))
                temp_buffer = []
            merged.append(chunk)
    if temp_buffer:
        merged.append(' '.join(temp_buffer))
    return merged
async def convert_text(input_file: Path, voice: str):
    """Convert one text file to a single MP3 using the given voice.

    Splits the text into segments, synthesizes them concurrently with
    retries, then concatenates the pieces (with short fades) into one file.

    Args:
        input_file: path to the UTF-8 text file.
        voice: edge-tts voice ShortName (e.g. "zh-CN-XiaoxiaoNeural").

    Raises:
        ValueError: the input file is empty.
        RuntimeError: every segment failed to synthesize.
    """
    output_path = DEFAULT_OUTPUT_DIR / f"{input_file.stem}.{voice}.mp3"
    output_path.parent.mkdir(parents=True, exist_ok=True)
    if output_path.exists():
        logger.info(f"Skip existing file: {output_path.name}")
        return
    try:
        # Read text file (ignore undecodable bytes rather than aborting).
        with open(input_file, 'r', encoding='utf-8', errors='ignore') as f:
            text = f.read().strip()
        if not text:
            raise ValueError("Input file is empty")
        logger.info(f"Original text length: {len(text)} characters")

        # Intelligent segmentation
        chunks = smart_split_text(text)
        logger.info(f"Generate valid segments: {len(chunks)}")

        # Segment-processing configuration
        semaphore = asyncio.Semaphore(5)  # concurrency limit
        timeout = 30   # single-request timeout in seconds (asyncio.wait_for
                       # takes seconds; the former value 30000 was effectively no timeout)
        max_retries = 3  # maximum attempts per segment

        async def process_chunk(index, chunk):
            # Synthesize one segment to a temp file, retrying on failure.
            async with semaphore:
                temp_path = output_path.with_name(f"temp_{index:04d}.mp3")
                for attempt in range(max_retries):
                    try:
                        communicate = edge_tts.Communicate(chunk, voice)
                        await asyncio.wait_for(communicate.save(str(temp_path)), timeout)
                        logger.info(f"Segment {index} was generated successfully")
                        return temp_path
                    except Exception as e:
                        logger.warning(f"Segmentation {index}The {attempt+1} attempt failed: {str(e)}")
                        if attempt == max_retries - 1:
                            logger.error(f"Segmentation {index}Finally failed")
                            return None
                        await asyncio.sleep(1)

        # Perform the conversions in parallel.
        tasks = [process_chunk(i, c) for i, c in enumerate(chunks)]
        temp_files = await asyncio.gather(*tasks)

        # Merge the audio pieces in order, dropping failed segments.
        valid_files = [tf for tf in temp_files if tf and tf.exists()]
        if not valid_files:
            raise RuntimeError("All segment generation failed")
        combined = AudioSegment.empty()
        for tf in valid_files:
            audio = AudioSegment.from_mp3(tf)
            # Short fades smooth the joins between segments.
            combined += audio.fade_in(50).fade_out(50)
            tf.unlink()  # remove the temp segment once merged
        combined.export(output_path, format="mp3", bitrate="192k")
        logger.info(f"Final audio duration: {len(combined)/1000:.2f} seconds")
    except Exception as e:
        logger.error(f"Conversion failed: {str(e)}")
        # Don't leave a partial output file behind.
        if output_path.exists():
            output_path.unlink()
        raise
async def batch_convert(input_file: Path):
    """Generate one audio version of the input file per available voice.

    Args:
        input_file: path to the text file to convert.
    """
    voices = await get_voices()
    logger.info(f"Start to generate {len(voices)} voice version...")
    with tqdm(total=len(voices), desc="conversion progress", unit="voice") as pbar:
        for voice in voices:
            output_path = DEFAULT_OUTPUT_DIR / f"{input_file.stem}.{voice['key']}.mp3"
            pbar.set_postfix_str(f"Current: {voice['key']}")
            # Skip voices that already have an output file.
            if output_path.exists():
                pbar.update(1)
                continue
            try:
                await convert_text(input_file, voice['key'])
            except Exception as e:
                # One failing voice must not abort the whole batch.
                logger.error(f"{voice['key']} Generation failed: {str(e)}")
            finally:
                pbar.update(1)
def main():
    """Main entry point: parse CLI arguments and dispatch.

    Modes:
        -l/--list           print the available voices and exit
        input -v <voice>    convert with one voice
        input -v all        convert with every available voice
    """
    parser = argparse.ArgumentParser(
        description="Edge-TTS batch generation tool v2.0",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("input", nargs='?', help="enter text file path")
    parser.add_argument("-v", "--voice", help="Specify the voice model (use all to generate all)")
    parser.add_argument("-l", "--list", action='store_true', help="Show a list of available voices")
    parser.add_argument("-f", "--force", action='store_true', help="force refresh of voice cache")
    args = parser.parse_args()

    if args.list:
        try:
            voices = asyncio.run(get_voices(force_refresh=args.force))
            print(format_voice_list(voices))
        except Exception as e:
            logger.error(str(e))
        return

    if not args.input or not args.voice:
        print("Enter file and voice parameters must be specified")
        print("Example:")
        print(' python edge_tts.py "C:\\" -v zh-CN-XiaoxiaoNeural')
        print(' python edge_tts.py "C:\\" -v all')
        return

    input_path = Path(args.input)
    if not input_path.exists():
        logger.error(f"File does not exist: {input_path}")
        return

    try:
        if args.voice.lower() == "all":
            asyncio.run(batch_convert(input_path))
        else:
            # Validate the requested voice against the available list.
            voices = asyncio.run(get_voices(force_refresh=args.force))
            if not any(v['key'] == args.voice for v in voices):
                logger.error("Invalid voice model, available options: \n" + format_voice_list(voices))
                return
            asyncio.run(convert_text(input_path, args.voice))
    except Exception as e:
        logger.error(f"Fatal Error: {str(e)}")
# Script entry point: run the CLI only when executed directly.
if __name__ == "__main__":
    main()