Add detailed log for llm service
rainyfly committed Dec 14, 2023
1 parent 7b65795 commit 5cd3968
Showing 2 changed files with 73 additions and 21 deletions.
76 changes: 55 additions & 21 deletions llm/fastdeploy_llm/serving/triton_model.py
@@ -19,8 +19,10 @@
 import time
 import numpy as np
 import functools
+from collections import defaultdict
 from fastdeploy_llm.serving.serving_model import ServingModel
 from fastdeploy_llm.utils.logging_util import logger
+from fastdeploy_llm.utils.logging_util import error_format, ErrorCode, ErrorType
 from fastdeploy_llm.task import Task, BatchTask
 import fastdeploy_llm as fdlm
 
Expand All @@ -31,6 +33,8 @@
pass


tokens_all_dict = defaultdict(list)

def stream_call_back(call_back_task, token_tuple, index, is_last_token,
sender):
out = dict()
@@ -39,14 +43,17 @@ def stream_call_back(call_back_task, token_tuple, index, is_last_token,
     out["token_ids"] = [token_tuple[0]]
     out['send_idx'] = index
     out["is_end"] = is_last_token
+    tokens_all_dict[call_back_task.task_id].append(token_tuple[1])
     out_tensor = pb_utils.Tensor(
         "OUT", np.array(
             [json.dumps(out)], dtype=np.object_))
     if is_last_token:
+        logger.info("Model output for req_id: {} results_all: {}".format(call_back_task.task_id, ''.join(tokens_all_dict[call_back_task.task_id])))
         sender[call_back_task.task_id].send(
             pb_utils.InferenceResponse([out_tensor]),
             flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
         del sender[call_back_task.task_id]
+        del tokens_all_dict[call_back_task.task_id]
     else:
         sender[call_back_task.task_id].send(
             pb_utils.InferenceResponse([out_tensor]))
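The callback change above is the heart of the commit's "detailed log": each streamed token's text (`token_tuple[1]`) is buffered per `task_id` in a module-level `defaultdict`, and when the last token arrives the whole generation is joined, logged once, and the buffer entry is deleted so finished requests don't leak memory. A minimal standalone sketch of that pattern (the task ids, token strings, and logger setup here are stand-ins, not FastDeploy APIs):

```python
import logging
from collections import defaultdict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("demo")

# One token buffer per request id; the entry is removed on the last token.
tokens_all_dict = defaultdict(list)

def on_token(task_id, token_text, is_last_token):
    """Buffer streamed tokens and emit a single log line per finished request."""
    tokens_all_dict[task_id].append(token_text)
    if is_last_token:
        logger.info("Model output for req_id: %s results_all: %s", task_id,
                    "".join(tokens_all_dict[task_id]))
        del tokens_all_dict[task_id]  # avoid unbounded growth across requests

# Hypothetical stream of three tokens for one request:
for i, piece in enumerate(["Hello", ",", " world"]):
    on_token("req-1", piece, is_last_token=(i == 2))
```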
@@ -68,10 +75,14 @@ def initialize(self, args):
         using_decoupled = pb_utils.using_decoupled_model_transaction_policy(
             self.model_config)
         if not using_decoupled:
-            raise pb_utils.TritonModelException(
-                """the model `{}` can generate any number of responses per request,
+            error_type = ErrorType.Server
+            error_code = ErrorCode.S0001
+            error_info = """the model `{}` can generate any number of responses per request,
                 enable decoupled transaction policy in model configuration to
-                serve this model""".format(args["model_name"]))
+                serve this model""".format(args["model_name"])
+            error_msg = error_format.format(error_type.name, error_code.name, error_info)
+            logger.error(error_msg)
+            raise pb_utils.TritonModelException(error_msg)
 
         parameters = self.model_config["parameters"]
 
@@ -112,10 +123,13 @@ def execute(self, requests):
                 if isinstance(data, list):
                     data = data[0]
             except Exception as e:
+                error_type = ErrorType.Query
+                error_code = ErrorCode.C0000
+                error_info = "Cannot load json data from request, received data = {} error={}.".format(request_tensor, e)
+                error_msg = error_format.format(error_type.name, error_code.name, error_info)
+                logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(
-                    error=pb_utils.TritonError(
-                        "Cannot load json data from request, error={}.".format(
-                            e)))
+                    error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
                     error_res,
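Every error branch in `execute` now follows the same five-line recipe: pick an `ErrorType` and `ErrorCode`, build the human-readable `error_info`, render all three through the shared `error_format` template, then log the result and hand the same string to `pb_utils.TritonError` so the client sees exactly what was logged. A hypothetical helper in the same spirit (not part of this diff) would shrink each call site to one line:

```python
from fastdeploy_llm.utils.logging_util import (ErrorCode, ErrorType,
                                               error_format, logger)

def log_error(error_type: ErrorType, error_code: ErrorCode, error_info: str) -> str:
    """Render the shared template, log it, and return it for the client response."""
    error_msg = error_format.format(error_type.name, error_code.name, error_info)
    logger.error(error_msg)
    return error_msg

# e.g. the JSON-parsing branch above could become:
#     error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(
#         log_error(ErrorType.Query, ErrorCode.C0000,
#                   "Cannot load json data from request, received data = {} error={}."
#                   .format(request_tensor, e))))
```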
@@ -127,9 +141,13 @@
             try:
                 task.from_dict(data)
             except Exception as e:
-                error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(
-                    "There's error while deserializing data from request, error={}".
-                    format(e)))
+                error_type = ErrorType.Query
+                error_code = ErrorCode.C0001
+                error_info = "There's an error while deserializing data from request, received data = {} error={}".format(data, e)
+                error_msg = error_format.format(error_type.name, error_code.name, error_info)
+                logger.error(error_msg)
+                error_res = pb_utils.InferenceResponse(
+                    error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
                     error_res,
@@ -140,9 +158,13 @@
             if task.task_id is None:
                 task.task_id = str(uuid.uuid4())
             if task.task_id in self.response_handler:
+                error_type = ErrorType.Query
+                error_code = ErrorCode.C0001
+                error_info = "Task id conflict with {}.".format(task.task_id)
+                error_msg = error_format.format(error_type.name, error_code.name, error_info)
+                logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(
-                    error=pb_utils.TritonError(
-                        "Task id conflict with {}.".format(task.task_id)))
+                    error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
                     error_res,
@@ -153,10 +175,13 @@
             try:
                 task.check(self.config.max_dec_len)
             except Exception as e:
+                error_type = ErrorType.Query
+                error_code = ErrorCode.C0001
+                error_info = "There's an error while checking task, task={} error={}".format(task, e)
+                error_msg = error_format.format(error_type.name, error_code.name, error_info)
+                logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(
-                    error=pb_utils.TritonError(
-                        "There's error while checking task, error={}".format(
-                            e)))
+                    error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
                     error_res,
@@ -165,9 +190,12 @@
 
             # 5. check if the requests queue is full
             if self.model.requests_queue.qsize() > self.config.max_queue_num:
-                error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(
-                    "The queue is full now(size={}), please wait for a while.".
-                    format(self.model.max_queue_num)))
+                error_type = ErrorType.Server
+                error_code = ErrorCode.S0000
+                error_info = "The queue is full now (size={}), please wait for a while.".format(self.config.max_queue_num)
+                error_msg = error_format.format(error_type.name, error_code.name, error_info)
+                logger.error(error_msg)
+                error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
                     error_res,
@@ -195,10 +223,12 @@ def execute(self, requests):
             try:
                 self.model.add_request(task)
             except Exception as e:
-                error_res = pb_utils.InferenceResponse(
-                    error=pb_utils.TritonError(
-                        "There's error while inserting new request, error={}".
-                        format(e)))
+                error_type = ErrorType.Query
+                error_code = ErrorCode.C0001
+                error_info = "There's an error while inserting new request, task={} error={}".format(task, e)
+                error_msg = error_format.format(error_type.name, error_code.name, error_info)
+                logger.error(error_msg)
+                error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
                     error_res,
@@ -208,6 +238,10 @@
 
     def finalize(self):
         logger.info("The triton server is going to terminate...")
+        info_type = ErrorType.Server
+        info_code = ErrorCode.S0002
+        info_msg = error_format.format(info_type.name, info_code.name, "The triton server is going to terminate...")
+        logger.info(info_msg)
         self.model.stop()
         os.system("""
             bash -c 'pids=$(ps auxww | grep -E "triton_python_backend_stub|multiprocessing.resource_tracker|engine.py" | grep -v grep | awk '"'"'{print $2}'"'"');
18 changes: 18 additions & 0 deletions llm/fastdeploy_llm/utils/logging_util.py
@@ -16,6 +16,7 @@
 import logging
 import threading
 import time
+from enum import Enum
 from typing import (Any, Generator, Optional, Union)
 from logging.handlers import TimedRotatingFileHandler
 
@@ -42,6 +43,23 @@
 }
 
 
+
+
+error_format = """Error: Type {} Code {} Describe: {}"""
+
+class ErrorCode(Enum):
+    C0000 = 0  # the client sent a query with a malformed format
+    C0001 = 1  # the client's query failed validity checks
+    S0000 = 2  # the server is overloaded
+    S0001 = 3  # the server failed to start properly
+    S0002 = 4  # the server is shutting down
+
+class ErrorType(Enum):
+    Query = 0  # client-side (query) error
+    Server = 1  # server-side error
+
+
+
 class Logger(object):
     _DEFAULT_NAME: str = 'FastDeploy'
 
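Given the template and enums above, a formatted message interpolates the enum *names*, so every log line starts with a greppable `Type`/`Code` pair. A quick sketch of the expected rendering (the payload text is made up; this assumes the definitions added above are in scope):

```python
error_msg = error_format.format(ErrorType.Query.name, ErrorCode.C0000.name,
                                "Cannot load json data from request, error=...")
print(error_msg)
# Error: Type Query Code C0000 Describe: Cannot load json data from request, error=...
```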
