Skip to content

Tools

lazyllm.tools.Document

Bases: ModuleBase

初始化一个具有可选用户界面的文档模块。

此构造函数初始化一个可以有或没有用户界面的文档模块。如果启用了用户界面,它还会提供一个ui界面来管理文档操作接口,并提供一个用于用户界面交互的网页。

Parameters:

  • dataset_path (str) –

    数据集目录的路径。此目录应包含要由文档模块管理的文档。

  • embed

    用于生成文档embedding的对象。

  • create_ui (bool, default: True ) –

    指示是否为文档模块创建用户界面的标志。默认为 True。

  • launcher (optional, default: None ) –

    负责启动服务器模块的对象或函数。如果未提供,则使用 lazyllm.launchers 中的默认异步启动器 (sync=False)。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import Document
>>> m = lazyllm.OnlineEmbeddingModule(source="glm")
>>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
Source code in lazyllm/tools/rag/document.py
class Document(ModuleBase):
    """Document module with an optional management user interface.

    The module always builds a ``DocGroupImpl`` from ``dataset_path`` and
    ``embed``. When ``create_ui`` is true, that implementation is additionally
    wrapped in a ``DocManager`` served by a ``ServerModule``, and a
    ``DocWebModule`` bound to that server is exposed as ``self.web``.

    Args:
        dataset_path (str): Path to the dataset directory containing the
            documents managed by this module. If the path does not exist but
            the same path joined onto ``lazyllm.config["data_path"]`` does,
            the joined path is used instead.
        embed: Object used to generate document embeddings.
        create_ui (bool, optional): Whether to create the user interface for
            the document module. Defaults to True.
        launcher (optional): Object or function responsible for launching the
            server module. When not provided,
            ``lazyllm.launchers.remote(sync=False)`` is used.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import Document
        >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
        >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
    """
    def __init__(self, dataset_path: str, embed, create_ui: bool = True, launcher=None):
        super().__init__()
        if not os.path.exists(dataset_path):
            # Fall back to a path relative to the configured data directory.
            fallback_path = os.path.join(lazyllm.config["data_path"], dataset_path)
            if os.path.exists(fallback_path):
                dataset_path = fallback_path
        self._create_ui = create_ui
        launcher = launcher or lazyllm.launchers.remote(sync=False)

        # The implementation object is needed with or without the UI.
        self._impl = DocGroupImpl(dataset_path=dataset_path, embed=embed)
        if create_ui:
            doc_manager = DocManager(self._impl)
            self.doc_server = ServerModule(doc_manager, launcher=launcher)

            self.web = DocWebModule(doc_server=self.doc_server)

    def forward(self, func_name: str, *args, **kwargs):
        """Dispatch ``func_name`` to the document implementation.

        Without a UI the method named ``func_name`` is looked up on the local
        implementation and called directly. With a UI the call is forwarded to
        the document server, with ``func_name`` passed along as a keyword
        argument.

        Args:
            func_name (str): Name of the operation to invoke.
            *args: Positional arguments for the operation.
            **kwargs: Keyword arguments for the operation.

        Returns:
            Whatever the invoked operation returns.
        """
        if not self._create_ui:
            return getattr(self._impl, func_name)(*args, **kwargs)
        kwargs["func_name"] = func_name
        return self.doc_server.forward(*args, **kwargs)

    def find_parent(self, group: str) -> Callable:
        """Find the parent of the specified node group.

        Args:
            group (str): Name of the node group whose parent is looked up.

        Returns:
            Callable: ``self.forward`` pre-bound to the ``"find_parent"``
            operation with ``group=group``.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import Document, SentenceSplitter
            >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> documents.create_node_group(name="parent", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
            >>> documents.create_node_group(name="children", transform=SentenceSplitter, parent="parent", chunk_size=1024, chunk_overlap=100)
            >>> documents.find_parent('children')
        """
        return partial(self.forward, "find_parent", group=group)

    def find_children(self, group: str) -> Callable:
        """Find the children of the specified node group.

        Args:
            group (str): Name of the node group whose children are looked up.

        Returns:
            Callable: ``self.forward`` pre-bound to the ``"find_children"``
            operation with ``group=group``.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import Document, SentenceSplitter
            >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> documents.create_node_group(name="parent", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
            >>> documents.create_node_group(name="children", transform=SentenceSplitter, parent="parent", chunk_size=1024, chunk_overlap=100)
            >>> documents.find_children('parent')
        """
        return partial(self.forward, "find_children", group=group)

    def __repr__(self):
        """Return the lazyllm representation of this module."""
        return lazyllm.make_repr("Module", "Document", create_ui=self._create_ui)

    def create_node_group(
        self, name: str, transform: Callable, parent: str = LAZY_ROOT_NAME, **kwargs
    ) -> None:
        """Create a node group generated by the given transform rule.

        Args:
            name (str): Name of the node group.
            transform (Callable): Rule that converts a node into a node group.
                Its prototype is
                `(DocNode, group_name, **kwargs) -> List[DocNode]`.
                [SentenceSplitter][lazyllm.tools.SentenceSplitter] is built
                in; user-defined rules are also allowed.
            parent (str): Node to be transformed further. The new nodes
                produced by the transform become children of this parent.
                Defaults to the root node.
            kwargs: Implementation-specific parameters.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import Document, SentenceSplitter
            >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> documents.create_node_group(name="sentences", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
        """
        self._impl.create_node_group(name, transform, parent, **kwargs)

create_node_group(name, transform, parent=LAZY_ROOT_NAME, **kwargs)

创建一个由指定规则生成的 node group。

Parameters:

  • name (str) –

    node group 的名称。

  • transform (Callable) –

    将 node 转换成 node group 的转换规则,函数原型是 (DocNode, group_name, **kwargs) -> List[DocNode]。目前内置的有 SentenceSplitter。用户也可以自定义转换规则。

  • parent (str, default: LAZY_ROOT_NAME ) –

    需要进一步转换的节点。转换之后得到的一系列新的节点将会作为该父节点的子节点。如果不指定则从根节点开始转换。

  • kwargs

    和具体实现相关的参数。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import Document, SentenceSplitter
>>> m = lazyllm.OnlineEmbeddingModule(source="glm")
>>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
>>> documents.create_node_group(name="sentences", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
Source code in lazyllm/tools/rag/document.py
    def create_node_group(
        self, name: str, transform: Callable, parent: str = LAZY_ROOT_NAME, **kwargs
    ) -> None:
        """Create a node group generated by the given transform rule.

        Args:
            name (str): Name of the node group.
            transform (Callable): Rule that converts a node into a node group.
                Its prototype is
                `(DocNode, group_name, **kwargs) -> List[DocNode]`.
                [SentenceSplitter][lazyllm.tools.SentenceSplitter] is built
                in; user-defined rules are also allowed.
            parent (str): Node to be transformed further. The new nodes
                produced by the transform become children of this parent.
                Defaults to the root node.
            kwargs: Implementation-specific parameters.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import Document, SentenceSplitter
            >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> documents.create_node_group(name="sentences", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
        """
        # Pure delegation: the implementation object owns the node groups.
        self._impl.create_node_group(name, transform, parent, **kwargs)

find_children(group)

查找指定节点的子节点。

Parameters:

  • group (str) –

    需要查找的名称

Examples:

>>> import lazyllm
>>> from lazyllm.tools import Document, SentenceSplitter
>>> m = lazyllm.OnlineEmbeddingModule(source="glm")
>>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
>>> documents.create_node_group(name="parent", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
>>> documents.create_node_group(name="children", transform=SentenceSplitter, parent="parent", chunk_size=1024, chunk_overlap=100)
>>> documents.find_children('parent')
Source code in lazyllm/tools/rag/document.py
    def find_children(self, group: str) -> Callable:
        """Find the children of the specified node group.

        Args:
            group (str): Name of the node group whose children are looked up.

        Returns:
            Callable: ``self.forward`` pre-bound to the ``"find_children"``
            operation with ``group=group``.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import Document, SentenceSplitter
            >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> documents.create_node_group(name="parent", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
            >>> documents.create_node_group(name="children", transform=SentenceSplitter, parent="parent", chunk_size=1024, chunk_overlap=100)
            >>> documents.find_children('parent')
        """
        # Returns a deferred call rather than performing the lookup here.
        return partial(self.forward, "find_children", group=group)

find_parent(group)

查找指定节点的父节点。

Parameters:

  • group (str) –

    需要查找的节点名称

Examples:

>>> import lazyllm
>>> from lazyllm.tools import Document, SentenceSplitter
>>> m = lazyllm.OnlineEmbeddingModule(source="glm")
>>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
>>> documents.create_node_group(name="parent", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
>>> documents.create_node_group(name="children", transform=SentenceSplitter, parent="parent", chunk_size=1024, chunk_overlap=100)
>>> documents.find_parent('children')
Source code in lazyllm/tools/rag/document.py
    def find_parent(self, group: str) -> Callable:
        """Find the parent of the specified node group.

        Args:
            group (str): Name of the node group whose parent is looked up.

        Returns:
            Callable: ``self.forward`` pre-bound to the ``"find_parent"``
            operation with ``group=group``.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import Document, SentenceSplitter
            >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> documents.create_node_group(name="parent", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
            >>> documents.create_node_group(name="children", transform=SentenceSplitter, parent="parent", chunk_size=1024, chunk_overlap=100)
            >>> documents.find_parent('children')
        """
        # Returns a deferred call rather than performing the lookup here.
        return partial(self.forward, "find_parent", group=group)

lazyllm.tools.Reranker

Bases: ModuleBase

用于创建节点(文档)后处理和重排序的模块。

Parameters:

  • name (str, default: 'ModuleReranker' ) –

    用于后处理和重排序过程的排序器类型。默认为 'ModuleReranker'。

  • kwargs

    传递给重新排序器实例化的其他关键字参数。

详细解释排序器类型

  • Reranker: 实例化一个具有指定模型和 top_n 参数的 SentenceTransformerRerank 重排序器。
  • KeywordFilter: 实例化一个具有指定必需和排除关键字的 KeywordNodePostprocessor。它根据这些关键字的存在或缺失来过滤节点。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import Document, Reranker, Retriever
>>> m = lazyllm.OnlineEmbeddingModule()
>>> documents = Document(dataset_path='rag_master', embed=m, create_ui=False)
>>> retriever = Retriever(documents, group_name='CoarseChunk', similarity='bm25', similarity_cut_off=0.01, topk=6)
>>> reranker = Reranker(name='ModuleReranker', model='bge-reranker-large', topk=1)
>>> ppl = lazyllm.ActionModule(retriever, reranker)
>>> ppl.start()
>>> print(ppl("query"))
Source code in lazyllm/tools/rag/rerank.py
class Reranker(ModuleBase):
    """Module that post-processes and re-ranks retrieved nodes (documents).

    The actual ranking function is looked up by ``name`` in the class-level
    ``registered_reranker`` registry, which is filled through
    ``register_reranker``.

    Args:
        name: Name of the registered reranker used for post-processing and
            re-ranking. Defaults to 'ModuleReranker'.
        kwargs: Additional keyword arguments passed to the reranker on every
            call.

    Reranker types described by the original documentation (their
    implementations are registered outside this class):

      - Reranker: instantiates a SentenceTransformerRerank reranker with the
        given model and top_n parameters.
      - KeywordFilter: instantiates a KeywordNodePostprocessor with the given
        required and excluded keywords, and filters nodes on the presence or
        absence of those keywords.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import Document, Reranker, Retriever
        >>> m = lazyllm.OnlineEmbeddingModule()
        >>> documents = Document(dataset_path='rag_master', embed=m, create_ui=False)
        >>> retriever = Retriever(documents, group_name='CoarseChunk', similarity='bm25', similarity_cut_off=0.01, topk=6)
        >>> reranker = Reranker(name='ModuleReranker', model='bge-reranker-large', topk=1)
        >>> ppl = lazyllm.ActionModule(retriever, reranker)
        >>> ppl.start()
        >>> print(ppl("query"))
    """
    # Registry shared by all instances: reranker name -> wrapped callable.
    registered_reranker = dict()

    def __init__(self, name: str = "ModuleReranker", **kwargs) -> None:
        super().__init__()
        self.name = name
        self.kwargs = kwargs

    def forward(self, nodes: List[DocNode], query: str = "") -> List[DocNode]:
        """Run the configured reranker over ``nodes``.

        Args:
            nodes (List[DocNode]): Nodes to post-process.
            query (str): Query forwarded to the reranker. Defaults to "".

        Returns:
            List[DocNode]: The result of the registered reranker.

        Raises:
            KeyError: If ``self.name`` is not a registered reranker.
        """
        rerank_fn = self.registered_reranker[self.name]
        results = rerank_fn(nodes, query=query, **self.kwargs)
        LOG.debug(f"Rerank use `{self.name}` and get nodes: {results}")
        return results

    @classmethod
    def register_reranker(
        cls: "Reranker", func: Optional[Callable] = None, batch: bool = False
    ):
        """Register a reranker function under its ``__name__``.

        Usable either as ``@Reranker.register_reranker`` or as
        ``@Reranker.register_reranker(batch=...)``.

        Args:
            func (Optional[Callable]): Function to register. When omitted, a
                decorator is returned instead.
            batch (bool): If True, ``func`` receives the whole node list. If
                False, ``func`` is called once per node and falsy results are
                dropped.

        Returns:
            The registered wrapper when ``func`` is given, otherwise the
            decorator that performs the registration.
        """
        def decorator(f):
            def wrapper(nodes, **kwargs):
                if batch:
                    return f(nodes, **kwargs)
                # Per-node mode: keep only the truthy outcomes, in order.
                kept = []
                for node in nodes:
                    outcome = f(node, **kwargs)
                    if outcome:
                        kept.append(outcome)
                return kept

            cls.registered_reranker[f.__name__] = wrapper
            return wrapper

        if not func:
            return decorator
        return decorator(func)

lazyllm.tools.Retriever

Bases: ModuleBase

创建一个用于文档查询和检索的检索模块。此构造函数初始化一个检索模块,该模块根据指定的相似度度量配置文档检索过程。

Parameters:

  • doc (object) –

    文档模块实例。

  • group_name (str) –

    在哪个 node group 上进行检索。

  • similarity (str, default: 'dummy' ) –

    用于设置文档检索的相似度函数。默认为 'dummy'。候选集包括 ["bm25", "bm25_chinese", "cosine"]。

  • similarity_cut_off (float, default: float('-inf') ) –

    当相似度低于指定值时丢弃该文档。

  • index (str, default: 'default' ) –

    用于文档检索的索引类型。目前仅支持 'default'。

  • topk (int, default: 6 ) –

    表示取相似度最高的多少篇文档。

  • similarity_kw

    传递给 similarity 计算函数的其它参数。

其中 group_name 有三个内置的切分策略,都是使用 SentenceSplitter 做切分,区别在于块大小不同:

  • CoarseChunk: 块大小为 1024,重合长度为 100
  • MediumChunk: 块大小为 256,重合长度为 25
  • FineChunk: 块大小为 128,重合长度为 12

Examples:

>>> import lazyllm
>>> from lazyllm.tools import Retriever
>>> from lazyllm.tools import Document
>>> m = lazyllm.OnlineEmbeddingModule()
>>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
>>> rm = Retriever(documents, group_name='CoarseChunk', similarity='bm25', similarity_cut_off=0.01, topk=6)
>>> rm.start()
>>> print(rm("query"))
Source code in lazyllm/tools/rag/retriever.py
class Retriever(ModuleBase):
    """Retrieval module for querying and retrieving documents.

    The constructor only records how retrieval should be performed; each call
    forwards a ``"retrieve"`` request with those settings to the document
    module.

    Args:
        doc: The document module instance.
        group_name: The node group on which retrieval is performed.
        similarity: Name of the similarity function used for retrieval.
            Defaults to 'dummy'. Candidates include
            ["bm25", "bm25_chinese", "cosine"].
        similarity_cut_off: Documents whose similarity is below this value are
            discarded.
        index: Type of index used for retrieval. Only 'default' is currently
            supported.
        topk: Number of documents with the highest similarity to take.
        similarity_kw: Extra keyword arguments passed to the similarity
            function.

    According to the original documentation, ``group_name`` has three built-in
    splitting strategies, all based on ``SentenceSplitter`` and differing only
    in chunk size:

    - CoarseChunk: chunk size 1024, overlap 100
    - MediumChunk: chunk size 256, overlap 25
    - FineChunk: chunk size 128, overlap 12

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import Retriever
        >>> from lazyllm.tools import Document
        >>> m = lazyllm.OnlineEmbeddingModule()
        >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
        >>> rm = Retriever(documents, group_name='CoarseChunk', similarity='bm25', similarity_cut_off=0.01, topk=6)
        >>> rm.start()
        >>> print(rm("query"))
    """
    __enable_request__ = False

    def __init__(
        self,
        doc: object,
        group_name: str,
        similarity: str = "dummy",
        similarity_cut_off: float = float("-inf"),
        index: str = "default",
        topk: int = 6,
        **kwargs,
    ):
        super().__init__()
        self.doc = doc
        self.group_name = group_name
        # Name of the similarity function, resolved by the document module.
        self.similarity = similarity
        self.similarity_cut_off = similarity_cut_off
        self.index = index
        self.topk = topk
        # Remaining keyword arguments are handed to the similarity function.
        self.similarity_kw = kwargs

    def _get_post_process_tasks(self):
        """Return a pipeline that issues one test query through this module."""
        return pipeline(lambda *a: self('Test Query'))

    def forward(self, query: str) -> List[DocNode]:
        """Retrieve nodes for ``query`` using the configured settings.

        Args:
            query (str): The query text.

        Returns:
            List[DocNode]: Whatever the document module's ``retrieve``
            operation returns.
        """
        request = dict(
            func_name="retrieve",
            query=query,
            group_name=self.group_name,
            similarity=self.similarity,
            similarity_cut_off=self.similarity_cut_off,
            index=self.index,
            topk=self.topk,
            similarity_kws=self.similarity_kw,
        )
        return self.doc.forward(**request)

lazyllm.tools.SentenceSplitter

Bases: NodeTransform

将句子拆分成指定大小的块。可以指定相邻块之间重合部分的大小。

Parameters:

  • chunk_size (int, default: 1024 ) –

    拆分之后的块大小

  • chunk_overlap (int, default: 200 ) –

    相邻两个块之间重合的内容长度

Examples:

>>> import lazyllm
>>> from lazyllm.tools import Document, SentenceSplitter
>>> m = lazyllm.OnlineEmbeddingModule(source="glm")
>>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
>>> documents.create_node_group(name="sentences", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
Source code in lazyllm/tools/rag/transform.py
class SentenceSplitter(NodeTransform):
    """Split text into chunks of a bounded size, with optional overlap.

    Sizes are measured in tokens of the tiktoken encoding for
    ``gpt-3.5-turbo``. The size of a node's metadata string is subtracted from
    ``chunk_size`` before splitting, so the text part of each chunk is limited
    to ``chunk_size - metadata_size`` tokens.

    Args:
        chunk_size (int): Size of each chunk after splitting.
        chunk_overlap (int): Length of the content shared by two adjacent
            chunks.

    Raises:
        ValueError: If ``chunk_overlap`` is larger than ``chunk_size``.
        AssertionError: If ``chunk_size <= 0`` or ``chunk_overlap < 0``.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import Document, SentenceSplitter
        >>> m = lazyllm.OnlineEmbeddingModule(source="glm")
        >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
        >>> documents.create_node_group(name="sentences", transform=SentenceSplitter, chunk_size=1024, chunk_overlap=100)
    """
    def __init__(self, chunk_size: int = 1024, chunk_overlap: int = 200):
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )

        assert (
            chunk_size > 0 and chunk_overlap >= 0
        ), "chunk size should > 0 and chunk_overlap should >= 0"

        try:
            self._tiktoken_tokenizer = tiktoken.encoding_for_model("gpt-3.5-turbo")
        except requests.exceptions.ConnectionError:
            LOG.error(
                "Unable to download the vocabulary file for tiktoken `gpt-3.5-turbo`. "
                "Please check your internet connection. "
                "Alternatively, you can manually download the file "
                "and set the `TIKTOKEN_CACHE_DIR` environment variable."
            )
            raise
        except Exception as e:
            LOG.error(f"Unable to build tiktoken tokenizer with error `{e}`")
            raise
        self._punkt_st_tokenizer = nltk.tokenize.PunktSentenceTokenizer()

        # Splitters whose pieces are treated as sentences, tried in order.
        self._sentence_split_fns = [
            partial(split_text_keep_separator, separator="\n\n\n"),  # paragraph
            self._punkt_st_tokenizer.tokenize,
        ]

        # Fallback splitters for text that the sentence splitters cannot cut.
        self._sub_sentence_split_fns = [
            lambda t: re.findall(r"[^,.;。?!]+[,.;。?!]?", t),
            partial(split_text_keep_separator, separator=" "),
            list,  # split by character
        ]

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def transform(self, node: DocNode, **kwargs) -> List[str]:
        """Split the text of ``node`` into chunks.

        Args:
            node (DocNode): Node whose text is split; its metadata size is
                reserved out of the chunk size.

        Returns:
            List[str]: The resulting text chunks.
        """
        return self.split_text(
            node.get_text(),
            metadata_size=self._get_metadata_size(node),
        )

    def _get_metadata_size(self, node: DocNode) -> int:
        """Return the larger token size of the node's EMBED / LLM metadata."""
        # Return the bigger size to ensure chunk_size < limit
        return max(
            self._token_size(node.get_metadata_str(mode=MetadataMode.EMBED)),
            self._token_size(node.get_metadata_str(mode=MetadataMode.LLM)),
        )

    def split_text(self, text: str, metadata_size: int) -> List[str]:
        """Split ``text`` into chunks, reserving room for metadata.

        Args:
            text (str): Text to split.
            metadata_size (int): Number of tokens reserved for metadata.

        Returns:
            List[str]: Stripped, non-empty chunks; ``[""]`` when ``text`` is
            the empty string.

        Raises:
            ValueError: If ``metadata_size`` leaves no room for text, or if a
                piece that cannot be split further exceeds the chunk size.
        """
        if text == "":
            return [""]
        effective_chunk_size = self.chunk_size - metadata_size
        if effective_chunk_size <= 0:
            raise ValueError(
                f"Metadata length ({metadata_size}) is longer than chunk size "
                f"({self.chunk_size}). Consider increasing the chunk size or "
                "decreasing the size of your metadata to avoid this."
            )
        elif effective_chunk_size < 50:
            # `flush=True` belongs to print(), not to a logger call, so it is
            # not passed here.
            LOG.warning(
                f"Metadata length ({metadata_size}) is close to chunk size "
                f"({self.chunk_size}). Resulting chunks are less than 50 tokens. "
                "Consider increasing the chunk size or decreasing the size of "
                "your metadata to avoid this."
            )

        splits = self._split(text, effective_chunk_size)
        chunks = self._merge(splits, effective_chunk_size)
        return chunks

    def _split(self, text: str, chunk_size: int) -> List[_Split]:
        """Break text into splits that are smaller than chunk size.

        The order of splitting is:
        1. split by paragraph separator
        2. split by chunking tokenizer
        3. split by second chunking regex
        4. split by default separator (" ")
        5. split by character

        A piece that no splitter can break up any further is returned as a
        single split even if it is still larger than ``chunk_size``; `_merge`
        rejects such a split with a ``ValueError``.
        """
        token_size = self._token_size(text)
        if token_size <= chunk_size:
            return [_Split(text, is_sentence=True, token_size=token_size)]

        text_splits_by_fns, is_sentence = self._get_splits_by_fns(text)
        if len(text_splits_by_fns) <= 1:
            # Nothing could split the text (e.g. a single character encoded as
            # several tokens). Recursing on the same text would never
            # terminate, so hand the oversized piece to the caller as-is.
            return [_Split(text, is_sentence=False, token_size=token_size)]

        text_splits = []
        for piece in text_splits_by_fns:
            piece_size = self._token_size(piece)
            if piece_size <= chunk_size:
                text_splits.append(
                    _Split(
                        piece,
                        is_sentence=is_sentence,
                        token_size=piece_size,
                    )
                )
            else:
                # Still too large: split this piece again with finer rules.
                recursive_text_splits = self._split(piece, chunk_size=chunk_size)
                text_splits.extend(recursive_text_splits)
        return text_splits

    def _merge(self, splits: List[_Split], chunk_size: int) -> List[str]:
        """Greedily merge splits into chunks of at most ``chunk_size`` tokens.

        When a chunk is closed, trailing splits of that chunk totalling at
        most ``self.chunk_overlap`` tokens are carried over as the start of
        the next chunk.

        Raises:
            ValueError: If a single split is larger than ``chunk_size``.
        """
        chunks: List[str] = []
        cur_chunk: List[Tuple[str, int]] = []  # list of (text, length)
        cur_chunk_len = 0
        is_chunk_new = True

        def close_chunk() -> None:
            nonlocal chunks, cur_chunk, cur_chunk_len, is_chunk_new

            chunks.append("".join([text for text, _ in cur_chunk]))
            last_chunk = cur_chunk
            cur_chunk = []
            cur_chunk_len = 0
            is_chunk_new = True

            # Add overlap to the next chunk using the last one first
            overlap_len = 0
            for text, length in reversed(last_chunk):
                if overlap_len + length > self.chunk_overlap:
                    break
                cur_chunk.append((text, length))
                overlap_len += length
                cur_chunk_len += length
            # Collected back-to-front above; restore reading order.
            cur_chunk.reverse()

        i = 0
        while i < len(splits):
            cur_split = splits[i]
            if cur_split.token_size > chunk_size:
                raise ValueError("Single token exceeded chunk size")
            if cur_chunk_len + cur_split.token_size > chunk_size and not is_chunk_new:
                # if adding split to current chunk exceeds chunk size
                close_chunk()
            else:
                if (
                    cur_split.is_sentence
                    or cur_chunk_len + cur_split.token_size <= chunk_size
                    or is_chunk_new  # new chunk, always add at least one split
                ):
                    # add split to chunk
                    cur_chunk_len += cur_split.token_size
                    cur_chunk.append((cur_split.text, cur_split.token_size))
                    i += 1
                    is_chunk_new = False
                else:
                    close_chunk()

        # handle the last chunk
        if not is_chunk_new:
            chunks.append("".join([text for text, _ in cur_chunk]))

        # Remove whitespace only chunks and remove leading and trailing whitespace.
        return [stripped_chunk for chunk in chunks if (stripped_chunk := chunk.strip())]

    def _token_size(self, text: str) -> int:
        """Return the number of tiktoken tokens in ``text``."""
        return len(self._tiktoken_tokenizer.encode(text, allowed_special="all"))

    def _get_splits_by_fns(self, text: str) -> Tuple[List[str], bool]:
        """Split ``text`` with the first splitter that yields several pieces.

        Returns:
            Tuple[List[str], bool]: The pieces and whether they came from a
            sentence-level splitter. If no splitter yields more than one
            piece, the output of the last sub-sentence splitter is returned
            with ``False``.
        """
        for split_fn in self._sentence_split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                return splits, True

        for split_fn in self._sub_sentence_split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                break

        return splits, False

lazyllm.tools.LLMParser

Bases: NodeTransform

一个文本摘要和关键词提取器,负责分析用户输入的文本,并根据请求任务提供简洁的摘要或提取相关关键词。

Parameters:

  • llm (TrainableModule) –

    可训练的模块

  • language (str) –

    语言种类,目前只支持中文(zh)和英文(en)

  • task_type (str) –

    目前支持两种任务:摘要(summary)和关键词抽取(keywords)。

Examples:

>>> from lazyllm import TrainableModule
>>> from lazyllm.tools.rag import LLMParser
>>> llm = TrainableModule("internlm2-chat-7b")
>>> summary_parser = LLMParser(llm, language="en", task_type="summary")
Source code in lazyllm/tools/rag/transform.py
class LLMParser(NodeTransform):
    """Text summarizer and keyword extractor backed by an LLM.

    Analyzes the input text and, depending on the requested task, produces a
    concise summary or extracts relevant keywords.

    Args:
        llm (TrainableModule): A trainable module.
        language (str): Language of the prompt; only Chinese (``zh``) and
            English (``en``) are supported.
        task_type (str): Task to perform; ``summary`` and ``keywords`` are
            supported.

    Raises:
        AssertionError: If ``language`` or ``task_type`` is not supported.

    Examples:
        >>> from lazyllm import TrainableModule
        >>> from lazyllm.tools.rag import LLMParser
        >>> llm = TrainableModule("internlm2-chat-7b")
        >>> summary_parser = LLMParser(llm, language="en", task_type="summary")
    """
    def __init__(self, llm: TrainableModule, language: str, task_type: str) -> None:
        assert language in ["en", "zh"], f"Not supported language {language}"
        assert task_type in [
            "summary",
            "keywords",
        ], f"Not supported task_type {task_type}"
        prompt = en_prompt_template if language == "en" else ch_prompt_template
        # Share the model with a prompter whose pre-hook wraps every input
        # together with the task type (see `prompt_pre_hook`).
        self._llm = llm.share(
            prompt=AlpacaPrompter(prompt).pre_hook(self.prompt_pre_hook)
        )
        self._task_type = task_type

    def prompt_pre_hook(
        self,
        input: Union[str, List, Dict[str, str], None] = None,
        history: Union[List[Union[List[str], Dict[str, Any]]], None] = None,
        tools: Union[List[Dict[str, Any]], None] = None,
        label: Union[str, None] = None,
    ):
        """Wrap the raw input and the task type into a JSON prompt input.

        Args:
            input: The text to process; must be a ``str``.
            history: Conversation history, passed through. A fresh empty list
                is used when omitted, so no list is shared between calls.
            tools: Passed through unchanged.
            label: Passed through unchanged.

        Returns:
            tuple: ``(dict(input=<json text>), history, tools, label)`` where
            the JSON text encodes ``{"input": ..., "task_type": ...}``.

        Raises:
            ValueError: If ``input`` is not a ``str``.
        """
        if history is None:
            # A `[]` default would be one list shared by every call and
            # handed back to callers, who could then mutate it.
            history = []
        if not isinstance(input, str):
            raise ValueError(f"Unexpected type for input: {type(input)}")

        input_json = {"input": input, "task_type": self._task_type}
        input_text = json.dumps(input_json, ensure_ascii=False)
        return dict(input=input_text), history, tools, label

    def transform(self, node: DocNode, **kwargs) -> List[str]:
        """Run the configured task on the given document node.

        Args:
            node (DocNode): The document node to run the extraction task on.

        Returns:
            List[str]: The model output; a single string result is wrapped in
            a one-element list.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import LLMParser, TrainableModule
            >>> llm = TrainableModule("internlm2-chat-7b")
            >>> m = lazyllm.TrainableModule("bge-large-zh-v1.5")
            >>> summary_parser = LLMParser(llm, language="en", task_type="summary")
            >>> keywords_parser = LLMParser(llm, language="en", task_type="keywords")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> rm = Retriever(documents, group_name='CoarseChunk', similarity='bm25', similarity_cut_off=0.01, topk=6)
            >>> doc_nodes = rm("test")
            >>> summary_result = summary_parser.transform(doc_nodes[0])
            >>> keywords_result = keywords_parser.transform(doc_nodes[0])
        """
        result = self._llm(node.get_text())
        results = [result] if isinstance(result, str) else result
        LOG.debug(f"LLMParser({self._task_type}) with input: {node.get_text()}")
        return results

transform(node, **kwargs)

在指定的文档上执行设定的任务。

Parameters:

  • node (DocNode) –

    需要执行抽取任务的文档。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import LLMParser, TrainableModule
>>> llm = TrainableModule("internlm2-chat-7b")
>>> m = lazyllm.TrainableModule("bge-large-zh-v1.5")
>>> summary_parser = LLMParser(llm, language="en", task_type="summary")
>>> keywords_parser = LLMParser(llm, language="en", task_type="keywords")
>>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
>>> rm = Retriever(documents, group_name='CoarseChunk', similarity='bm25', similarity_cut_off=0.01, topk=6)
>>> doc_nodes = rm("test")
>>> summary_result = summary_parser.transform(doc_nodes[0])
>>> keywords_result = keywords_parser.transform(doc_nodes[0])
Source code in lazyllm/tools/rag/transform.py
    def transform(self, node: DocNode, **kwargs) -> List[str]:
        """Run the configured task on the given document node.

        Args:
            node (DocNode): The document node to run the extraction task on.

        Returns:
            List[str]: The model output; a single string result is wrapped in
            a one-element list.

        Examples:
            >>> import lazyllm
            >>> from lazyllm.tools import LLMParser, TrainableModule
            >>> llm = TrainableModule("internlm2-chat-7b")
            >>> m = lazyllm.TrainableModule("bge-large-zh-v1.5")
            >>> summary_parser = LLMParser(llm, language="en", task_type="summary")
            >>> keywords_parser = LLMParser(llm, language="en", task_type="keywords")
            >>> documents = Document(dataset_path='your_doc_path', embed=m, create_ui=False)
            >>> rm = Retriever(documents, group_name='CoarseChunk', similarity='bm25', similarity_cut_off=0.01, topk=6)
            >>> doc_nodes = rm("test")
            >>> summary_result = summary_parser.transform(doc_nodes[0])
            >>> keywords_result = keywords_parser.transform(doc_nodes[0])
        """
        output = self._llm(node.get_text())
        LOG.debug(f"LLMParser({self._task_type}) with input: {node.get_text()}")
        # Normalize a bare string into a one-element list.
        if isinstance(output, str):
            return [output]
        return output

lazyllm.tools.WebModule

Bases: ModuleBase

WebModule是LazyLLM为开发者提供的基于Web的交互界面。在初始化并启动一个WebModule之后,开发者可以从页面上看到WebModule背后的模块结构,并将Chatbot组件的输入传输给自己开发的模块进行处理。 模块返回的结果和日志会直接显示在网页的“处理日志”和Chatbot组件上。除此之外,WebModule支持在网页上动态加入Checkbox或Text组件用于向模块发送额外的参数。 WebModule页面还提供“使用上下文”,“流式输出”和“追加输出”的Checkbox,可以用来改变页面和后台模块的交互方式。

WebModule.init_web(component_descs) -> gradio.Blocks 使用gradio库生成演示web页面,初始化session相关数据以便在不同的页面保存各自的对话和日志,然后使用传入的component_descs参数为页面动态添加Checkbox和Text组件,最后设置页面上的按钮和文本框的响应函数,之后返回整个页面。WebModule的__init__函数调用此方法生成页面。

Parameters:

  • component_descs (list) –

    用于动态向页面添加组件的列表。列表中的每个元素也是一个列表,其中包含5个元素,分别是组件对应的模块ID,模块名,组件名,组件类型(目前仅支持Checkbox和Text),组件默认值。

Examples:

>>> import lazyllm
>>> def func2(in_str, do_sample=True, temperature=0.0, *args, **kwargs):
...     return f"func2:{in_str}|do_sample:{str(do_sample)}|temp:{temperature}"
...
>>> m1=lazyllm.ActionModule(func2)
>>> m1.name="Module1"
>>> w = lazyllm.WebModule(m1, port=[20570, 20571, 20572], components={
...         m1:[('do_sample', 'Checkbox', True), ('temperature', 'Text', 0.1)]},
...                       text_mode=lazyllm.tools.WebModule.Mode.Refresh)
>>> w.start()
193703: 2024-06-07 10:26:00 lazyllm SUCCESS: ...
Source code in lazyllm/tools/webpages/webmodule.py
class WebModule(ModuleBase):
    """Web-based interactive interface that LazyLLM provides for developers.

    After a WebModule is initialized and started, the page shows the structure of the
    wrapped module and forwards the input of the Chatbot component to that module.
    The results and logs returned by the module are displayed in the "处理日志"
    (processing log) box and in the Chatbot component. Extra Checkbox, Text or
    Dropdown components can be added to the page dynamically in order to send
    additional parameters to the module. The page also offers the "使用上下文"
    (use context), "流式输出" (stream output) and "追加输出" (append output)
    checkboxes, which change how the page interacts with the backend module.

    `WebModule.init_web(component_descs) -> gradio.Blocks` builds the demo page with
    gradio, initializes the per-page session data so that every page keeps its own
    conversations and logs, adds the components described by `component_descs`,
    wires the callbacks of the buttons and text boxes, and returns the page.
    `__init__` calls it to create the page.

    Args:
        component_descs (list): Descriptions of the components added to the page.
            Every element is itself a list of 5 items: the module ID, the module
            name, the component name, the component type (`Checkbox`, `Text` or
            `Dropdown`) and the component value (the default value, or the list of
            choices for a `Dropdown`).

    Examples:
        >>> import lazyllm
        >>> def func2(in_str, do_sample=True, temperature=0.0, *args, **kwargs):
        ...     return f"func2:{in_str}|do_sample:{str(do_sample)}|temp:{temperature}"
        ...
        >>> m1=lazyllm.ActionModule(func2)
        >>> m1.name="Module1"
        >>> w = lazyllm.WebModule(m1, port=[20570, 20571, 20572], components={
        ...         m1:[('do_sample', 'Checkbox', True), ('temperature', 'Text', 0.1)]},
        ...                       text_mode=lazyllm.tools.WebModule.Mode.Refresh)
        >>> w.start()
        193703: 2024-06-07 10:26:00 lazyllm SUCCESS: ...
    """
    class Mode:
        # Dynamic: the "追加输出" checkbox is editable on the page.
        # Refresh / Appendix: the checkbox is locked (unchecked for Refresh, checked otherwise).
        Dynamic = 0
        Refresh = 1
        Appendix = 2

    def __init__(self, m, *, components=None, title='对话演示终端', port=range(20500, 20799),
                 history=None, text_mode=None, trace_mode=None, audio=False) -> None:
        """Wrap module `m` in a gradio page.

        Args:
            m: The module (or flow) that receives the chat input.
            components (dict, optional): Maps a module to a list of
                `(component name, component type, value)` tuples.
            title (str): Title of the page.
            port: A single port, or a range/tuple/list of candidate ports.
            history (list, optional): Modules whose chat history is filled when
                "use context" is enabled. Defaults to `[m]` when `m` is an
                `OnlineChatModule` or a `TrainableModule`.
            text_mode: One of `WebModule.Mode`; falsy values mean `Mode.Dynamic`.
            trace_mode: Deprecated; only triggers a warning.
            audio (bool): Whether the microphone recorder is visible.
        """
        super().__init__()
        # None replaces the former mutable default arguments ({} and []).
        components = {} if components is None else components
        history = [] if history is None else history

        self.m = lazyllm.ActionModule(m) if isinstance(m, lazyllm.FlowBase) else m
        self.pool = lazyllm.ThreadPoolExecutor(max_workers=50)
        self.title = title
        self.port = port
        # Flatten to [module_id, module_name, component_name, component_type, value].
        component_descs = [[k._module_id, k._module_name] + list(v)
                           for k, vs in components.items() for v in vs]
        self.ckeys = [[c[0], c[2]] for c in component_descs]
        if isinstance(m, (OnlineChatModule, TrainableModule)) and not history:
            history = [m]
        self.history = [h._module_id for h in history]
        if trace_mode:
            LOG.warning('trace_mode is deprecated')
        self.text_mode = text_mode if text_mode else WebModule.Mode.Dynamic
        self.cach_path = self._set_up_caching()
        self.audio = audio
        self.demo = self.init_web(component_descs)
        self.url = None
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Run the registered exit functions and exit when SIGINT/SIGTERM arrives."""
        LOG.info(f"Signal {signum} received, terminating subprocess.")
        atexit._run_exitfuncs()
        sys.exit(0)

    def _set_up_caching(self):
        """Return the gradio temp directory, creating it (and the env var) if missing."""
        if 'GRADIO_TEMP_DIR' in os.environ:
            cach_path = os.environ['GRADIO_TEMP_DIR']
        else:
            cach_path = os.path.join(os.getcwd(), '.temp')
            os.environ['GRADIO_TEMP_DIR'] = cach_path
        os.makedirs(cach_path, exist_ok=True)
        return cach_path

    def init_web(self, component_descs):
        """Build the gradio page and wire all callbacks.

        Args:
            component_descs (list): Lists of
                `[module_id, module_name, component_name, component_type, value]`.

        Returns:
            The `gr.Blocks` page.

        Raises:
            KeyError: If a component type is not `Checkbox`, `Text` or `Dropdown`.
        """
        with gr.Blocks(css=css, title=self.title, analytics_enabled=False) as demo:
            sess_data = gr.State(value={
                'sess_titles': [''],
                'sess_logs': {},
                'sess_history': {},
                'sess_num': 1,
                'curr_sess': '',
                'frozen_query': '',
            })
            with gr.Row():
                with gr.Column(scale=3):
                    with gr.Row():
                        gr.Textbox(elem_id='module', interactive=False, show_label=True,
                                   label="模型结构", value=repr(self.m))
                    with gr.Row():
                        chat_use_context = gr.Checkbox(interactive=True, value=False, label="使用上下文")
                    with gr.Row():
                        stream_output = gr.Checkbox(interactive=True, value=True, label="流式输出")
                        text_mode = gr.Checkbox(interactive=(self.text_mode == WebModule.Mode.Dynamic),
                                                value=(self.text_mode != WebModule.Mode.Refresh), label="追加输出")
                    components = []
                    for _, gname, name, ctype, value in component_descs:
                        label = f'{gname}.{name}'
                        if ctype in ('Checkbox', 'Text'):
                            components.append(getattr(gr, ctype)(interactive=True, value=value, label=label))
                        elif ctype == 'Dropdown':
                            # For a dropdown the value is the list of choices.
                            components.append(getattr(gr, ctype)(interactive=True, choices=value, label=label))
                        else:
                            raise KeyError(f'invalid component type: {ctype}')
                    with gr.Row():
                        dbg_msg = gr.Textbox(show_label=True, label='处理日志',
                                             elem_id='logging', interactive=False, max_lines=10)
                    clear_btn = gr.Button(value="🗑️  Clear history", interactive=True)
                with gr.Column(scale=6):
                    with gr.Row():
                        add_sess_btn = gr.Button("添加新会话")
                        sess_drpdn = gr.Dropdown(choices=sess_data.value['sess_titles'], label="选择会话:", value='')
                        del_sess_btn = gr.Button("删除当前会话")
                    chatbot = gr.Chatbot(height=700)
                    query_box = gr.MultimodalTextbox(show_label=False, placeholder='输入内容并回车!!!', interactive=True)
                    recordor = gr.Audio(sources=["microphone"], type="filepath", visible=self.audio)

            # Submit pipeline: create the session if needed, lock the session controls,
            # move the query into the chat, stream the answer, then unlock the controls.
            query_box.submit(self._init_session, [query_box, sess_data, recordor],
                                                 [sess_drpdn, chatbot, dbg_msg, sess_data, recordor], queue=True
                ).then(lambda: gr.update(interactive=False), None, query_box, queue=False
                ).then(lambda: gr.update(interactive=False), None, add_sess_btn, queue=False
                ).then(lambda: gr.update(interactive=False), None, sess_drpdn, queue=False
                ).then(lambda: gr.update(interactive=False), None, del_sess_btn, queue=False
                ).then(self._prepare, [query_box, chatbot, sess_data], [query_box, chatbot], queue=True
                ).then(self._respond_stream, [chat_use_context, chatbot, stream_output, text_mode] + components,
                                             [chatbot, dbg_msg], queue=True
                ).then(lambda: gr.update(interactive=True), None, query_box, queue=False
                ).then(lambda: gr.update(interactive=True), None, add_sess_btn, queue=False
                ).then(lambda: gr.update(interactive=True), None, sess_drpdn, queue=False
                ).then(lambda: gr.update(interactive=True), None, del_sess_btn, queue=False)
            clear_btn.click(self._clear_history, [sess_data], outputs=[chatbot, query_box, dbg_msg, sess_data])

            sess_drpdn.change(self._change_session, [sess_drpdn, chatbot, dbg_msg, sess_data],
                                                    [sess_drpdn, chatbot, query_box, dbg_msg, sess_data])
            add_sess_btn.click(self._add_session, [chatbot, dbg_msg, sess_data],
                                                  [sess_drpdn, chatbot, query_box, dbg_msg, sess_data])
            del_sess_btn.click(self._delete_session, [sess_drpdn, sess_data],
                                                     [sess_drpdn, chatbot, query_box, dbg_msg, sess_data])
            recordor.change(self._sub_audio, recordor, query_box)
            return demo

    def _sub_audio(self, audio):
        """Turn a recorded audio file path into a query-box value (empty dict if none)."""
        if audio:
            return {'text': '', 'files': [audio]}
        return {}

    def _init_session(self, query, session, audio):
        """Remember the submitted query and create a session on the first submission.

        Returns the updates for the session dropdown, chatbot, log box, session state
        and recorder, in that order.
        """
        audio = None  # the recorder is always reset after a submission
        session['frozen_query'] = query
        if session['curr_sess'] != '':  # remain unchanged.
            return gr.Dropdown(), gr.Chatbot(), gr.Textbox(), session, audio

        if "text" in query and query["text"] is not None:
            id_name = query['text']
        else:
            # No text to name the session after; fall back to an arbitrary identifier.
            id_name = id(query)
        session['curr_sess'] = f"({session['sess_num']})  {id_name}"
        session['sess_num'] += 1
        session['sess_titles'][0] = session['curr_sess']

        session['sess_logs'][session['curr_sess']] = []
        session['sess_history'][session['curr_sess']] = []
        return gr.update(choices=session['sess_titles'], value=session['curr_sess']), [], '', session, audio

    def _add_session(self, chat_history, log_history, session):
        """Save the current session and open a new, still untitled one."""
        if session['curr_sess'] == '':
            LOG.warning('Cannot create new session while current session is empty.')
            return gr.Dropdown(), gr.Chatbot(), {}, gr.Textbox(), session

        self._save_history(chat_history, log_history, session)

        session['curr_sess'] = ''
        session['sess_titles'].insert(0, session['curr_sess'])
        return gr.update(choices=session['sess_titles'], value=session['curr_sess']), [], {}, '', session

    def _save_history(self, chat_history, log_history, session):
        """Store chat and log history under the current session, if it is still listed."""
        current = session['curr_sess']
        if current in session['sess_titles']:
            session['sess_history'][current] = chat_history
            session['sess_logs'][current] = log_history

    def _change_session(self, session_title, chat_history, log_history, session):
        """Save the current session and switch to `session_title`."""
        if session['curr_sess'] == '':  # new session
            return gr.Dropdown(), [], {}, '', session

        if session_title not in session['sess_titles']:
            LOG.warning(f'{session_title} is not an existing session title.')
            return gr.Dropdown(), gr.Chatbot(), {}, gr.Textbox(), session

        self._save_history(chat_history, log_history, session)

        session['curr_sess'] = session_title
        return (gr.update(choices=session['sess_titles'], value=session['curr_sess']),
                session['sess_history'][session['curr_sess']], {},
                session['sess_logs'][session['curr_sess']], session)

    def _delete_session(self, session_title, session):
        """Delete `session_title` and switch to the first remaining (or a new) session."""
        if session_title not in session['sess_titles']:
            LOG.warning(f'session {session_title} does not exist.')
            # The click handler is wired to five outputs, so five values are returned.
            return gr.Dropdown(), gr.Chatbot(), {}, gr.Textbox(), session
        session['sess_titles'].remove(session_title)

        if session_title != '':
            session['sess_history'].pop(session_title, None)
            session['sess_logs'].pop(session_title, None)
            session['curr_sess'] = session_title
        else:
            session['curr_sess'] = 'dummy session'
            # add_session and change_session cannot accept an uninitialized session.
            # Here we need to imitate removal of a real session so that
            # add_session and change_session could skip saving chat history.

        if len(session['sess_titles']) == 0:
            return self._add_session(None, None, session)
        return self._change_session(session['sess_titles'][0], None, {}, session)

    def _prepare(self, query, chat_history, session):
        """Append the submitted files and text to the chat history and clear the query box.

        Falls back to the query frozen by `_init_session` when the box is already empty.
        """
        if not query.get('text', '') and not query.get('files', []):
            query = session['frozen_query']
        if chat_history is None:
            chat_history = []
        if query:  # frozen_query starts out as '' before anything was submitted
            for x in query.get("files") or []:
                chat_history.append([[x,], None])
            if query.get("text"):
                chat_history.append([query['text'], None])
        return {}, chat_history

    def _respond_stream(self, use_context, chat_history, stream_output, append_text, *args):  # noqa C901
        """Run the wrapped module on the last chat entry and yield `(chat_history, log)`.

        Streamed chunks are read from `FileSystemQueue` while the module runs in the
        thread pool. On failure the yielded chat history is `None` and the log carries
        the error text.
        """
        try:
            # TODO: move context to trainable module
            files = []
            chat_history[-1][1], log_history = '', []
            for entry in chat_history[::-1]:
                if entry[-1]: break  # not current chat
                if isinstance(entry[0], (tuple, list)):
                    files.append(entry[0][0])
                elif isinstance(entry[0], str) and entry[0].startswith('lazyllm_img::'):  # Just for pytest
                    files.append(entry[0][13:])
            string = chat_history[-1][0] if isinstance(chat_history[-1][0], str) else ''
            if files:
                globals['global_parameters']["lazyllm-files"] = {'files': files}
                if files[0]:
                    string += f' ## Get attachments: {os.path.basename(files[0])}'
            query_text = string
            history = chat_history[:-1] if use_context and len(chat_history) > 1 else list()

            # Publish the values of the extra page components as per-module parameters.
            for k, v in zip(self.ckeys, args):
                if k[0] not in globals['global_parameters']: globals['global_parameters'][k[0]] = dict()
                globals['global_parameters'][k[0]][k[1]] = v

            if use_context:
                for h in self.history:
                    if h not in globals['chat_history']: globals['chat_history'][h] = list()
                    globals['chat_history'][h] = history

            if FileSystemQueue().size() > 0: FileSystemQueue().clear()
            kw = dict(stream_output=stream_output) if isinstance(self.m, TrainableModule) else {}
            func_future = self.pool.submit(self.m, query_text, **kw)
            while True:
                if value := FileSystemQueue().dequeue():
                    # NOTE(review): chunks are appended regardless of `append_text`; the
                    # original expression had identical branches for both modes - confirm intent.
                    chat_history[-1][1] += ''.join(value)
                    if stream_output: yield chat_history, ''
                elif value := FileSystemQueue.get_instance('lazy_error').dequeue():
                    log_history.append(''.join(value))
                elif value := FileSystemQueue.get_instance('lazy_trace').dequeue():
                    log_history.append(''.join(value))
                elif func_future.done(): break
                time.sleep(0.01)
            result = func_future.result()
            if FileSystemQueue().size() > 0: FileSystemQueue().clear()
            if files:
                globals['global_parameters']["lazyllm-files"].pop('files', None)

            def get_log_and_message(s):
                """Return `(message, log, media)` extracted from a str/dict result."""
                if isinstance(s, dict):
                    s = s.get("message", {}).get("content", "")
                else:
                    try:
                        r = json.loads(s)
                        if 'choices' in r:
                            choice = r["choices"][0]
                            if "type" not in choice or choice["type"] != "tool_calls":
                                s = choice["delta"].get("content", "") if isinstance(
                                    choice["delta"], dict) else choice["delta"]["content"]
                        elif 'images_base64' in r:
                            image_data = r.pop('images_base64')[0]
                            image = Image.open(BytesIO(base64.b64decode(image_data)))
                            return "The image is: ", "".join(log_history), {'img': image}
                        elif 'sounds' in r:
                            sound_data = r.pop('sounds')
                            sound_data = (sound_data[0], np.array(sound_data[1]).astype(np.int16))
                            return "The Audio is: ", "".join(log_history), {'audio': sound_data}
                    except (ValueError, KeyError, TypeError):
                        pass  # not a JSON payload we understand: keep the raw string
                    except Exception as e:
                        LOG.error(f"Uncaptured error `{e}` when parsing `{s}`, please contact us if you see this.")
                return s, "".join(log_history), None

            file = None
            if isinstance(result, (str, dict)):
                result, log, file = get_log_and_message(result)
            if file:
                if 'img' in file:
                    chat_history[-1][1] = gr.Image(file['img'])
                if 'audio' in file:
                    chat_history[-1][1] = gr.Audio(file['audio'])
            else:
                assert isinstance(result, (str, dict)), f'Result should only be str, but got {type(result)}'
                if isinstance(result, dict): result = result.get('message', '')
                # Only append the final result when it was not already streamed into the tail.
                count = (len(match.group(1)) if (match := re.search(r'(\n+)$', result)) else 0) + len(result) + 1
                if result and not (result in chat_history[-1][1][-count:]):
                    chat_history[-1][1] += "\n\n" + result
        except requests.RequestException as e:
            chat_history = None
            log = str(e)
        except Exception as e:
            chat_history = None
            log = f'{str(e)}\n--- traceback ---\n{traceback.format_exc()}'
            LOG.error(log)
        globals['chat_history'].clear()
        yield chat_history, log

    def _clear_history(self, session):
        """Empty the chat and log history of the current session."""
        session['sess_history'][session['curr_sess']] = []
        session['sess_logs'][session['curr_sess']] = []
        return [], {}, '', session

    def _work(self):
        """Pick a port and launch the gradio page without blocking the calling thread."""
        if isinstance(self.port, (range, tuple, list)):
            port = self._find_can_use_network_port()
        else:
            port = self.port
            assert self._verify_port_access(port), f'port {port} is occupied'

        self.url = f'http://0.0.0.0:{port}'
        self.demo.queue().launch(server_name="0.0.0.0", server_port=port, prevent_thread_lock=True)

    def _update(self, *, mode=None, recursive=True):
        """Update the wrapped modules through the base class, then launch the page."""
        super(__class__, self)._update(mode=mode, recursive=recursive)
        self._work()
        return self

    def _get_post_process_tasks(self):
        return pipeline(self._print_url)

    def wait(self):
        """Block the calling thread on the gradio server."""
        self.demo.block_thread()

    def stop(self):
        """Close the gradio page if it is still open."""
        if self.demo:
            self.demo.close()
            self.demo = None

    def __repr__(self):
        return lazyllm.make_repr('Module', 'Web', name=self._module_name, subs=[repr(self.m)])

    def _find_can_use_network_port(self):
        """Return the first candidate port that is free.

        Raises:
            RuntimeError: If every candidate port is occupied.
        """
        for port in self.port:
            if self._verify_port_access(port):
                return port
        raise RuntimeError(
            f'The ports in the range {self.port} are all occupied. '
            'Please change the port range or release the relevant ports.'
        )

    def _verify_port_access(self, port):
        """Return True when nothing accepts connections on `localhost:port`."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # connect_ex returns 0 when the connection succeeds, i.e. the port is in use.
            return s.connect_ex(('localhost', port)) != 0

    def _print_url(self):
        LOG.success(f'LazyLLM webmodule launched successfully: Running on local URL: {self.url}', flush=True)

lazyllm.tools.ToolManager

Bases: ModuleBase

ToolManager是一个工具管理类,用于提供工具信息和工具调用给function call。

此管理类构造时需要传入工具名字符串列表。此处工具名可以是LazyLLM提供的,也可以是用户自定义的,如果是用户自定义的,首先需要注册进LazyLLM中才可以使用。在注册时直接使用 fc_register 注册器,该注册器已经建立 tool group,所以使用该工具管理类时,所有函数都统一注册进 tool 分组即可。待注册的函数需要对函数参数进行注解,并且需要对函数增加功能描述,以及参数类型和作用描述。以方便工具管理类能对函数解析传给LLM使用。

Parameters:

  • tools (List[str]) –

    工具名称字符串列表。

Examples:

>>> from lazyllm.tools import ToolManager, fc_register
>>> import json
>>> from typing import Literal
>>> @fc_register("tool")
>>> def get_current_weather(location: str, unit: Literal["fahrenheit", "celsius"]="fahrenheit"):
...     '''
...     Get the current weather in a given location
...
...     Args:
...         location (str): The city and state, e.g. San Francisco, CA.
...         unit (str): The temperature unit to use. Infer this from the users location.
...     '''
...     if 'tokyo' in location.lower():
...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius'})
...     elif 'san francisco' in location.lower():
...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit'})
...     elif 'paris' in location.lower():
...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius'})
...     elif 'beijing' in location.lower():
...         return json.dumps({'location': 'Beijing', 'temperature': '90', 'unit': 'fahrenheit'})
...     else:
...         return json.dumps({'location': location, 'temperature': 'unknown'})
...
>>> @fc_register("tool")
>>> def get_n_day_weather_forecast(location: str, num_days: int, unit: Literal["celsius", "fahrenheit"]='fahrenheit'):
...     '''
...     Get an N-day weather forecast
...
...     Args:
...         location (str): The city and state, e.g. San Francisco, CA.
...         num_days (int): The number of days to forecast.
...         unit (Literal['celsius', 'fahrenheit']): The temperature unit to use. Infer this from the users location.
...     '''
...     if 'tokyo' in location.lower():
...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius', "num_days": num_days})
...     elif 'san francisco' in location.lower():
...         return json.dumps({'location': 'San Francisco', 'temperature': '75', 'unit': 'fahrenheit', "num_days": num_days})
...     elif 'paris' in location.lower():
...         return json.dumps({'location': 'Paris', 'temperature': '25', 'unit': 'celsius', "num_days": num_days})
...     elif 'beijing' in location.lower():
...         return json.dumps({'location': 'Beijing', 'temperature': '85', 'unit': 'fahrenheit', "num_days": num_days})
...     else:
...         return json.dumps({'location': location, 'temperature': 'unknown'})
...
>>> tools = ["get_current_weather", "get_n_day_weather_forecast"]
>>> tm = ToolManager(tools)
>>> print(tm([{'name': 'get_n_day_weather_forecast', 'arguments': {'location': 'Beijing', 'num_days': 3}}])[0])
'{"location": "Beijing", "temperature": "85", "unit": "fahrenheit", "num_days": 3}'
Source code in lazyllm/tools/agent/toolsManager.py
class ToolManager(ModuleBase):
    """Tool management class that provides tool information and tool invocation for function call.

    The constructor takes a list of tool name strings. A tool may be provided by
    LazyLLM or defined by the user; user-defined tools must be registered in LazyLLM
    before they can be used. Register them with the `fc_register` registrar, which has
    already created the `tool` group, so every function is simply registered into the
    `tool` group. A function to be registered needs annotated parameters and a
    docstring that describes what the function does as well as the type and purpose of
    each parameter, so that the manager can parse the function and pass it to the LLM.

    Args:
        tools (List[str]): List of tool name strings.
        return_trace (bool): Forwarded to `ModuleBase.__init__`.

    Examples:
        >>> from lazyllm.tools import ToolManager, fc_register
        >>> import json
        >>> from typing import Literal
        >>> @fc_register("tool")
        ... def get_current_weather(location: str, unit: Literal["fahrenheit", "celsius"]="fahrenheit"):
        ...     '''
        ...     Get the current weather in a given location
        ...
        ...     Args:
        ...         location (str): The city and state, e.g. San Francisco, CA.
        ...         unit (str): The temperature unit to use. Infer this from the users location.
        ...     '''
        ...     if 'tokyo' in location.lower():
        ...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius'})
        ...     elif 'san francisco' in location.lower():
        ...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit'})
        ...     elif 'paris' in location.lower():
        ...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius'})
        ...     elif 'beijing' in location.lower():
        ...         return json.dumps({'location': 'Beijing', 'temperature': '90', 'unit': 'fahrenheit'})
        ...     else:
        ...         return json.dumps({'location': location, 'temperature': 'unknown'})
        ...
        >>> @fc_register("tool")
        ... def get_n_day_weather_forecast(location: str, num_days: int, unit: Literal["celsius", "fahrenheit"]='fahrenheit'):
        ...     '''
        ...     Get an N-day weather forecast
        ...
        ...     Args:
        ...         location (str): The city and state, e.g. San Francisco, CA.
        ...         num_days (int): The number of days to forecast.
        ...         unit (Literal['celsius', 'fahrenheit']): The temperature unit to use. Infer this from the users location.
        ...     '''
        ...     if 'tokyo' in location.lower():
        ...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius', "num_days": num_days})
        ...     elif 'san francisco' in location.lower():
        ...         return json.dumps({'location': 'San Francisco', 'temperature': '75', 'unit': 'fahrenheit', "num_days": num_days})
        ...     elif 'paris' in location.lower():
        ...         return json.dumps({'location': 'Paris', 'temperature': '25', 'unit': 'celsius', "num_days": num_days})
        ...     elif 'beijing' in location.lower():
        ...         return json.dumps({'location': 'Beijing', 'temperature': '85', 'unit': 'fahrenheit', "num_days": num_days})
        ...     else:
        ...         return json.dumps({'location': location, 'temperature': 'unknown'})
        ...
        >>> tools = ["get_current_weather", "get_n_day_weather_forecast"]
        >>> tm = ToolManager(tools)
        >>> print(tm([{'name': 'get_n_day_weather_forecast', 'arguments': {'location': 'Beijing', 'num_days': 3}}])[0])
        '{"location": "Beijing", "temperature": "85", "unit": "fahrenheit", "num_days": 3}'
    """
    def __init__(self, tools: List[str], return_trace: bool = False):
        super().__init__(return_trace=return_trace)
        self._tools = self._load_tools(tools)
        self._format_tools()
        self._tools_desc = self._transform_to_openai_function()

    def _load_tools(self, tools_str: List[str]):
        """Instantiate every named tool from the `lazyllm.tool` registry.

        Raises:
            ValueError: If a name has no registered tool.
        """
        loaded = []
        for tool_str in tools_str:
            # Registered tools are looked up under "<name>Tool".
            tool_cls = lazyllm.tool.get(tool_str + "Tool", None)
            if not tool_cls:
                raise ValueError(f"Tool {tool_str} has not been registered yet.")
            loaded.append(tool_cls())
        return loaded

    @property
    def all_tools(self):
        """The instantiated tool objects."""
        return self._tools

    @property
    def tools_description(self):
        """The tools described as OpenAI-style function specifications."""
        return self._tools_desc

    @property
    def tools_info(self):
        """Mapping from tool name to tool object."""
        return self._tool_call

    def _validate_tool(self, tool_name: str, tool_arguments: Dict[str, Any]):
        """Return the tool's own parameter validation result, or False for an unknown tool."""
        tool = self._tool_call.get(tool_name)
        if tool: return tool.validate_parameters(tool_arguments)
        return False

    def _format_tools(self):
        """Index the loaded tools by name."""
        if isinstance(self._tools, list):
            self._tool_call = {tool.name: tool for tool in self._tools}

    def _transform_to_openai_function(self):
        """Describe every tool as an OpenAI-style function specification.

        Raises:
            TypeError: If the tools are not held in a list, or if a tool's docstring
                does not describe the function and each of its parameters.
        """
        if not isinstance(self._tools, list):
            raise TypeError(f"The tools type should be List instead of {type(self._tools)}")

        format_tools = []
        for tool in self._tools:
            try:
                parsed = docstring_parser.parse(tool.description)
                tool_args = tool.args
                assert len(tool_args) == len(parsed.params), ("The parameter description and the actual "
                                                              "number of input parameters are inconsistent.")
                args_description = {param.arg_name: param.description for param in parsed.params}
                args = {}
                for k, v in tool_args.items():
                    val = copy.deepcopy(v)
                    # "title" and "default" are dropped from the schema sent to the LLM.
                    val.pop("title", None)
                    val.pop("default", None)
                    args[k] = val if val else {"type": "string"}
                    if k not in args_description:
                        raise ValueError(f"The actual input parameter {k} is not found "
                                         "in the parameter description.")
                    args[k].update({"description": args_description[k]})
                format_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": parsed.short_description,
                        "parameters": {
                            "type": "object",
                            "properties": args,
                            "required": tool.get_params_schema().model_json_schema().get("required", [])
                        }
                    }
                })
            except Exception as e:
                typehints_template = """
                    def myfunc(arg1: str, arg2: Dict[str, Any], arg3: Literal["aaa", "bbb", "ccc"]="aaa"):
                        '''
                        Function description ...

                        Args:
                            arg1 (str): arg1 description.
                            arg2 (Dict[str, Any]): arg2 description
                            arg3 (Literal["aaa", "bbb", "ccc"]): arg3 description
                        '''
                    """
                # Chain the cause so the underlying parsing/validation error stays visible.
                raise TypeError("Function description must include function description and "
                                f"parameter description, the format is as follows: {typehints_template}") from e
        return format_tools

    def forward(self, tools: List[Dict[str, Any]], verbose: bool = False):
        """Execute the requested tool calls and return one output string per call.

        Args:
            tools (List[Dict[str, Any]]): Tool calls, each with a `name` and `arguments`
                (a dict, or a JSON string that decodes to one).
            verbose (bool): Currently unused.

        Returns:
            A list aligned with `tools`: the tool result (JSON-encoded unless it already
            is a string), or an error message for a call that failed validation.
        """
        tool_calls = [{"name": tool['name'],
                       "arguments": (json.loads(tool['arguments'])
                                     if isinstance(tool['arguments'], str) else tool['arguments'])}
                      for tool in tools]
        is_valid = [bool(self._validate_tool(call['name'], call['arguments'])) for call in tool_calls]
        valid_calls = [call for call, ok in zip(tool_calls, is_valid) if ok]

        results = iter(())
        if valid_calls:  # nothing to dispatch when every call failed validation
            tool_diverter = lazyllm.diverter(tuple(self._tool_call[call['name']] for call in valid_calls))
            results = iter(tool_diverter(tuple(call['arguments'] for call in valid_calls)))

        tool_output = []
        for call, ok in zip(tool_calls, is_valid):
            if ok:
                ret = next(results)
                tool_output.append(ret if isinstance(ret, str) else json.dumps(ret, ensure_ascii=False))
            else:
                tool_output.append(f"{call} parameters error.")
        return tool_output

lazyllm.tools.FunctionCall

Bases: ModuleBase

FunctionCall是单轮工具调用类,如果LLM中的信息不足以回答用户的问题,必须结合外部知识来回答用户问题,则调用该类。如果LLM输出需要工具调用,则进行工具调用,并输出工具调用结果,输出结果为List类型,包含当前轮的输入、模型输出、工具输出。如果不需要工具调用,则直接输出LLM结果,输出结果为string类型。

Parameters:

  • llm (ModuleBase) –

    要使用的LLM可以是TrainableModule或OnlineChatModule。

  • tools (List[str]) –

    LLM使用的工具名称列表。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import fc_register, FunctionCall
>>> import json
>>> from typing import Literal
>>> @fc_register("tool")
>>> def get_current_weather(location: str, unit: Literal["fahrenheit", "celsius"] = 'fahrenheit'):
...     '''
...     Get the current weather in a given location
...
...     Args:
...         location (str): The city and state, e.g. San Francisco, CA.
...         unit (str): The temperature unit to use. Infer this from the users location.
...     '''
...     if 'tokyo' in location.lower():
...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius'})
...     elif 'san francisco' in location.lower():
...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit'})
...     elif 'paris' in location.lower():
...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius'})
...     else:
...         return json.dumps({'location': location, 'temperature': 'unknown'})
...
>>> @fc_register("tool")
>>> def get_n_day_weather_forecast(location: str, num_days: int, unit: Literal["celsius", "fahrenheit"] = 'fahrenheit'):
...     '''
...     Get an N-day weather forecast
...
...     Args:
...         location (str): The city and state, e.g. San Francisco, CA.
...         num_days (int): The number of days to forecast.
...         unit (Literal['celsius', 'fahrenheit']): The temperature unit to use. Infer this from the users location.
...     '''
...     if 'tokyo' in location.lower():
...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius', "num_days": num_days})
...     elif 'san francisco' in location.lower():
...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit', "num_days": num_days})
...     elif 'paris' in location.lower():
...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius', "num_days": num_days})
...     else:
...         return json.dumps({'location': location, 'temperature': 'unknown'})
...
>>> tools=["get_current_weather", "get_n_day_weather_forecast"]
>>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()  # or llm = lazyllm.OnlineChatModule("openai", stream=False)
>>> query = "What's the weather like today in celsius in Tokyo."
>>> fc = FunctionCall(llm, tools)
>>> ret = fc(query)
>>> print(ret)
["What's the weather like today in celsius in Tokyo.", {'role': 'assistant', 'content': '
', 'tool_calls': [{'id': 'da19cddac0584869879deb1315356d2a', 'type': 'function', 'function': {'name': 'get_current_weather', 'arguments': {'location': 'Tokyo', 'unit': 'celsius'}}}]}, [{'role': 'tool', 'content': '{"location": "Tokyo", "temperature": "10", "unit": "celsius"}', 'tool_call_id': 'da19cddac0584869879deb1315356d2a', 'name': 'get_current_weather'}]]
>>> query = "Hello"
>>> ret = fc(query)
>>> print(ret)
'Hello! How can I assist you today?'
Source code in lazyllm/tools/agent/functionCall.py
class FunctionCall(ModuleBase):
    """Single-round tool-calling module.

    Use this class when the information held by the LLM is not sufficient to answer the user's question
    and external knowledge has to be brought in. If the LLM output asks for tool calls, the tools are
    invoked and the result is a list containing the current round's input, the model output and the tool
    outputs. If no tool call is needed, the LLM result is returned directly as a string.

    Args:
        llm (ModuleBase): The LLM to use; either a TrainableModule or an OnlineChatModule.
        tools (List[str]): Names of the tools made available to the LLM.
        return_trace (bool): Passed to ``ModuleBase.__init__`` and to the ``ToolManager``. Defaults to False.
        _prompt (str, optional): Instruction prompt. When None, ``FC_PROMPT_ONLINE`` is used for an
            ``OnlineChatModule`` and ``FC_PROMPT_LOCAL`` for any other LLM.

    Raises:
        ValueError: If ``llm`` is an ``OnlineChatModule`` of the "QWEN" series with streaming enabled.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import fc_register, FunctionCall
        >>> import json
        >>> from typing import Literal
        >>> @fc_register("tool")
        ... def get_current_weather(location: str, unit: Literal["fahrenheit", "celsius"] = 'fahrenheit'):
        ...     '''
        ...     Get the current weather in a given location
        ...
        ...     Args:
        ...         location (str): The city and state, e.g. San Francisco, CA.
        ...         unit (str): The temperature unit to use. Infer this from the users location.
        ...     '''
        ...     if 'tokyo' in location.lower():
        ...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius'})
        ...     elif 'san francisco' in location.lower():
        ...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit'})
        ...     elif 'paris' in location.lower():
        ...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius'})
        ...     else:
        ...         return json.dumps({'location': location, 'temperature': 'unknown'})
        ...
        >>> @fc_register("tool")
        ... def get_n_day_weather_forecast(location: str, num_days: int, unit: Literal["celsius", "fahrenheit"] = 'fahrenheit'):
        ...     '''
        ...     Get an N-day weather forecast
        ...
        ...     Args:
        ...         location (str): The city and state, e.g. San Francisco, CA.
        ...         num_days (int): The number of days to forecast.
        ...         unit (Literal['celsius', 'fahrenheit']): The temperature unit to use. Infer this from the users location.
        ...     '''
        ...     if 'tokyo' in location.lower():
        ...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius', "num_days": num_days})
        ...     elif 'san francisco' in location.lower():
        ...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit', "num_days": num_days})
        ...     elif 'paris' in location.lower():
        ...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius', "num_days": num_days})
        ...     else:
        ...         return json.dumps({'location': location, 'temperature': 'unknown'})
        ...
        >>> tools=["get_current_weather", "get_n_day_weather_forecast"]
        >>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()  # or llm = lazyllm.OnlineChatModule("openai", stream=False)
        >>> query = "What's the weather like today in celsius in Tokyo."
        >>> fc = FunctionCall(llm, tools)
        >>> ret = fc(query)
        >>> print(ret)
        ["What's the weather like today in celsius in Tokyo.", {'role': 'assistant', 'content': '\\n', 'tool_calls': [{'id': 'da19cddac0584869879deb1315356d2a', 'type': 'function', 'function': {'name': 'get_current_weather', 'arguments': {'location': 'Tokyo', 'unit': 'celsius'}}}]}, [{'role': 'tool', 'content': '{"location": "Tokyo", "temperature": "10", "unit": "celsius"}', 'tool_call_id': 'da19cddac0584869879deb1315356d2a', 'name': 'get_current_weather'}]]
        >>> query = "Hello"
        >>> ret = fc(query)
        >>> print(ret)
        'Hello! How can I assist you today?'
    """
    def __init__(self, llm, tools: List[str], *, return_trace: bool = False, _prompt: Union[str, None] = None):
        super().__init__(return_trace=return_trace)
        if isinstance(llm, OnlineChatModule) and llm.series == "QWEN" and llm._stream is True:
            raise ValueError("The qwen platform does not currently support stream function calls.")
        if _prompt is None:
            _prompt = FC_PROMPT_ONLINE if isinstance(llm, OnlineChatModule) else FC_PROMPT_LOCAL

        self._tools_manager = ToolManager(tools, return_trace=return_trace)
        self._prompter = ChatPrompter(instruction=_prompt, tools=self._tools_manager.tools_description)\
            .pre_hook(function_call_hook)
        # Work on a shared copy of the LLM that carries the function-call prompt and formatter.
        self._llm = llm.share(prompt=self._prompter, format=FunctionCallFormatter())
        with pipeline() as self._impl:
            # m1: query the LLM; m2: normalise its output into tool-call requests (or plain text).
            self._impl.m1 = self._llm
            self._impl.m2 = self._parser
            # m3: a list means "tool calls requested" and goes to the tool manager; anything else passes through.
            self._impl.m3 = ifs(lambda x: isinstance(x, list), self._tools_manager, lambda out: out)
            # m4: assemble the final result from the tool output, the pipeline input and the m1 output.
            self._impl.m4 = self._tool_post_action | bind(input=self._impl.input, llm_output=self._impl.m1)

    def _parser(self, llm_output: Union[str, List[Dict[str, Any]]]):
        """Normalise the formatted LLM output into tool-call requests.

        Args:
            llm_output: Either plain text, or a list mixing text fragments (str) and tool-call dicts.

        Returns:
            For a list input, a list of ``{"name": ..., "arguments": ...}`` dicts, one per non-string item.
            For a string input, the string itself.

        Raises:
            TypeError: If ``llm_output`` is neither a list nor a string.
        """
        LOG.debug(f"llm_output: {llm_output}")
        if isinstance(llm_output, str):
            return llm_output
        if not isinstance(llm_output, list):
            raise TypeError(f"The {llm_output} type currently is only supports `list` and `str`,"
                            f" and does not support {type(llm_output)}.")

        calls = []
        for item in llm_output:
            if isinstance(item, str):
                # Text fragments interleaved with the tool calls are not tool requests.
                continue
            function = item.get('function', {})
            arguments = function.get('arguments', '')
            if isinstance(arguments, str):
                # A missing or empty argument string is treated as "no arguments"; json.loads('') would raise.
                arguments = json.loads(arguments) if arguments.strip() else {}
            calls.append({"name": function.get('name', ''), 'arguments': arguments})
        return calls

    def _tool_post_action(self, output: Union[str, List[str]], input: Union[str, List],
                          llm_output: List[Dict[str, Any]]):
        """Build the value returned by the pipeline.

        Args:
            output: Result of the previous stage: a list of tool outputs, or the plain LLM text.
            input: The original pipeline input; a string, or a list whose last element is used.
            llm_output: The LLM output, mixing text fragments (str) and tool-call dicts.

        Returns:
            ``output`` unchanged when it is a string. Otherwise a three-element list: the input, an
            assistant message carrying the tool calls, and a list of tool messages (one per tool output).

        Raises:
            TypeError: If ``output`` is neither list nor str, or (for list ``output``) ``input`` is
                neither str nor list.
        """
        if isinstance(output, str):
            return output
        if not isinstance(output, list):
            raise TypeError(f"The {output} type currently is only supports `list` and `str`,"
                            f" and does not support {type(output)}.")

        if isinstance(input, str):
            ret = [input]
        elif isinstance(input, list):
            ret = [input[-1]]
        else:
            raise TypeError(f"The input type currently only supports `str` and `list`, "
                            f"and does not support {type(input)}.")

        # Split the LLM output into its text part and its tool-call part.
        content = "".join(item for item in llm_output if isinstance(item, str))
        tool_calls = [item for item in llm_output if not isinstance(item, str)]
        ret.append({"role": "assistant", "content": content, "tool_calls": tool_calls})
        # Tool outputs are matched to tool calls by position.
        ret.append([{"role": "tool", "content": out, "tool_call_id": tool_calls[idx]["id"],
                     "name": tool_calls[idx]["function"]["name"]}
                    for idx, out in enumerate(output)])
        LOG.debug(f"functionCall result: {ret}")
        return ret

    def forward(self, input: str, llm_chat_history: List[Dict[str, Any]] = None):
        """Run one round of function calling.

        Args:
            input: The user query.
            llm_chat_history: Optional chat history; when given it replaces the history stored in
                ``globals['chat_history']`` for this module's LLM.

        Returns:
            The result of the internal pipeline (see ``_tool_post_action``).
        """
        globals['chat_history'].setdefault(self._llm._module_id, [])
        if llm_chat_history is not None:
            globals['chat_history'][self._llm._module_id] = llm_chat_history
        return self._impl(input)

lazyllm.tools.FunctionCallAgent

Bases: ModuleBase

FunctionCallAgent是一个使用工具调用方式进行完整工具调用的代理,即回答用户问题时,LLM如果需要通过工具获取外部知识,就会调用工具,并将工具的返回结果反馈给LLM,最后由LLM进行汇总输出。

Parameters:

  • llm (ModuleBase) –

    要使用的LLM,可以是TrainableModule或OnlineChatModule。

  • tools (List[str]) –

    LLM 使用的工具名称列表。

  • max_retries (int, default: 5 ) –

    工具调用迭代的最大次数。默认值为5。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import fc_register, FunctionCallAgent
>>> import json
>>> from typing import Literal
>>> @fc_register("tool")
>>> def get_current_weather(location: str, unit: Literal["fahrenheit", "celsius"]='fahrenheit'):
...     '''
...     Get the current weather in a given location
...
...     Args:
...         location (str): The city and state, e.g. San Francisco, CA.
...         unit (str): The temperature unit to use. Infer this from the users location.
...     '''
...     if 'tokyo' in location.lower():
...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius'})
...     elif 'san francisco' in location.lower():
...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit'})
...     elif 'paris' in location.lower():
...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius'})
...     elif 'beijing' in location.lower():
...         return json.dumps({'location': 'Beijing', 'temperature': '90', 'unit': 'Fahrenheit'})
...     else:
...         return json.dumps({'location': location, 'temperature': 'unknown'})
...
>>> @fc_register("tool")
>>> def get_n_day_weather_forecast(location: str, num_days: int, unit: Literal["celsius", "fahrenheit"]='fahrenheit'):
...     '''
...     Get an N-day weather forecast
...
...     Args:
...         location (str): The city and state, e.g. San Francisco, CA.
...         num_days (int): The number of days to forecast.
...         unit (Literal['celsius', 'fahrenheit']): The temperature unit to use. Infer this from the users location.
...     '''
...     if 'tokyo' in location.lower():
...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius', "num_days": num_days})
...     elif 'san francisco' in location.lower():
...         return json.dumps({'location': 'San Francisco', 'temperature': '75', 'unit': 'fahrenheit', "num_days": num_days})
...     elif 'paris' in location.lower():
...         return json.dumps({'location': 'Paris', 'temperature': '25', 'unit': 'celsius', "num_days": num_days})
...     elif 'beijing' in location.lower():
...         return json.dumps({'location': 'Beijing', 'temperature': '85', 'unit': 'fahrenheit', "num_days": num_days})
...     else:
...         return json.dumps({'location': location, 'temperature': 'unknown'})
...
>>> tools = ['get_current_weather', 'get_n_day_weather_forecast']
>>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()  # or llm = lazyllm.OnlineChatModule(source="sensenova")
>>> agent = FunctionCallAgent(llm, tools)
>>> query = "What's the weather like today in celsius in Tokyo and Paris."
>>> res = agent(query)
>>> print(res)
'The current weather in Tokyo is 10 degrees Celsius, and in Paris, it is 22 degrees Celsius.'
>>> query = "Hello"
>>> res = agent(query)
>>> print(res)
'Hello! How can I assist you today?'
Source code in lazyllm/tools/agent/functionCall.py
class FunctionCallAgent(ModuleBase):
    """Agent that performs complete tool calling through function calls.

    When answering the user's question, the LLM calls tools whenever it needs external knowledge; the
    tool results are fed back to the LLM, which finally summarises them into the answer.

    Args:
        llm (ModuleBase): The LLM to use; either a TrainableModule or an OnlineChatModule.
        tools (List[str]): Names of the tools made available to the LLM.
        max_retries (int): Maximum number of tool-calling iterations. Defaults to 5.
        return_trace (bool): Passed to ``ModuleBase.__init__`` and to the inner ``FunctionCall``.
            Defaults to False.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import fc_register, FunctionCallAgent
        >>> import json
        >>> from typing import Literal
        >>> @fc_register("tool")
        ... def get_current_weather(location: str, unit: Literal["fahrenheit", "celsius"]='fahrenheit'):
        ...     '''
        ...     Get the current weather in a given location
        ...
        ...     Args:
        ...         location (str): The city and state, e.g. San Francisco, CA.
        ...         unit (str): The temperature unit to use. Infer this from the users location.
        ...     '''
        ...     if 'tokyo' in location.lower():
        ...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius'})
        ...     elif 'san francisco' in location.lower():
        ...         return json.dumps({'location': 'San Francisco', 'temperature': '72', 'unit': 'fahrenheit'})
        ...     elif 'paris' in location.lower():
        ...         return json.dumps({'location': 'Paris', 'temperature': '22', 'unit': 'celsius'})
        ...     elif 'beijing' in location.lower():
        ...         return json.dumps({'location': 'Beijing', 'temperature': '90', 'unit': 'Fahrenheit'})
        ...     else:
        ...         return json.dumps({'location': location, 'temperature': 'unknown'})
        ...
        >>> @fc_register("tool")
        ... def get_n_day_weather_forecast(location: str, num_days: int, unit: Literal["celsius", "fahrenheit"]='fahrenheit'):
        ...     '''
        ...     Get an N-day weather forecast
        ...
        ...     Args:
        ...         location (str): The city and state, e.g. San Francisco, CA.
        ...         num_days (int): The number of days to forecast.
        ...         unit (Literal['celsius', 'fahrenheit']): The temperature unit to use. Infer this from the users location.
        ...     '''
        ...     if 'tokyo' in location.lower():
        ...         return json.dumps({'location': 'Tokyo', 'temperature': '10', 'unit': 'celsius', "num_days": num_days})
        ...     elif 'san francisco' in location.lower():
        ...         return json.dumps({'location': 'San Francisco', 'temperature': '75', 'unit': 'fahrenheit', "num_days": num_days})
        ...     elif 'paris' in location.lower():
        ...         return json.dumps({'location': 'Paris', 'temperature': '25', 'unit': 'celsius', "num_days": num_days})
        ...     elif 'beijing' in location.lower():
        ...         return json.dumps({'location': 'Beijing', 'temperature': '85', 'unit': 'fahrenheit', "num_days": num_days})
        ...     else:
        ...         return json.dumps({'location': location, 'temperature': 'unknown'})
        ...
        >>> tools = ['get_current_weather', 'get_n_day_weather_forecast']
        >>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()  # or llm = lazyllm.OnlineChatModule(source="sensenova")
        >>> agent = FunctionCallAgent(llm, tools)
        >>> query = "What's the weather like today in celsius in Tokyo and Paris."
        >>> res = agent(query)
        >>> print(res)
        'The current weather in Tokyo is 10 degrees Celsius, and in Paris, it is 22 degrees Celsius.'
        >>> query = "Hello"
        >>> res = agent(query)
        >>> print(res)
        'Hello! How can I assist you today?'
    """
    def __init__(self, llm, tools: List[str], max_retries: int = 5, return_trace: bool = False):
        super().__init__(return_trace=return_trace)
        self._max_retries = max_retries
        # Repeat single-round function calls until the result is a plain string, at most max_retries times.
        self._agent = loop(FunctionCall(llm, tools, return_trace=return_trace),
                           stop_condition=lambda x: isinstance(x, str), count=self._max_retries)

    def forward(self, query: str, llm_chat_history: List[Dict[str, Any]] = None):
        """Answer ``query``, calling tools as needed.

        Args:
            query: The user query.
            llm_chat_history: Optional chat history; only passed to the inner loop when it is not None.

        Returns:
            The final answer as a string.

        Raises:
            ValueError: If the loop ends without producing a string result.
        """
        if llm_chat_history is not None:
            ret = self._agent(query, llm_chat_history)
        else:
            ret = self._agent(query)
        if not isinstance(ret, str):
            raise ValueError(f"After retrying {self._max_retries} times, the function call agent still "
                             "failed to call successfully.")
        return ret

lazyllm.tools.ReactAgent

Bases: ModuleBase

ReactAgent是按照 Thought->Action->Observation->Thought...->Finish 的流程一步一步的通过LLM和工具调用来显示解决用户问题的步骤,以及最后给用户的答案。

Parameters:

  • llm (ModuleBase) –

    要使用的LLM,可以是TrainableModule或OnlineChatModule。

  • tools (List[str]) –

    LLM 使用的工具名称列表。

  • max_retries (int, default: 5 ) –

    工具调用迭代的最大次数。默认值为5。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import fc_register, ReactAgent
>>> @fc_register("tool")
>>> def multiply_tool(a: int, b: int) -> int:
...     '''
...     Multiply two integers and return the result integer
...
...     Args:
...         a (int): multiplier
...         b (int): multiplier
...     '''
...     return a * b
...
>>> @fc_register("tool")
>>> def add_tool(a: int, b: int):
...     '''
...     Add two integers and returns the result integer
...
...     Args:
...         a (int): addend
...         b (int): addend
...     '''
...     return a + b
...
>>> tools = ["multiply_tool", "add_tool"]
>>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()   # or llm = lazyllm.OnlineChatModule(source="sensenova")
>>> agent = ReactAgent(llm, tools)
>>> query = "What is 20+(2*4)? Calculate step by step."
>>> res = agent(query)
>>> print(res)
'Answer: The result of 20+(2*4) is 28.'
Source code in lazyllm/tools/agent/reactAgent.py
class ReactAgent(ModuleBase):
    """Agent following the `Thought->Action->Observation->Thought...->Finish` flow.

    The agent works step by step through the LLM and tool calls, showing the steps taken to solve the
    user's problem and then the final answer.

    Args:
        llm (ModuleBase): The LLM to use; either a TrainableModule or an OnlineChatModule.
        tools (List[str]): Names of the tools made available to the LLM.
        max_retries (int): Maximum number of tool-calling iterations. Defaults to 5.
        return_trace (bool): Passed to ``ModuleBase.__init__`` and to the inner ``FunctionCall``.
            Defaults to False.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import fc_register, ReactAgent
        >>> @fc_register("tool")
        ... def multiply_tool(a: int, b: int) -> int:
        ...     '''
        ...     Multiply two integers and return the result integer
        ...
        ...     Args:
        ...         a (int): multiplier
        ...         b (int): multiplier
        ...     '''
        ...     return a * b
        ...
        >>> @fc_register("tool")
        ... def add_tool(a: int, b: int):
        ...     '''
        ...     Add two integers and returns the result integer
        ...
        ...     Args:
        ...         a (int): addend
        ...         b (int): addend
        ...     '''
        ...     return a + b
        ...
        >>> tools = ["multiply_tool", "add_tool"]
        >>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()   # or llm = lazyllm.OnlineChatModule(source="sensenova")
        >>> agent = ReactAgent(llm, tools)
        >>> query = "What is 20+(2*4)? Calculate step by step."
        >>> res = agent(query)
        >>> print(res)
        'Answer: The result of 20+(2*4) is 28.'
    """
    def __init__(self, llm, tools: List[str], max_retries: int = 5, return_trace: bool = False):
        super().__init__(return_trace=return_trace)
        self._max_retries = max_retries
        assert llm and tools, "llm and tools cannot be empty."
        # Online models get the prompt variant without tokens; all other LLMs get the tokenized one.
        token_prompt = WITHOUT_TOKEN_PROMPT if isinstance(llm, OnlineChatModule) else WITH_TOKEN_PROMPT
        prompt = INSTRUCTION.replace("{TOKENIZED_PROMPT}", token_prompt)
        prompt = prompt.replace("{tool_names}", json.dumps(tools, ensure_ascii=False))
        # Repeat single-round function calls until the result is a plain string, at most max_retries times.
        self._agent = loop(FunctionCall(llm, tools, _prompt=prompt, return_trace=return_trace),
                           stop_condition=lambda x: isinstance(x, str), count=self._max_retries)

    def forward(self, query: str, llm_chat_history: List[Dict[str, Any]] = None):
        """Answer ``query`` through the ReAct loop.

        Args:
            query: The user query.
            llm_chat_history: Optional chat history; only passed to the inner loop when it is not None.

        Returns:
            The final answer as a string.

        Raises:
            ValueError: If the loop ends without producing a string result.
        """
        if llm_chat_history is not None:
            ret = self._agent(query, llm_chat_history)
        else:
            ret = self._agent(query)
        if not isinstance(ret, str):
            # The message is built from adjacent literals so no source indentation leaks into it.
            raise ValueError(f"After retrying {self._max_retries} times, the function call agent still "
                             "failed to call successfully.")
        return ret

lazyllm.tools.PlanAndSolveAgent

Bases: ModuleBase

PlanAndSolveAgent由两个组件组成,首先,由planner将整个任务分解为更小的子任务,然后由solver根据计划执行这些子任务,其中可能会涉及到工具调用,最后将答案返回给用户。

Parameters:

  • llm (ModuleBase, default: None ) –

    要使用的LLM,可以是TrainableModule或OnlineChatModule。和plan_llm、solve_llm互斥,要么设置llm(planner和solver公用一个LLM),要么设置plan_llm和solve_llm,或者只指定llm(用来设置planner)和solve_llm,其它情况均认为是无效的。

  • tools (List[str], default: [] ) –

    LLM使用的工具名称列表。

  • plan_llm (ModuleBase, default: None ) –

    planner要使用的LLM,可以是TrainableModule或OnlineChatModule。

  • solve_llm (ModuleBase, default: None ) –

    solver要使用的LLM,可以是TrainableModule或OnlineChatModule。

  • max_retries (int, default: 5 ) –

    工具调用迭代的最大次数。默认值为5。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import fc_register, PlanAndSolveAgent
>>> @fc_register("tool")
>>> def multiply(a: int, b: int) -> int:
...     '''
...     Multiply two integers and return the result integer
...
...     Args:
...         a (int): multiplier
...         b (int): multiplier
...     '''
...     return a * b
...
>>> @fc_register("tool")
>>> def add(a: int, b: int):
...     '''
...     Add two integers and returns the result integer
...
...     Args:
...         a (int): addend
...         b (int): addend
...     '''
...     return a + b
...
>>> tools = ["multiply", "add"]
>>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()  # or llm = lazyllm.OnlineChatModule(source="sensenova")
>>> agent = PlanAndSolveAgent(llm, tools)
>>> query = "What is 20+(2*4)? Calculate step by step."
>>> res = agent(query)
>>> print(res)
'The final answer is 28.'
Source code in lazyllm/tools/agent/planAndSolveAgent.py
class PlanAndSolveAgent(ModuleBase):
    """Agent made of a planner and a solver.

    The planner first breaks the whole task down into smaller subtasks; the solver then executes these
    subtasks according to the plan, which may involve tool calls, and the answer is finally returned to
    the user.

    Args:
        llm (ModuleBase): The LLM to use; either a TrainableModule or an OnlineChatModule. It is mutually
            exclusive with ``plan_llm``: either set only ``llm`` (planner and solver share one LLM), or
            set ``plan_llm`` and ``solve_llm``, or set ``llm`` (used for the planner) together with
            ``solve_llm``. Any other combination is invalid.
        tools (List[str]): Names of the tools made available to the LLM. Must not be empty.
        plan_llm (ModuleBase): The LLM used by the planner; either a TrainableModule or an OnlineChatModule.
        solve_llm (ModuleBase): The LLM used by the solver; either a TrainableModule or an OnlineChatModule.
        max_retries (int): Maximum number of tool-calling iterations. Defaults to 5.
        return_trace (bool): Passed to ``ModuleBase.__init__`` and to the inner ``FunctionCallAgent``.
            Defaults to False.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import fc_register, PlanAndSolveAgent
        >>> @fc_register("tool")
        ... def multiply(a: int, b: int) -> int:
        ...     '''
        ...     Multiply two integers and return the result integer
        ...
        ...     Args:
        ...         a (int): multiplier
        ...         b (int): multiplier
        ...     '''
        ...     return a * b
        ...
        >>> @fc_register("tool")
        ... def add(a: int, b: int):
        ...     '''
        ...     Add two integers and returns the result integer
        ...
        ...     Args:
        ...         a (int): addend
        ...         b (int): addend
        ...     '''
        ...     return a + b
        ...
        >>> tools = ["multiply", "add"]
        >>> llm = lazyllm.TrainableModule("internlm2-chat-20b").start()  # or llm = lazyllm.OnlineChatModule(source="sensenova")
        >>> agent = PlanAndSolveAgent(llm, tools)
        >>> query = "What is 20+(2*4)? Calculate step by step."
        >>> res = agent(query)
        >>> print(res)
        'The final answer is 28.'
    """
    def __init__(self, llm: Union[ModuleBase, None] = None, tools: List[str] = [], *,
                 plan_llm: Union[ModuleBase, None] = None, solve_llm: Union[ModuleBase, None] = None,
                 max_retries: int = 5, return_trace: bool = False):
        super().__init__(return_trace=return_trace)
        self._max_retries = max_retries
        # The message is built from adjacent literals so no source indentation leaks into it.
        assert (llm is None and plan_llm and solve_llm) or (llm and plan_llm is None), (
            'Either specify only llm without specify plan and solve, or specify only plan and solve '
            'without specifying llm, or specify both llm and solve. Other situations are not allowed.')
        assert tools, "tools cannot be empty."
        self._plan_llm = (plan_llm or llm).share(prompt=ChatPrompter(instruction=PLANNER_PROMPT))
        self._solve_llm = (solve_llm or llm).share()
        self._tools = tools
        with pipeline() as self._agent:
            self._agent.plan = self._plan_llm
            # Split the planner text on numbered-list markers ("\n 1. ", "\n 2. ", ...), dropping the text
            # before the first marker. The package is (finished steps, last response, pending steps, query).
            self._agent.parse = (lambda text, query: package([], '', re.split(r"\n\s*\d+\. ", text)[1:],
                                 query)) | bind(query=self._agent.input)
            # The loop's stop condition holds once no pending steps are left.
            with loop(stop_condition=lambda pre, res, steps, query: not steps) as self._agent.lp:
                # Build the solver prompt for the first pending step; the second package item is an empty list.
                self._agent.lp.pre_action = lambda pre_steps, response, steps, query: \
                    package(SOLVER_PROMPT.format(previous_steps="\n".join(pre_steps), current_step=steps[0],
                            objective=query) + "input: " + response + "\n" + steps[0], [])
                # Forward max_retries so the documented limit actually applies to the solver.
                self._agent.lp.solve = FunctionCallAgent(self._solve_llm, tools=self._tools,
                                                         max_retries=self._max_retries,
                                                         return_trace=return_trace)
                self._agent.lp.post_action = self._post_action | bind(self._agent.lp.input[0][0], _0,
                                                                      self._agent.lp.input[0][2],
                                                                      self._agent.lp.input[0][3])

            # Only the last response is returned from the pipeline.
            self._agent.post_action = lambda pre, res, steps, query: res

    def _post_action(self, pre_steps: List[str], response: str, steps: List[str], query: str):
        """Move the step that was just solved from ``steps`` to ``pre_steps``.

        Both lists are modified in place.

        Args:
            pre_steps: Steps already completed.
            response: The solver's response for the current step.
            steps: Pending steps; the first one is the step that was just solved.
            query: The original user query.

        Returns:
            ``package(pre_steps, response, steps, query)`` with the updated lists.
        """
        LOG.debug(f"current step: {steps[0]}, response: {response}")
        pre_steps.append(steps.pop(0))
        return package(pre_steps, response, steps, query)

    def forward(self, query: str):
        """Plan and solve ``query``; returns the result of the internal pipeline."""
        return self._agent(query)

lazyllm.tools.ReWOOAgent

Bases: ModuleBase

ReWOOAgent包含三个部分:Planner、Worker和Solver。其中,Planner使用可预见推理能力为复杂任务创建解决方案蓝图;Worker通过工具调用来与环境交互,并将实际证据或观察结果填充到指令中;Solver处理所有计划和证据以制定原始任务或问题的解决方案。

Parameters:

  • llm (ModuleBase, default: None ) –

    要使用的LLM,可以是TrainableModule或OnlineChatModule。和plan_llm、solve_llm互斥,要么设置llm(planner和solver公用一个LLM),要么设置plan_llm和solve_llm,或者只指定llm(用来设置planner)和solve_llm,其它情况均认为是无效的。

  • tools (List[str], default: [] ) –

    LLM使用的工具名称列表。

  • plan_llm (ModuleBase, default: None ) –

    planner要使用的LLM,可以是TrainableModule或OnlineChatModule。

  • solve_llm (ModuleBase, default: None ) –

    solver要使用的LLM,可以是TrainableModule或OnlineChatModule。

  • max_retries (int) –

    工具调用迭代的最大次数。默认值为5。(注意:下方源码中的 `ReWOOAgent.__init__` 签名并不包含该参数。)

Examples:

>>> import lazyllm
>>> import wikipedia
>>> from lazyllm.tools import fc_register, ReWOOAgent
>>> @fc_register("tool")
>>> def WikipediaWorker(input: str):
...     '''
...     Worker that search for similar page contents from Wikipedia. Useful when you need to get holistic knowledge about people, places, companies, historical events, or other subjects. The response are long and might contain some irrelevant information. Input should be a search query.
...
...     Args:
...         input (str): search query.
...     '''
...     try:
...         evidence = wikipedia.page(input).content
...         evidence = evidence.split("\n\n")[0]
...     except wikipedia.PageError:
...         evidence = f"Could not find [{input}]. Similar: {wikipedia.search(input)}"
...     except wikipedia.DisambiguationError:
...         evidence = f"Could not find [{input}]. Similar: {wikipedia.search(input)}"
...     return evidence
...
>>> @fc_register("tool")
>>> def LLMWorker(input: str):
...     '''
...     A pretrained LLM like yourself. Useful when you need to act with general world knowledge and common sense. Prioritize it when you are confident in solving the problem yourself. Input can be any instruction.
...
...     Args:
...         input (str): instruction
...     '''
...     llm = lazyllm.OnlineChatModule(source="glm")
...     query = f"Respond in short directly with no extra words.\n\n{input}"
...     response = llm(query, llm_chat_history=[])
...     return response
...
>>> tools = ["WikipediaWorker", "LLMWorker"]
>>> llm = lazyllm.TrainableModule("GLM-4-9B-Chat").deploy_method(lazyllm.deploy.vllm).start()  # or llm = lazyllm.OnlineChatModule(source="sensenova")
>>> agent = ReWOOAgent(llm, tools)
>>> query = "What is the name of the cognac house that makes the main ingredient in The Hennchata?"
>>> res = agent(query)
>>> print(res)
'\nHennessy '
Source code in lazyllm/tools/agent/rewooAgent.py
class ReWOOAgent(ModuleBase):
    """Agent made of three parts: Planner, Worker and Solver.

    The Planner uses foreseeable reasoning to create a solution blueprint for a complex task; the Worker
    interacts with the environment through tool calls and fills actual evidence or observations into the
    instructions; the Solver processes all plans and evidence to formulate the solution to the original
    task or problem.

    Args:
        llm (ModuleBase): The LLM to use; either a TrainableModule or an OnlineChatModule. It is mutually
            exclusive with ``plan_llm``: either set only ``llm`` (planner and solver share one LLM), or
            set ``plan_llm`` and ``solve_llm``, or set ``llm`` (used for the planner) together with
            ``solve_llm``. Any other combination is invalid.
        tools (List[str]): Names of the tools made available to the LLM. Must not be empty.
        plan_llm (ModuleBase): The LLM used by the planner; either a TrainableModule or an OnlineChatModule.
        solve_llm (ModuleBase): The LLM used by the solver; either a TrainableModule or an OnlineChatModule.
        return_trace (bool): Passed to ``ModuleBase.__init__`` and to the ``ToolManager``. Defaults to False.

    Examples:
        >>> import lazyllm
        >>> import wikipedia
        >>> from lazyllm.tools import fc_register, ReWOOAgent
        >>> @fc_register("tool")
        ... def WikipediaWorker(input: str):
        ...     '''
        ...     Worker that search for similar page contents from Wikipedia. Useful when you need to get holistic knowledge about people, places, companies, historical events, or other subjects. The response are long and might contain some irrelevant information. Input should be a search query.
        ...
        ...     Args:
        ...         input (str): search query.
        ...     '''
        ...     try:
        ...         evidence = wikipedia.page(input).content
        ...         evidence = evidence.split("\\n\\n")[0]
        ...     except wikipedia.PageError:
        ...         evidence = f"Could not find [{input}]. Similar: {wikipedia.search(input)}"
        ...     except wikipedia.DisambiguationError:
        ...         evidence = f"Could not find [{input}]. Similar: {wikipedia.search(input)}"
        ...     return evidence
        ...
        >>> @fc_register("tool")
        ... def LLMWorker(input: str):
        ...     '''
        ...     A pretrained LLM like yourself. Useful when you need to act with general world knowledge and common sense. Prioritize it when you are confident in solving the problem yourself. Input can be any instruction.
        ...
        ...     Args:
        ...         input (str): instruction
        ...     '''
        ...     llm = lazyllm.OnlineChatModule(source="glm")
        ...     query = f"Respond in short directly with no extra words.\\n\\n{input}"
        ...     response = llm(query, llm_chat_history=[])
        ...     return response
        ...
        >>> tools = ["WikipediaWorker", "LLMWorker"]
        >>> llm = lazyllm.TrainableModule("GLM-4-9B-Chat").deploy_method(lazyllm.deploy.vllm).start()  # or llm = lazyllm.OnlineChatModule(source="sensenova")
        >>> agent = ReWOOAgent(llm, tools)
        >>> query = "What is the name of the cognac house that makes the main ingredient in The Hennchata?"
        >>> res = agent(query)
        >>> print(res)
        '\\nHennessy '
    """
    def __init__(self, llm: Union[ModuleBase, None] = None, tools: List[str] = [], *,
                 plan_llm: Union[ModuleBase, None] = None, solve_llm: Union[ModuleBase, None] = None,
                 return_trace: bool = False):
        super().__init__(return_trace=return_trace)
        # The message is built from adjacent literals so no source indentation leaks into it.
        assert (llm is None and plan_llm and solve_llm) or (llm and plan_llm is None), (
            'Either specify only llm without specify plan and solve, or specify only plan and solve '
            'without specifying llm, or specify both llm and solve. Other situations are not allowed.')
        assert tools, "tools cannot be empty."
        self._planner = plan_llm or llm
        self._solver = solve_llm or llm
        self._workers = tools
        # Despite its name this holds the manager's ``tools_info`` mapping (indexed by tool name below).
        self._tools_manager = ToolManager(tools, return_trace=return_trace).tools_info
        with pipeline() as self._agent:
            self._agent.planner_pre_action = self._build_planner_prompt
            self._agent.planner = self._planner
            self._agent.parse_plan = self._parse_plan
            # NOTE(review): "woker" looks like a typo for "worker"; left unchanged in case the stage
            # name is referenced elsewhere.
            self._agent.woker = self._get_worker_evidences
            self._agent.solver_pre_action = self._build_solver_prompt | bind(input=self._agent.input)
            self._agent.solver = self._solver

    def _build_planner_prompt(self, input: str):
        """Build the planner prompt for ``input`` and reset the planner's chat history.

        The prompt lists every tool with its description, followed by the few-shot examples and the
        task itself.
        """
        tool_lines = "".join(f"{name}[search query]: {self._tools_manager[name].description}\n"
                             for name in self._workers)
        prompt = (P_PROMPT_PREFIX + "Tools can be one of the following:\n" + tool_lines
                  + P_FEWSHOT + "\n" + P_PROMPT_SUFFIX + input + "\n")
        LOG.info(f"planner prompt: {prompt}")
        globals['chat_history'][self._planner._module_id] = []
        return prompt

    def _parse_plan(self, response: str):
        """Extract plan lines and evidence assignments from the planner's response.

        Lines starting with "Plan" are collected as plans. Lines of the form ``#E<digit>... = <tool call>``
        are collected as evidence; lines without "=" are skipped.

        Returns:
            ``package(plans, evidence)`` where ``plans`` is a list of lines and ``evidence`` maps the
            evidence name (e.g. "#E1") to its tool-call text.
        """
        LOG.debug(f"planner plans: {response}")
        plans = []
        evidence = {}
        for line in response.splitlines():
            if line.startswith("Plan"):
                plans.append(line)
            # Slices never raise, so lines shorter than three characters (e.g. "#") are simply ignored.
            elif line[:2] == "#E" and line[2:3].isdigit():
                name, sep, tool_call = line.partition("=")
                if not sep:
                    # No assignment on this line; nothing to record.
                    continue
                name, tool_call = name.strip(), tool_call.strip()
                # NOTE(review): only three-character names (#E0-#E9) keep their tool call; longer names
                # such as "#E10" are recorded as "No evidence found" — confirm this limit is intended.
                evidence[name] = tool_call if len(name) == 3 else "No evidence found"
        return package(plans, evidence)

    def _get_worker_evidences(self, plans: List[str], evidence: Dict[str, str]):
        """Run the tool call of every evidence entry and render the worker log.

        Args:
            plans: Plan lines, in order; the i-th plan is paired with evidence "#E<i>" (1-based).
            evidence: Mapping from evidence name to tool-call text of the form ``Tool[input]``.

        Returns:
            A string with, for every plan, the plan line followed by its evidence. Plans without a
            matching evidence entry, and tools not in the configured tool list, get "No evidence found".
        """
        worker_evidences = {}
        for name, tool_call in evidence.items():
            if "[" not in tool_call:
                # Not a tool call; keep the text itself as the evidence.
                worker_evidences[name] = tool_call
                continue
            tool, tool_input = tool_call.split("[", 1)
            # Drop the closing bracket only when it is really there.
            if tool_input.endswith("]"):
                tool_input = tool_input[:-1]
            tool_input = tool_input.strip("'").strip('"')
            # find variables in input and replace with previous evidences
            for var in re.findall(r"#E\d+", tool_input):
                if var in worker_evidences:
                    tool_input = tool_input.replace(var, "[" + worker_evidences[var] + "]")
            if tool in self._workers:
                worker_evidences[name] = self._tools_manager[tool](tool_input)
            else:
                worker_evidences[name] = "No evidence found"

        log_parts = []
        for idx, plan in enumerate(plans, start=1):
            # A plan line may have no "#E<n>" line of its own; report that instead of raising KeyError.
            found = worker_evidences.get(f"#E{idx}", "No evidence found")
            log_parts.append(f"{plan}\nEvidence:\n{found}\n")
        worker_log = "".join(log_parts)
        LOG.debug(f"worker_log: {worker_log}")
        return worker_log

    def _build_solver_prompt(self, worker_log, input):
        """Build the solver prompt from the worker log and the task, and reset the solver's chat history."""
        prompt = S_PROMPT_PREFIX + input + "\n" + worker_log + S_PROMPT_SUFFIX + input + "\n"
        globals['chat_history'][self._solver._module_id] = []
        return prompt

    def forward(self, query: str):
        """Plan, gather evidence and solve ``query``; returns the result of the internal pipeline."""
        return self._agent(query)

lazyllm.tools.SQLiteTool

Bases: SqlTool

SQLiteTool是与SQLite数据库进行交互的专用工具。它扩展了SqlTool类,提供了创建表、执行查询和对SQLite数据库进行更新的方法。

Parameters:

  • db_file (str) –

    SQLite 文件数据库的路径

Examples:

>>> from lazyllm.tools import SQLiteTool
>>> with open("personal.db", "w") as _: pass
>>> sql_tool = SQLiteTool("personal.db")
>>> tables_info = {
...     "User": {
...         "fields": {
...             "id": {
...                 "type": "integer",
...                 "comment": "user id"
...             },
...             "name": {
...                 "type": "text",
...                 "comment": "user name"
...             },
...             "email": {
...                 "type": "text",
...                 "comment": "user email"
...             }
...         }
...     }
... }
>>> sql_tool.create_tables(tables_info)
>>> sql_tool.sql_update("INSERT INTO User (id, name, email) VALUES (1, 'Alice', 'alice@example.com')")
>>> table_info = sql_tool.get_all_tables()
>>> print(table_info)
>>> result_json = sql_tool.get_query_result_in_json("SELECT * from User")
>>> print(result_json)
Source code in lazyllm/tools/sql/sql_tool.py
class SQLiteTool(SqlTool):
    """Tool for working with a SQLite database file.

    Extends ``SqlTool`` with helpers that create tables, run queries and apply
    updates through a single ``sqlite3`` connection.

    Args:
        db_file (str): Path to an existing SQLite database file.
        return_trace (bool): When True, error details are also appended to
            ``globals["trace"]``. Defaults to False.

    Examples:
        >>> from lazyllm.tools import SQLiteTool
        >>> with open("personal.db", "w") as _: pass
        >>> sql_tool = SQLiteTool("personal.db")
        >>> tables_info = {
        ...     "User": {
        ...         "fields": {
        ...             "id": {
        ...                 "type": "integer",
        ...                 "comment": "user id"
        ...             },
        ...             "name": {
        ...                 "type": "text",
        ...                 "comment": "user name"
        ...             },
        ...             "email": {
        ...                 "type": "text",
        ...                 "comment": "user email"
        ...             }
        ...         }
        ...     }
        ... }
        >>> sql_tool.create_tables(tables_info)
        >>> sql_tool.sql_update("INSERT INTO User (id, name, email) VALUES (1, 'Alice', 'alice@example.com')")
        >>> table_info = sql_tool.get_all_tables()
        >>> print(table_info)
        >>> result_json = sql_tool.get_query_result_in_json("SELECT * from User")
        >>> print(result_json)
    """

    def __init__(self, db_file, return_trace=False):
        super().__init__()
        # Advertise the SQL dialect; SqlModule forwards `db_type` into its query prompt.
        self.db_type = "SQLite"
        # Require an existing file (sqlite3.connect() would create a missing one).
        assert Path(db_file).is_file(), f"SQLite database file not found: {db_file}"
        self._return_trace = return_trace
        self.conn = sqlite3.connect(db_file, check_same_thread=False)

    def __del__(self):
        self.close_connection()

    def create_tables(self, tables_info: dict):
        """Create tables in the SQLite database from a schema dictionary.

        Expected layout:
        ``{$TABLE_NAME: {"fields": {$COLUMN_NAME: {"type": "REAL"/"TEXT"/"INT", "comment": "..."}}}}``

        Args:
            tables_info (dict): Mapping of table name to its field description.

        Raises:
            sqlite3.Error: If a generated ``CREATE TABLE`` statement fails.
            KeyError: If a table or field entry lacks a required key.
        """
        cursor = self.conn.cursor()
        try:
            for table_name, table_info in tables_info.items():
                fields = []
                for field_name, field_info in table_info["fields"].items():
                    field_type = field_info["type"]
                    # Double embedded single quotes so the comment stays one SQL string literal.
                    comment = str(field_info["comment"]).replace("'", "''")
                    fields.append(f"{field_name} {field_type} comment '{comment}'")
                create_table_sql = f"CREATE TABLE {table_name} (" + ", ".join(fields) + ");"
                cursor.execute(create_table_sql)
        finally:
            # Release the cursor even when one of the statements fails.
            cursor.close()
        self.conn.commit()

    def close_connection(self):
        """Close the underlying connection if one was opened."""
        # `conn` is missing when __init__ failed before connecting (e.g. file check).
        conn = getattr(self, "conn", None)
        if conn:
            conn.close()

    def get_all_tables(self) -> str:
        """Return the stored ``CREATE`` statement of every table, one per line.

        Returns:
            str: The concatenated statements, or an empty string if the lookup
            fails (the error is logged and optionally traced).
        """
        sql_script = "SELECT sql FROM sqlite_master WHERE type='table'"
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql_script)
            return "".join(table_info[0] + "\n" for table_info in cursor.fetchall())
        except Exception as e:
            if self._return_trace:
                globals["trace"].append(f"SQLiteTool Exception: {str(e)}. sql_script: {sql_script}")
            LOG.warning(str(e))
            return ""
        finally:
            cursor.close()

    def get_query_result_in_json(self, sql_script):
        """Execute a SQL statement and return its rows as a JSON string.

        Args:
            sql_script (str): SQL statement to execute.

        Returns:
            str: JSON array of ``{column: value}`` objects. Statements that
            produce no result set yield ``"[]"``; an empty string is returned
            when SQLite reports an error.
        """
        cursor = self.conn.cursor()
        str_result = ""
        try:
            cursor.execute(sql_script)
            # `description` is None for statements that return no rows (INSERT, UPDATE, ...).
            if cursor.description is None:
                results = []
            else:
                columns = [description[0] for description in cursor.description]
                results = [dict(zip(columns, row)) for row in cursor.fetchall()]
            str_result = json.dumps(results, ensure_ascii=False)
        except sqlite3.Error as e:
            lazyllm.LOG.warning(f"SQLite error: {str(e)}")
            if self._return_trace:
                globals["trace"].append(f"SQLiteTool Exception: {str(e)}. sql_script: {sql_script}")
        finally:
            cursor.close()
        return str_result

    def sql_update(self, sql_script):
        """Execute an INSERT/UPDATE style statement and commit it.

        SQLite errors are logged (and optionally traced) instead of raised.

        Args:
            sql_script (str): SQL statement to execute.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql_script)
            # For INSERT, UPDATE execution must be committed
            self.conn.commit()
        except sqlite3.Error as e:
            lazyllm.LOG.warning(f"SQLite error: {str(e)}")
            if self._return_trace:
                globals["trace"].append(f"SQLiteTool Exception: {str(e)}. sql_script: {sql_script}")
        finally:
            cursor.close()

create_tables(tables_info)

根据描述表结构的JSON字典在SQLite数据库中创建表。 JSON格式形如:{$TABLE_NAME:{"fields":{$COLUMN_NAME:{"type":("REAL"/"TEXT"/"INT"), "comment":"..."} } } }

Source code in lazyllm/tools/sql/sql_tool.py
    def create_tables(self, tables_info: dict):
        """Create tables in the SQLite database from a schema dictionary.

        Expected layout:
        ``{$TABLE_NAME: {"fields": {$COLUMN_NAME: {"type": "REAL"/"TEXT"/"INT", "comment": "..."}}}}``

        Args:
            tables_info (dict): Mapping of table name to its field description.

        Raises:
            sqlite3.Error: If a generated ``CREATE TABLE`` statement fails.
            KeyError: If a table or field entry lacks a required key.
        """
        cursor = self.conn.cursor()
        try:
            for table_name, table_info in tables_info.items():
                # Collect one "<name> <type> comment '<text>'" definition per column.
                fields = []
                for field_name, field_info in table_info["fields"].items():
                    field_type = field_info["type"]
                    # Double embedded single quotes so the comment stays one SQL string literal.
                    comment = str(field_info["comment"]).replace("'", "''")
                    fields.append(f"{field_name} {field_type} comment '{comment}'")

                create_table_sql = f"CREATE TABLE {table_name} (" + ", ".join(fields) + ");"
                cursor.execute(create_table_sql)
        finally:
            # Release the cursor even when one of the statements fails.
            cursor.close()
        self.conn.commit()

get_all_tables()

检索并返回SQLite数据库中所有表的字符串表示形式。

Source code in lazyllm/tools/sql/sql_tool.py
    def get_all_tables(self) -> str:
        """Return the stored ``CREATE`` statement of every table, one per line.

        Returns:
            str: The concatenated statements, or an empty string if the lookup
            fails (the error is logged and optionally traced).
        """
        sql_script = "SELECT sql FROM sqlite_master WHERE type='table'"
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql_script)
            schema_text = "".join(row[0] + "\n" for row in cursor.fetchall())
        except Exception as e:
            if self._return_trace:
                globals["trace"].append(f"SQLiteTool Exception: {str(e)}. sql_script: {sql_script}")
            LOG.warning(str(e))
            schema_text = ""
        finally:
            cursor.close()
        return schema_text

get_query_result_in_json(sql_script)

执行SQL查询并返回JSON格式的结果。

Source code in lazyllm/tools/sql/sql_tool.py
    def get_query_result_in_json(self, sql_script):
        """Execute a SQL statement and return its rows as a JSON string.

        Args:
            sql_script (str): SQL statement to execute.

        Returns:
            str: JSON array of ``{column: value}`` objects. Statements that
            produce no result set yield ``"[]"``; an empty string is returned
            when SQLite reports an error.
        """
        cursor = self.conn.cursor()
        str_result = ""
        try:
            cursor.execute(sql_script)
            # `description` is None for statements that return no rows (INSERT, UPDATE, ...);
            # iterating it would raise a TypeError that the handler below does not catch.
            if cursor.description is None:
                results = []
            else:
                columns = [description[0] for description in cursor.description]
                results = [dict(zip(columns, row)) for row in cursor.fetchall()]
            str_result = json.dumps(results, ensure_ascii=False)
        except sqlite3.Error as e:
            lazyllm.LOG.warning(f"SQLite error: {str(e)}")
            if self._return_trace:
                globals["trace"].append(f"SQLiteTool Exception: {str(e)}. sql_script: {sql_script}")
        finally:
            cursor.close()
        return str_result

sql_update(sql_script)

在SQLite数据库上执行SQL插入或更新脚本。

Source code in lazyllm/tools/sql/sql_tool.py
    def sql_update(self, sql_script):
        """Execute an INSERT/UPDATE style statement and commit it.

        SQLite errors are logged (and optionally traced) instead of raised.

        Args:
            sql_script (str): SQL statement to execute.
        """
        cur = self.conn.cursor()
        try:
            cur.execute(sql_script)
            # Writes only become durable once the connection commits.
            self.conn.commit()
        except sqlite3.Error as err:
            lazyllm.LOG.warning(f"SQLite error: {str(err)}")
            if self._return_trace:
                globals["trace"].append(f"SQLiteTool Exception: {str(err)}. sql_script: {sql_script}")
        finally:
            # Single release point for the cursor on every path.
            cur.close()

lazyllm.tools.SqlModule

Bases: ModuleBase

SqlModule 是一个扩展自 ModuleBase 的类,提供了使用语言模型(LLM)生成和执行 SQL 查询的接口。 它设计用于与 SQL 数据库交互,从语言模型的响应中提取 SQL 查询,执行这些查询,并返回结果或解释。

Parameters:

  • llm

    用于生成和解释 SQL 查询及解释的大语言模型。

  • sql_tool (SqlTool) –

    一个 SqlTool 实例,用于处理与 SQL 数据库的交互。

  • use_llm_for_sql_result ((bool, 可选), default: True ) –

    是否使用LLM解读SQL执行结果,默认值为True。设置为True时,会使用LLM对SQL执行结果进行解读并返回自然语言结果;设置为False时,只输出JSON格式表示的SQL执行结果。

  • return_trace ((bool, 可选), default: False ) –

    如果设置为 True,则将结果记录在trace中。默认为 False。

Examples:

>>> # First, run SQLiteTool example
>>> import lazyllm
>>> from lazyllm.tools import SQLiteTool, SqlModule
>>> sql_tool = SQLiteTool("personal.db")
>>> sql_llm = lazyllm.OnlineChatModule(model="gpt-4o", source="openai", base_url="***")
>>> sql_module = SqlModule(sql_llm, sql_tool, use_llm_for_sql_result=True)
>>> print(sql_module("员工Alice的邮箱地址是什么?"))
Source code in lazyllm/tools/sql/sql_tool.py
class SqlModule(ModuleBase):
    """Generate and execute SQL queries with the help of a language model.

    The module asks the LLM for a SQL query, extracts the fenced ``sql`` block
    from the response, runs it through the given ``SqlTool`` and returns either
    the raw JSON result or an LLM-written explanation of it. If the response
    contains no fenced SQL block, the response text is returned unchanged.

    Args:
        llm: Language model used to write the SQL query and to explain results.
        sql_tool (SqlTool): Tool that talks to the SQL database.
        use_llm_for_sql_result (bool, optional): Defaults to True. When True the
            LLM interprets the SQL result and a natural-language answer is
            returned; when False only the JSON-formatted SQL result is returned.
        return_trace (bool, optional): Forwarded to ``ModuleBase``. Defaults to False.

    Examples:
        >>> # First, run SQLiteTool example
        >>> import lazyllm
        >>> from lazyllm.tools import SQLiteTool, SqlModule
        >>> sql_tool = SQLiteTool("personal.db")
        >>> sql_llm = lazyllm.OnlineChatModule(model="gpt-4o", source="openai", base_url="***")
        >>> sql_module = SqlModule(sql_llm, sql_tool, use_llm_for_sql_result=True)
        >>> print(sql_module("员工Alice的邮箱地址是什么?"))
    """

    def __init__(self, llm, sql_tool: SqlTool, use_llm_for_sql_result=True, return_trace: bool = False) -> None:
        super().__init__(return_trace=return_trace)
        self._sql_tool = sql_tool
        self._query_prompter = ChatPrompter(instruction=sql_query_instruct_template).pre_hook(self.sql_query_promt_hook)
        self._llm_query = llm.share(prompt=self._query_prompter)
        self._answer_prompter = ChatPrompter(instruction=sql_explain_instruct_template).pre_hook(
            self.sql_explain_prompt_hook
        )
        self._llm_answer = llm.share(prompt=self._answer_prompter)
        # Non-greedy, DOTALL: capture the body of each ```sql ... ``` fence, newlines included.
        self._pattern = re.compile(r"```sql(.+?)```", re.DOTALL)
        with pipeline() as sql_execute_ppl:
            sql_execute_ppl.exec = self._sql_tool.get_query_result_in_json
            # Only add the explanation stage when the caller asked for an LLM-written answer;
            # otherwise the pipeline ends with the JSON result.
            if use_llm_for_sql_result:
                sql_execute_ppl.concate = (lambda q, r: [q, r]) | bind(sql_execute_ppl.input, _0)
                sql_execute_ppl.llm_answer = self._llm_answer
        with pipeline() as ppl:
            ppl.llm_query = self._llm_query
            ppl.sql_extractor = self.extract_sql_from_response
            # The extractor yields (found, text): pass text through, or execute it as SQL.
            with switch(judge_on_full_input=False) as ppl.sw:
                ppl.sw.case[False, lambda x: x]
                ppl.sw.case[True, sql_execute_ppl]
        self._impl = ppl

    def sql_query_promt_hook(
        self,
        input: Union[str, List, Dict[str, str], None] = None,
        history: List[Union[List[str], Dict[str, Any]]] = [],
        tools: Union[List[Dict[str, Any]], None] = None,
        label: Union[str, None] = None,
    ):
        """Pre-hook that builds the variables for the SQL-generation prompt.

        Returns:
            tuple: ``(prompt_vars, history, tools, label)`` where ``prompt_vars``
            holds the current date, database type, table schemas and user query.

        Raises:
            ValueError: If ``input`` is not a string.
        """
        current_date = datetime.datetime.now().strftime("%Y-%m-%d")
        sql_tables_info = self._sql_tool.get_all_tables()
        if not isinstance(input, str):
            raise ValueError(f"Unexpected type for input: {type(input)}")
        return (
            dict(
                current_date=current_date, db_type=self._sql_tool.db_type, sql_tables=sql_tables_info, user_query=input
            ),
            history,
            tools,
            label,
        )

    def sql_explain_prompt_hook(
        self,
        input: Union[str, List, Dict[str, str], None] = None,
        history: List[Union[List[str], Dict[str, Any]]] = [],
        tools: Union[List[Dict[str, Any]], None] = None,
        label: Union[str, None] = None,
    ):
        """Pre-hook that builds the variables for the result-explanation prompt.

        Args:
            input: A two-item list ``[sql_query, sql_result]``.

        Returns:
            tuple: ``(prompt_vars, history, tools, label)``.

        Raises:
            ValueError: If ``input`` is not a list of exactly two items.
        """
        explain_query = "Tell the user based on the sql execution results, making sure to keep the language consistent \
            with the user's input and don't translate original result."
        # Reject anything that is not a [query, result] pair; `or` short-circuits so
        # len() is only evaluated on lists.
        if not isinstance(input, list) or len(input) != 2:
            raise ValueError(f"Unexpected type for input: {type(input)}")
        # NOTE(review): `globals` is presumably the project-level store, not the builtin - confirm.
        assert "root_input" in globals and self._llm_answer._module_id in globals["root_input"]
        user_query = globals["root_input"][self._llm_answer._module_id]
        # The original question stored by forward() is consumed here.
        globals._data.pop("root_input")
        history_info = chat_history_to_str(history, user_query)
        return (
            dict(history_info=history_info, sql_query=input[0], sql_result=input[1], explain_query=explain_query),
            history,
            tools,
            label,
        )

    def extract_sql_from_response(self, str_response: str) -> tuple[bool, str]:
        """Pull the first fenced ``sql`` block out of an LLM response.

        Returns:
            tuple[bool, str]: ``(True, sql)`` with the stripped block content when
            a fence is found, otherwise ``(False, str_response)`` unchanged.
        """
        matches = self._pattern.findall(str_response)
        if matches:
            # Only the first fenced block is used.
            extracted_content = matches[0].strip()
            return True, extracted_content
        else:
            return False, str_response

    def forward(self, input: str, llm_chat_history: List[Dict[str, Any]] = None):
        """Answer ``input`` by generating and executing SQL.

        Args:
            input (str): The user's question.
            llm_chat_history: Accepted but not used by this method.

        Returns:
            The output of the internal pipeline.
        """
        # Stash the original question so the explanation hook can retrieve it later.
        globals["root_input"] = {self._llm_answer._module_id: input}
        # Share this module's chat history with the query LLM when one exists.
        if self._module_id in globals["chat_history"]:
            globals["chat_history"][self._llm_query._module_id] = globals["chat_history"][self._module_id]
        return self._impl(input)

lazyllm.tools.IntentClassifier

Bases: ModuleBase

IntentClassifier 是一个基于语言模型的意图识别器,用于根据用户提供的输入文本及对话上下文识别预定义的意图,并通过预处理和后处理步骤确保准确识别意图。

Parameters:

  • llm

    用于意图识别的语言模型对象,OnlineChatModule或TrainableModule类型

  • intent_list (list, default: None ) –

    包含所有可能意图的字符串列表。可以包含中文或英文的意图。

  • prompt (str, default: '' ) –

    用户附加的提示词。

  • constrain (str, default: '' ) –

    用户附加的限制。

  • examples (list[list], default: [] ) –

    额外的示例,格式为 [[query, intent], [query, intent], ...]

  • return_trace ((bool, 可选), default: False ) –

    如果设置为 True,则将结果记录在trace中。默认为 False。

Examples:

>>> import lazyllm
>>> from lazyllm.tools import IntentClassifier
>>> classifier_llm = lazyllm.OnlineChatModule(source="openai")
>>> chatflow_intent_list = ["Chat", "Financial Knowledge Q&A", "Employee Information Query", "Weather Query"]
>>> classifier = IntentClassifier(classifier_llm, intent_list=chatflow_intent_list)
>>> classifier.start()
>>> print(classifier('What is the weather today'))
Weather Query
>>>
>>> with IntentClassifier(classifier_llm) as ic:
>>>     ic.case['Weather Query', lambda x: '38.5°C']
>>>     ic.case['Chat', lambda x: 'permission denied']
>>>     ic.case['Financial Knowledge Q&A', lambda x: 'Calling Financial RAG']
>>>     ic.case['Employee Information Query', lambda x: 'Beijing']
...
>>> ic.start()
>>> print(ic('What is the weather today'))
38.5°C
Source code in lazyllm/tools/classifier/intent_classifier.py
class IntentClassifier(ModuleBase):
    """LLM-based intent recognizer.

    Classifies the user's input text (together with the conversation context)
    into one of a predefined list of intents, using pre- and post-processing
    steps around the language model call.

    Args:
        llm: Language model used for intent recognition (OnlineChatModule or TrainableModule).
        intent_list (list): Strings naming every possible intent; may be Chinese or English.
        prompt (str): Extra prompt text supplied by the user.
        constrain (str): Extra constraints supplied by the user.
        attention (str): Text substituted for the ``{attention}`` placeholder of the prompt template.
        examples (list[list]): Extra examples in the form ``[[query, intent], [query, intent], ...]``.
        return_trace (bool, optional): Forwarded to ``ModuleBase``. Defaults to False.

    Examples:
        >>> import lazyllm
        >>> from lazyllm.tools import IntentClassifier
        >>> classifier_llm = lazyllm.OnlineChatModule(source="openai")
        >>> chatflow_intent_list = ["Chat", "Financial Knowledge Q&A", "Employee Information Query", "Weather Query"]
        >>> classifier = IntentClassifier(classifier_llm, intent_list=chatflow_intent_list)
        >>> classifier.start()
        >>> print(classifier('What is the weather today'))
        Weather Query
        >>>
        >>> with IntentClassifier(classifier_llm) as ic:
        ...     ic.case['Weather Query', lambda x: '38.5°C']
        ...     ic.case['Chat', lambda x: 'permission denied']
        ...     ic.case['Financial Knowledge Q&A', lambda x: 'Calling Financial RAG']
        ...     ic.case['Employee Information Query', lambda x: 'Beijing']
        ...
        >>> ic.start()
        >>> print(ic('What is the weather today'))
        38.5°C
    """

    def __init__(self, llm, intent_list: list = None,
                 *, prompt: str = '', constrain: str = '', attention: str = '',
                 examples: list[list[str, str]] = [], return_trace: bool = False) -> None:
        super().__init__(return_trace=return_trace)
        self._intent_list = intent_list or []
        self._llm = llm
        self._prompt = prompt
        self._constrain = constrain
        self._attention = attention
        self._examples = examples
        # Without intents the pipeline is built later, when the `with` block exits.
        if self._intent_list:
            self._init()

    def _init(self):
        """Build the classification prompt and the LLM -> post-process pipeline."""

        def choose_prompt():
            # Chinese template if any intent contains a character in the
            # U+4E00..U+9FFF range, otherwise the English one.
            has_chinese = any(
                "\u4e00" <= ch <= "\u9fff" for ele in self._intent_list for ch in ele
            )
            return ch_prompt_classifier_template if has_chinese else en_prompt_classifier_template

        example_template = '\nUser: {{{{"human_input": "{inp}", "intent_list": {intent}}}}}\nAssistant: {label}\n'
        rendered_examples = ''.join(
            example_template.format(inp=query, intent=self._intent_list, label=label)
            for query, label in self._examples
        )

        # Fill the template placeholders one after another, in this fixed order.
        system_prompt = choose_prompt()
        substitutions = (
            ('{user_prompt}', f' {self._prompt}'),
            ('{attention}', self._attention),
            ('{user_constrains}', f' {self._constrain}'),
            ('{user_examples}', f' {rendered_examples}'),
        )
        for placeholder, value in substitutions:
            system_prompt = system_prompt.replace(placeholder, value)

        prompter = AlpacaPrompter(dict(system=system_prompt, user='${input}'))
        self._llm = self._llm.share(prompt=prompter.pre_hook(self.intent_promt_hook))
        self._impl = pipeline(self._llm, self.post_process_result)

    def intent_promt_hook(
        self,
        input: Union[str, List, Dict[str, str], None] = None,
        history: List[Union[List[str], Dict[str, Any]]] = [],
        tools: Union[List[Dict[str, Any]], None] = None,
        label: Union[str, None] = None,
    ):
        """Pre-hook that wraps the user text and intent list into a JSON prompt input.

        Returns:
            tuple: ``(prompt_vars, history, tools, label)``; the returned history is
            always an empty list because the incoming one is folded into
            ``history_info``.

        Raises:
            ValueError: If ``input`` is not a string.
        """
        if not isinstance(input, str):
            raise ValueError(f"Unexpected type for input: {type(input)}")

        payload = {"human_input": input, "intent_list": self._intent_list}
        history_info = chat_history_to_str(history)
        input_text = json.dumps(payload, ensure_ascii=False)
        return dict(history_info=history_info, input=input_text), [], tools, label

    def post_process_result(self, input):
        """Map the raw LLM output to a known intent, defaulting to the first one."""
        intent = input.strip()
        if intent in self._intent_list:
            return intent
        return self._intent_list[0]

    def forward(self, input: str, llm_chat_history: List[Dict[str, Any]] = None):
        """Classify ``input``; seed the LLM's chat history if none is stored yet."""
        # NOTE(review): `globals` is presumably the project-level store, not the builtin - confirm.
        if llm_chat_history is not None and self._llm._module_id not in globals["chat_history"]:
            globals["chat_history"][self._llm._module_id] = llm_chat_history
        return self._impl(input)

    def __enter__(self):
        assert not self._intent_list, 'Intent list is already set'
        self._sw = switch()
        self._sw.__enter__()
        return self

    @property
    def case(self):
        return switch.Case(self)

    @property
    def submodules(self):
        # When used as a switch, also expose the ModuleBase instances held by its branches.
        collected = []
        if isinstance(self._impl, switch):
            self._impl.for_each(lambda x: isinstance(x, ModuleBase), lambda x: collected.append(x))
        return super().submodules + collected

    # used by switch.Case
    def _add_case(self, cond, func):
        assert isinstance(cond, str), 'intent must be string'
        self._intent_list.append(cond)
        self._sw.case[cond, func]

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._sw.__exit__(exc_type, exc_val, exc_tb)
        # The intents are known now: build the classifier, install it as the switch's
        # conversion step, then make the switch the module's implementation.
        self._init()
        self._sw._set_conversion(self._impl)
        self._impl = self._sw