读取文件数据输出的 Agent
GitHub: filereader
使用方法
1. 创建 Agent 身份
请参考 创建身份,读写公有私有数据
2. 修改 main.py 文件
将 AGENT_NAME 修改为实际的身份信息
3. 执行代码
bash
python main.py功能简介
该 Agent 基于 agentcp 库构建的文件读取示例,可以将 agentcp 目录下的文件列出或者读取文件内容作为消息传递。主要演示以下功能:
- 安全的文件读取
- 支持遍历目录查看文件列表
- 交互式消息处理
完整示例代码
python
from agentcp import AgentCP
import asyncio
import os
import re
import json
class FileOperator:
"""文件操作类,负责安全地读取文本文件内容"""
def __init__(self, sandbox_root: str = None):
self.sandbox_root = sandbox_root or os.getcwd()
def is_text_file(self, file_path: str) -> bool:
"""通过检查文件内容判断是否为文本文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
f.read(1024)
return True
except UnicodeDecodeError:
return False
except Exception:
return False
def extract_file_path(self, text: str) -> str:
"""从文本中提取文件名(不需要完整路径)"""
match = re.search(r"(?:读取|打开|查看)文件\s*([^\s\/\\]+)", text)
return match.group(1) if match else None
def sanitize_path(self, filename: str) -> str:
"""遍历sandbox目录,找到这个文件"""
def recursive_search(root_dir):
try:
for root, dirs, files in os.walk(root_dir):
if filename in files:
full_path = os.path.join(root, filename)
normalized_path = os.path.normpath(full_path)
return normalized_path
except:
return None
return None
return recursive_search(self.sandbox_root)
def walk_directory(self):
"""遍历目录并打印所有文件和文件夹"""
filenamess = []
for root, dirs, files in os.walk(self.sandbox_root):
for file in files:
filenamess.append(file)
return filenamess
def exist_file(self, filename):
"""检查文件是否存在"""
safe_path = self.sanitize_path(filename)
return os.path.exists(safe_path)
def read_file(self, file_path: str) -> str:
"""安全地读取文本文件内容"""
safe_path = self.sanitize_path(file_path)
if not safe_path:
raise ValueError("文件不存在")
if not self.is_text_file(safe_path):
raise ValueError("只能读取文本文件")
with open(safe_path, "r") as file:
return file.read()
def parse_command(text):
if not text:
return False
if text.find("查询") >= 0 or text.find("列表") >= 0:
return 'list'
if text.find("读取") >= 0 or text.find("查看") >= 0:
return 'read'
class FileAgent:
def __init__(self, endpoint: str, name: str):
"""初始化文件Agent"""
self.acp = AgentCP("./", seed_password="888777")
self.endpoint = endpoint
self.name = name
self.aid = None
self.file_operator = FileOperator(self.acp.app_path)
self.last_command = ''
async def message_handler(self, msg):
"""消息处理器 - 根据消息内容安全地读取文件"""
try:
ref_msg_id = msg.get("ref_msg_id")
content = msg.get("message", "\"{}\"")
content = json.loads(content)[0]["content"]
content = json.loads(content)
text = content.get("text", "")
command = parse_command(text)
print(f"收到消息: {content}")
if self.last_command == 'read':
self.last_command = ''
return await self.read_file(msg, text)
elif command == 'list':
files = self.file_operator.walk_directory()
if not files:
await self._send_reply(msg, "当前目录下没有文件")
return True
to = "文件列表<br>" + "<br>".join(files)
await self._send_reply(msg, to)
return True
elif command == 'read':
self.last_command = 'read'
files = self.file_operator.walk_directory()
if not files:
await self._send_reply(msg, "当前目录下没有文件")
return True
to = "读取哪一个文件?<br>" + "<br>".join(files)
print(f'send message: {to}')
await self._send_reply(msg, to)
return True
else:
await self._send_reply(msg, "你可以对我说: <br>读取文件<br>查询文件")
except Exception as e:
print(f"处理消息出错: {str(e)}")
await self._send_reply(msg, f"处理文件时出错: {str(e)}")
return False
async def read_file(self, msg, text):
try:
filename = self.file_operator.extract_file_path(text)
if not filename:
try:
print(f'未提供文件名尝试读取: {text}')
file_content = self.file_operator.read_file(text)
await self._send_reply(msg, f"文件内容:<br>{file_content}")
return
except Exception as e:
print(f"处理消息出错: {str(e)}")
try:
file_content = self.file_operator.read_file(filename)
await self._send_reply(msg, f"文件内容:<br>{file_content}")
except PermissionError:
await self._send_reply(msg, "访问文件被拒绝")
except FileNotFoundError:
await self._send_reply(msg, f"文件不存在: {filename}")
except ValueError as e:
await self._send_reply(msg, f"{str(e)}")
return True
except Exception as e:
print(f"处理消息出错: {str(e)}")
await self._send_reply(msg, f"处理文件时出错: {str(e)}")
return False
async def _send_reply(self, original_msg, content: str):
"""发送回复消息"""
try:
self.aid.send_message_content(
to_aid_list=[original_msg.get("sender")],
session_id=original_msg.get("session_id"),
llm_content=content)
except Exception as e:
print(f"发送回复消息出错: {str(e)}")
async def run(self):
"""运行Agent"""
try:
print("正在启动Agent...", self.endpoint, self.name)
self.aid = self.acp.create_aid(self.endpoint, self.name)
self.aid.add_message_handler(self.message_handler)
self.aid.online()
print("Agent已上线,等待文件读取指令...")
while True:
await asyncio.sleep(1)
except Exception as e:
print(f"发生错误: {str(e)}")
finally:
if self.aid:
self.aid.offline()
print("Agent已下线")
if __name__ == "__main__":
ENDPOINT = "aid.pub"
AGENT_NAME = ""
agent = FileAgent(ENDPOINT, AGENT_NAME)
asyncio.run(agent.run())交互说明
发送指令格式:
- 查询文件列表: "查询文件" 或 "列表"
- 读取文件内容: "读取文件" 或 "查看文件" (会进入交互模式, 只需输入文件名即可)
注意事项
- 文件操作限制在沙箱目录内(AgentID 的公有数据目录)
- 只能操作文本文件