AnyTool/toolbench/utils.py
2024-02-23 15:13:06 +08:00

132 lines
No EOL
6.3 KiB
Python

import json
import re
import torch
import transformers
import transformers.models.llama.modeling_llama
from functools import partial
def process_system_message(system_message, functions):
assert "with a function call to actually excute your step." in system_message
# we find that following ReACT format and merging the thought node and function call node is easier for model to learn to integrate the action input json string in its prediction than learn to predict a json string directly.
system_message = system_message.replace("with a function call to actually excute your step.", "with a function call to actually excute your step. Your output should follow this format:\nThought:\nAction\nAction Input:\n")
# add all the function dicts in the prompt.
system_message = system_message + "\nSpecifically, you have access to the following APIs: " + str(functions)
return system_message
def get_gpu_memory(max_gpus=None):
"""Get available memory for each GPU."""
gpu_memory = []
num_gpus = (
torch.cuda.device_count()
if max_gpus is None
else min(max_gpus, torch.cuda.device_count())
)
for gpu_id in range(num_gpus):
with torch.cuda.device(gpu_id):
device = torch.cuda.current_device()
gpu_properties = torch.cuda.get_device_properties(device)
total_memory = gpu_properties.total_memory / (1024**3)
allocated_memory = torch.cuda.memory_allocated() / (1024**3)
available_memory = total_memory - allocated_memory
gpu_memory.append(available_memory)
return gpu_memory
def standardize_category(category):
save_category = category.replace(" ", "_").replace(",", "_").replace("/", "_")
while " " in save_category or "," in save_category:
save_category = save_category.replace(" ", "_").replace(",", "_")
save_category = save_category.replace("__", "_")
return save_category
def standardize(string):
res = re.compile("[^\\u4e00-\\u9fa5^a-z^A-Z^0-9^_]")
string = res.sub("_", string)
string = re.sub(r"(_)\1+","_", string).lower()
while True:
if len(string) == 0:
return string
if string[0] == "_":
string = string[1:]
else:
break
while True:
if len(string) == 0:
return string
if string[-1] == "_":
string = string[:-1]
else:
break
if string[0].isdigit():
string = "get_" + string
return string
def change_name(name):
change_list = ["from", "class", "return", "false", "true", "id", "and"]
if name in change_list:
name = "is_" + name
return name
# code adapted from https://huggingface.co/kaiokendev/superhot-13b-8k-no-rlhf-test/blob/main/llama_rope_scaled_monkey_patch.py
class CondenseRotaryEmbedding(torch.nn.Module):
def __init__(self, dim, ratio, max_position_embeddings=2048, base=10000, device=None):
super().__init__()
inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float().to(device) / dim))
self.register_buffer("inv_freq", inv_freq)
# Build here to make `torch.jit.trace` work.
self.ratio = ratio
max_position_embeddings *= ratio
print(f"Condensing Positional embeddings from {max_position_embeddings} to {max_position_embeddings // ratio}")
self.max_seq_len_cached = max_position_embeddings
t = torch.arange(self.max_seq_len_cached, device=self.inv_freq.device, dtype=self.inv_freq.dtype) / ratio
freqs = torch.einsum("i,j->ij", t, self.inv_freq)
# Different from paper, but it uses a different permutation in order to obtain the same calculation
emb = torch.cat((freqs, freqs), dim=-1)
dtype = torch.get_default_dtype()
self.register_buffer("cos_cached", emb.cos()[None, None, :, :].to(dtype), persistent=False)
self.register_buffer("sin_cached", emb.sin()[None, None, :, :].to(dtype), persistent=False)
def forward(self, x, seq_len=None):
# x: [bs, num_attention_heads, seq_len, head_size]
# This `if` block is unlikely to be run after we build sin/cos in `__init__`. Keep the logic here just in case.
if seq_len > self.max_seq_len_cached:
self.max_seq_len_cached = seq_len
t = torch.arange(self.max_seq_len_cached, device=x.device, dtype=self.inv_freq.dtype) / self.ratio
freqs = torch.einsum("i,j->ij", t, self.inv_freq)
# Different from paper, but it uses a different permutation in order to obtain the same calculation
emb = torch.cat((freqs, freqs), dim=-1).to(x.device)
self.register_buffer("cos_cached", emb.cos()[None, None, :, :].to(x.dtype), persistent=False)
self.register_buffer("sin_cached", emb.sin()[None, None, :, :].to(x.dtype), persistent=False)
return (
self.cos_cached[:, :, :seq_len, ...].to(dtype=x.dtype),
self.sin_cached[:, :, :seq_len, ...].to(dtype=x.dtype),
)
def replace_llama_with_condense(ratio):
transformers.models.llama.modeling_llama.LlamaRotaryEmbedding = partial(CondenseRotaryEmbedding, ratio=ratio)
def process_retrieval_ducoment(documents_df):
ir_corpus = {}
corpus2tool = {}
for row in documents_df.itertuples():
doc = json.loads(row.document_content)
ir_corpus[row.docid] = (doc.get('category_name', '') or '') + ', ' + \
(doc.get('tool_name', '') or '') + ', ' + \
(doc.get('api_name', '') or '') + ', ' + \
(doc.get('api_description', '') or '') + \
', required_params: ' + json.dumps(doc.get('required_parameters', '')) + \
', optional_params: ' + json.dumps(doc.get('optional_parameters', '')) + \
', return_schema: ' + json.dumps(doc.get('template_response', ''))
corpus2tool[(doc.get('category_name', '') or '') + ', ' + \
(doc.get('tool_name', '') or '') + ', ' + \
(doc.get('api_name', '') or '') + ', ' + \
(doc.get('api_description', '') or '') + \
', required_params: ' + json.dumps(doc.get('required_parameters', '')) + \
', optional_params: ' + json.dumps(doc.get('optional_parameters', '')) + \
', return_schema: ' + json.dumps(doc.get('template_response', ''))] = doc['category_name'] + '\t' + doc['tool_name'] + '\t' + doc['api_name']
return ir_corpus, corpus2tool