-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
83 lines (73 loc) · 3.36 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import json
import os
from dotenv import load_dotenv
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.websockets import WebSocketState
# from llm import LlmClient
from llm_with_func_calling import LlmClient
from twilio_server import TwilioClient
from retellclient.models import operations
from twilio.twiml.voice_response import VoiceResponse
import asyncio
load_dotenv()
app = FastAPI()
llm_client = LlmClient()
# twilio_client = TwilioClient() #Uncomment to use twilio client
# twilio_client.create_phone_number(213, os.environ['RETELL_AGENT_ID'])
# twilio_client.delete_phone_number("+12133548310")
# twilio_client.register_phone_agent("+14154750418", os.environ['RETELL_AGENT_ID'])
# twilio_client.create_phone_call("+14154750418", "+13123156212", os.environ['RETELL_AGENT_ID'])
@app.post("/twilio-voice-webhook/{agent_id_path}")
async def handle_twilio_voice_webhook(request: Request, agent_id_path: str):
try:
# Check if it is machine
post_data = await request.form()
if 'AnsweredBy' in post_data and post_data['AnsweredBy'] == "machine_start":
twilio_client.end_call(post_data['CallSid'])
return PlainTextResponse("")
call_response = twilio_client.retell.register_call(operations.RegisterCallRequestBody(
agent_id=agent_id_path,
audio_websocket_protocol="twilio",
audio_encoding="mulaw",
sample_rate=8000
))
if call_response.call_detail:
response = VoiceResponse()
start = response.connect()
start.stream(url=f"wss://api.retellai.com/audio-websocket/{call_response.call_detail.call_id}")
return PlainTextResponse(str(response), media_type='text/xml')
except Exception as err:
print(f"Error in twilio voice webhook: {err}")
return JSONResponse(status_code=500, content={"message": "Internal Server Error"})
@app.websocket("/llm-websocket/{call_id}")
async def websocket_handler(websocket: WebSocket, call_id: str):
await websocket.accept()
print(f"Handle llm ws for: {call_id}")
# send first message to signal ready of server
response_id = 0
first_event = llm_client.draft_begin_messsage()
await websocket.send_text(json.dumps(first_event))
async def stream_response(request):
nonlocal response_id
for event in llm_client.draft_response(request):
await websocket.send_text(json.dumps(event))
if request['response_id'] < response_id:
return # new response needed, abondon this one
try:
while True:
message = await websocket.receive_text()
request = json.loads(message)
# print out transcript
os.system('cls' if os.name == 'nt' else 'clear')
print(json.dumps(request, indent=4))
if 'response_id' not in request:
continue # no response needed, process live transcript update if needed
response_id = request['response_id']
asyncio.create_task(stream_response(request))
except WebSocketDisconnect:
print(f"LLM WebSocket disconnected for {call_id}")
except Exception as e:
print(f'LLM WebSocket error for {call_id}: {e}')
finally:
print(f"LLM WebSocket connection closed for {call_id}")