深浅模式
python
import os
import requests
import json
import pandas as pd
from dotenv import load_dotenv
class AIEngine:
def __init__(self):
# 1. 初始化:加载配置
load_dotenv()
self.api_key = os.getenv("DEEPSEEK_API_KEY")
self.url = "https://api.deepseek.com/chat/completions"
if not self.api_key:
raise ValueError("错误:未在 .env 中找到 API_KEY")
def _get_headers(self):
"""私有方法:构建请求头"""
return {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def ask_ai(self, prompt, system_prompt="你是一个专业的助手"):
"""通用调用接口"""
data = {
"model": "deepseek-chat",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
"temperature": 0.7
}
try:
response = requests.post(
self.url,
headers=self._get_headers(),
data=json.dumps(data),
timeout=30 # 增加超时控制
)
if response.status_code == 200:
return response.json()["choices"][0]["message"]["content"]
return f"Error: {response.status_code}"
except Exception as e:
return f"Exception: {str(e)}"
def batch_analyze_excel(self, file_path, target_col, output_path):
"""高级功能:批量处理 Excel"""
df = pd.read_excel(file_path)
print(f"开始处理 {len(df)} 条数据...")
# 这里的 lambda 调用类内部的 ask_ai
df['AI_Result'] = df[target_col].apply(lambda x: self.ask_ai(f"分析以下内容:{x}"))
df.to_excel(output_path, index=False)
print(f"处理完成,保存至:{output_path}")
# --- 测试代码 ---
if __name__ == "__main__":
engine = AIEngine()
# 简单的单次调用测试
res = engine.ask_ai("你好,请自我介绍")
print(res)python
from fastapi import FastAPI
from pydantic import BaseModel
from ai_engine import AIEngine # 导入你刚刚重构的类
# 1. 初始化 FastAPI 实例
app = FastAPI(title="我的 AI 业务接口")
# 2. 实例化你的 AI 引擎
engine = AIEngine()
# 3. 定义请求数据的格式(类似 Java 的 DTO 实体类)
class Question(BaseModel):
prompt: str
system_message: str = "你是一个专业的后端助手"
# 4. 定义接口路由
@app.post("/ask")
async def ask_question(item: Question):
# 调用你之前封装好的方法
answer = engine.ask_ai(item.prompt, system_prompt=item.system_message)
return {
"status": "success",
"data": answer
}
# 运行提示:在终端执行 uvicorn server:app --reloadpython
from ai_engine import AIEngine
# 初始化引擎
ai = AIEngine()
advice = ai.ask_ai("用户逾期 30 天且拒绝还款,怎么处理?", system_prompt = "你是一个资深的金融催收专家")
print(advice)