Skip to content

通用

Register

lazyllm.common.Register

Bases: object

LazyLLM提供的Component的注册机制,可以将任意函数注册成LazyLLM的Component。被注册的函数无需显式的import,即可通过注册器提供的分组机制,在任一位置被索引到。

Source code in lazyllm/common/registry.py
class Register(object):
    """LazyLLM提供的Component的注册机制,可以将任意函数注册成LazyLLM的Component。被注册的函数无需显式的import,即可通过注册器提供的分组机制,在任一位置被索引到。

"""
    def __init__(self, base, fnames, template: str = reg_template, default_group: Optional[str] = None):
        self.basecls = base
        self.fnames = [fnames] if isinstance(fnames, str) else fnames
        self.template = template
        self._default_group = default_group
        assert len(self.fnames) > 0, 'At least one function should be given for overwrite.'

    def _wrap(self, cls, *, rewrite_func=None):
        cls = cls.__name__ if isinstance(cls, type) else cls
        cls = re.match('(LazyLLM)(.*)(Base)', cls.split('.')[-1])[2] \
            if (cls.startswith('LazyLLM') and cls.endswith('Base')) else cls
        base = _get_base_cls_from_registry(cls.lower())
        assert issubclass(base, self.basecls)
        if rewrite_func is None:
            rewrite_func = base.__reg_overwrite__ if getattr(base, '__reg_overwrite__', None) else self.fnames[0]
        assert rewrite_func in self.fnames, f'Invalid function "{rewrite_func}" provived for rewrite.'

        def impl(func, func_name=None):
            if func_name:
                func_for_wrapper = func  # avoid calling recursively

                @functools.wraps(func)
                def wrapper_func(*args, **kwargs):
                    return func_for_wrapper(*args, **kwargs)

                wrapper_func.__name__ = func_name
                func = wrapper_func
            else:
                func_name = func.__name__
            exec(self.template.format(
                name=func_name + cls.split('.')[-1].capitalize(), base=cls))
            # 'func' cannot be recognized by exec, so we use 'setattr' instead
            f = LazyLLMRegisterMetaClass.all_clses[cls.lower()].__getattr__(func_name)
            f.__name__ = func_name
            setattr(f, rewrite_func, bind_to_instance(func))
            return func
        return impl

    def __call__(self, f, *, rewrite_func=None):
        if not isinstance(f, (str, type)):
            assert self._default_group, 'default_group is not set, please set it by your register decorator'
            return self._wrap(self._default_group)(f)
        return self._wrap(f, rewrite_func=rewrite_func)

    def __getattr__(self, name):
        if name not in self.fnames:
            raise AttributeError(f'class {self.__class__} has no attribute {name}')

        def impl(cls):
            return self(cls, rewrite_func=name)
        return impl

    def new_group(self, group_name):
        exec('class LazyLLM{name}Base(self.basecls):\n    pass\n'.format(name=group_name))

lazyllm.common.registry.LazyDict

Bases: dict

一个为懒惰的程序员设计的特殊字典类。支持多种便捷的访问和操作方式。

特性: 1. 使用点号代替['str']访问字典元素 2. 支持首字母小写来使语句更像函数调用 3. 当字典只有一个元素时支持直接调用 4. 支持动态默认键 5. 如果组名出现在名称中,允许省略组名

参数

name (str): 字典的名称,默认为空字符串。 base: 基类引用,默认为None。 args: 位置参数,传递给dict父类。 *kw: 关键字参数,传递给dict父类。

Source code in lazyllm/common/registry.py
class LazyDict(dict):
    """一个为懒惰的程序员设计的特殊字典类。支持多种便捷的访问和操作方式。

特性:
1. 使用点号代替['str']访问字典元素
2. 支持首字母小写来使语句更像函数调用
3. 当字典只有一个元素时支持直接调用
4. 支持动态默认键
5. 如果组名出现在名称中,允许省略组名

参数:
    name (str): 字典的名称,默认为空字符串。
    base: 基类引用,默认为None。
    *args: 位置参数,传递给dict父类。
    **kw: 关键字参数,传递给dict父类。
"""
    def __init__(self, name='', base=None, *args, **kw):
        super(__class__, self).__init__(*args, **kw)
        self._default: Optional[str] = None
        self.name = name.capitalize()
        self.base = base

    def __setitem__(self, key, value):
        key = key.lower()
        assert key != 'default', 'LazyDict do not support key: default'
        if '.' in key:
            grp, key = key.rsplit('.', 1)
            return self[grp].__setitem__(key, value)
        return super().__setitem__(key, value)

    def __getitem__(self, key):
        key = key.lower()
        if '.' in key:
            grp, key = key.split('.', 1)
            return self[grp][key]
        return super().__getitem__(key)

    # default -> self.default
    # key -> Key, keyName, KeyName
    # if self.name ends with 's' or 'es', ignor it
    def _match(self, key: str):
        key = key.lower()
        if key == 'default':
            assert self._default or len(self) > 0, 'No default key set'
            key = self._default or list(self.keys())[0]
        keys = [key, f'{key}{self.name}', f'{key}{self.name.lower()}']
        if self.name.endswith('s'):
            n = 2 if self.name.endswith('es') else 1
            keys.extend([f'{key}{self.name[:-n]}', f'{key}{self.name[:-n].lower()}'])

        for k in set(keys):
            if k in self.keys():
                return k
        raise AttributeError(f'Attr {key} not found in `{self.name}: {self}`, conditates: {keys}')

    def __getattr__(self, key):
        return self[self._match(key)]

    def remove(self, key):
        """从字典中移除指定的键值对。

参数:
    key (str): 要移除的键。支持与__getattr__相同的键匹配规则,包括首字母小写和组名省略等特性。

注意:
    如果找不到匹配的键,将抛出AttributeError异常。
"""
        super(__class__, self).pop(self._match(key))

    def __call__(self, *args, **kwargs):
        assert self._default is not None or len(self.keys()) == 1
        return (self.default if self._default else self[list(self.keys())[0]])(*args, **kwargs)

    def set_default(self, key: str):
        """设置字典的默认键。设置后可以通过.default属性访问该键对应的值。

参数:
    key (str): 要设置为默认的键名。

注意:
    - key必须是字符串类型
    - 设置后可以通过.default访问,或在字典只有一个元素时直接调用
"""
        assert isinstance(key, str), 'default key must be str'
        self._default = key.lower()

remove(key)

从字典中移除指定的键值对。

参数

key (str): 要移除的键。支持与__getattr__相同的键匹配规则,包括首字母小写和组名省略等特性。

注意

如果找不到匹配的键,将抛出AttributeError异常。

Source code in lazyllm/common/registry.py
    def remove(self, key):
        """从字典中移除指定的键值对。

参数:
    key (str): 要移除的键。支持与__getattr__相同的键匹配规则,包括首字母小写和组名省略等特性。

注意:
    如果找不到匹配的键,将抛出AttributeError异常。
"""
        super(__class__, self).pop(self._match(key))

set_default(key)

设置字典的默认键。设置后可以通过.default属性访问该键对应的值。

参数

key (str): 要设置为默认的键名。

注意
  • key必须是字符串类型
  • 设置后可以通过.default访问,或在字典只有一个元素时直接调用
Source code in lazyllm/common/registry.py
    def set_default(self, key: str):
        """设置字典的默认键。设置后可以通过.default属性访问该键对应的值。

参数:
    key (str): 要设置为默认的键名。

注意:
    - key必须是字符串类型
    - 设置后可以通过.default访问,或在字典只有一个元素时直接调用
"""
        assert isinstance(key, str), 'default key must be str'
        self._default = key.lower()

lazyllm.common.common.ResultCollector

Bases: object

结果收集器,用于在流程或任务执行过程中按名称存储和访问结果。
它通过调用自身(传入 name)返回一个可调用的 Impl 对象来收集指定名称的结果。
适用于需要跨步骤共享中间结果的场景。

Source code in lazyllm/common/common.py
class ResultCollector(object):
    """结果收集器,用于在流程或任务执行过程中按名称存储和访问结果。  
它通过调用自身(传入 name)返回一个可调用的 Impl 对象来收集指定名称的结果。  
适用于需要跨步骤共享中间结果的场景。
"""
    class Impl(object):
        def __init__(self, name, value): self._name, self._value = name, value

        def __call__(self, *args, **kw):
            assert (len(args) == 0) ^ (len(kw) == 0), f'args({len(args)}), kwargs({len(kw)})'
            assert self._name is not None
            if len(args) > 0:
                self._value[self._name] = args[0] if len(args) == 1 else package(*args)
                return self._value[self._name]
            else:
                self._value[self._name] = kw
                return kwargs(kw)

    def __init__(self): self._value = dict()
    def __call__(self, name): return ResultCollector.Impl(name, self._value)
    def __getitem__(self, name): return self._value[name]
    def __repr__(self): return repr(self._value)
    def keys(self):
        """获取所有已存储结果的名称。

**Returns**

- KeysView[str]: 结果名称集合。
"""
        return self._value.keys()
    def items(self):
        """获取所有已存储的 (名称, 值) 对。

**Returns**

- ItemsView[str, Any]: 结果的键值对集合。
"""
        return self._value.items()

items()

获取所有已存储的 (名称, 值) 对。

Returns

  • ItemsView[str, Any]: 结果的键值对集合。
Source code in lazyllm/common/common.py
    def items(self):
        """获取所有已存储的 (名称, 值) 对。

**Returns**

- ItemsView[str, Any]: 结果的键值对集合。
"""
        return self._value.items()

keys()

获取所有已存储结果的名称。

Returns

  • KeysView[str]: 结果名称集合。
Source code in lazyllm/common/common.py
    def keys(self):
        """获取所有已存储结果的名称。

**Returns**

- KeysView[str]: 结果名称集合。
"""
        return self._value.keys()

lazyllm.common.common.EnvVarContextManager

环境变量上下文管理器,用于 在代码块执行期间临时设置环境变量,退出时自动恢复原始环境变量。

Parameters:

  • env_vars_dict (dict) –

    需要临时设置的环境变量字典,值为 None 的变量将被忽略。

Source code in lazyllm/common/common.py
class EnvVarContextManager:
    """环境变量上下文管理器,用于 在代码块执行期间临时设置环境变量,退出时自动恢复原始环境变量。

Args:
    env_vars_dict (dict): 需要临时设置的环境变量字典,值为 None 的变量将被忽略。
"""
    def __init__(self, env_vars_dict):
        self.env_vars_dict = {var: value for var, value in env_vars_dict.items() if value is not None}
        self.original_values = {}

    def __enter__(self):
        for var, value in self.env_vars_dict.items():
            if var in os.environ:
                self.original_values[var] = os.environ[var]
            os.environ[var] = value
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for var in self.env_vars_dict:
            if var in self.original_values:
                os.environ[var] = self.original_values[var]
            else:
                del os.environ[var]

Bind

lazyllm.common.bind

Bind

Bases: object

Bind 类用于函数绑定与延迟调用,支持动态参数传入和上下文参数解析,实现灵活的函数组合与流水线式调用。

bind 函数能够将一个函数与固定的位置参数和关键字参数绑定,支持使用占位符(如 _0, _1)引用当前数据流中上游节点的输出,实现数据在流水线中的跳跃传递和灵活组合。

注意事项: - 绑定的参数可以是具体值,也可以是当前数据流中上游节点的输出占位符。 - 参数绑定仅在当前数据流上下文内生效,不能跨数据流绑定或绑定外部变量。

Parameters:

  • __bind_func (Callable 或 type, default: _None ) –

    要绑定的函数或函数类型,传入类型时会自动实例化。

  • *args

    绑定时固定的位置参数,可以包含占位符。

  • **kw

    绑定时固定的关键字参数,可以包含占位符。

Examples:

>>> from lazyllm import bind, _0, _1
>>> def f1(x):
...     return x ** 2
>>> def f21(input1, input2=0):
...     return input1 + input2 + 1
>>> def f22(input1, input2=0):
...     return input1 + input2 - 1
>>> def f3(in1='placeholder1', in2='placeholder2', in3='placeholder3'):
...     return f"get [input:{in1}], [f21:{in2}], [f22:{in3}]"
>>> from lazyllm import pipeline, parallel
>>> with pipeline() as ppl:
...     ppl.f1 = f1
...     with parallel() as ppl.subprl2:
...         ppl.subprl2.path1 = f21
...         ppl.subprl2.path2 = f22
...     ppl.f3 = bind(f3, ppl.input, _0, _1)
...
>>> print(ppl(2))
get [input:2], [f21:5], [f22:3]
>>> # Demonstrate operator '|' overloading for bind
>>> with pipeline() as ppl2:
...     ppl2.f1 = f1
...     with parallel().bind(ppl2.input, _0) as ppl2.subprl2:
...         ppl2.subprl2.path1 = f21
...         ppl2.subprl2.path2 = f22
...     ppl2.f3 = f3 | bind(ppl2.input, _0, _1)
...
>>> print(ppl2(2))
get [input:2], [f21:7], [f22:5]
Source code in lazyllm/common/bind.py
class Bind(object):
    """Bind 类用于函数绑定与延迟调用,支持动态参数传入和上下文参数解析,实现灵活的函数组合与流水线式调用。

bind 函数能够将一个函数与固定的位置参数和关键字参数绑定,支持使用占位符(如 _0, _1)引用当前数据流中上游节点的输出,实现数据在流水线中的跳跃传递和灵活组合。

注意事项:
    - 绑定的参数可以是具体值,也可以是当前数据流中上游节点的输出占位符。
    - 参数绑定仅在当前数据流上下文内生效,不能跨数据流绑定或绑定外部变量。

Args:
    __bind_func (Callable 或 type): 要绑定的函数或函数类型,传入类型时会自动实例化。
    *args: 绑定时固定的位置参数,可以包含占位符。
    **kw: 绑定时固定的关键字参数,可以包含占位符。


Examples:
    >>> from lazyllm import bind, _0, _1
    >>> def f1(x):
    ...     return x ** 2
    >>> def f21(input1, input2=0):
    ...     return input1 + input2 + 1
    >>> def f22(input1, input2=0):
    ...     return input1 + input2 - 1
    >>> def f3(in1='placeholder1', in2='placeholder2', in3='placeholder3'):
    ...     return f"get [input:{in1}], [f21:{in2}], [f22:{in3}]"

    >>> from lazyllm import pipeline, parallel

    >>> with pipeline() as ppl:
    ...     ppl.f1 = f1
    ...     with parallel() as ppl.subprl2:
    ...         ppl.subprl2.path1 = f21
    ...         ppl.subprl2.path2 = f22
    ...     ppl.f3 = bind(f3, ppl.input, _0, _1)
    ...
    >>> print(ppl(2))
    get [input:2], [f21:5], [f22:3]

    >>> # Demonstrate operator '|' overloading for bind
    >>> with pipeline() as ppl2:
    ...     ppl2.f1 = f1
    ...     with parallel().bind(ppl2.input, _0) as ppl2.subprl2:
    ...         ppl2.subprl2.path1 = f21
    ...         ppl2.subprl2.path2 = f22
    ...     ppl2.f3 = f3 | bind(ppl2.input, _0, _1)
    ...
    >>> print(ppl2(2))
    get [input:2], [f21:7], [f22:5]
    """
    class _None: pass

    class Args(object):
        class _None: pass
        class Unpack(package): pass

        def __init__(self, source_id: str, target_id: str = 'input', *, unpack: bool = False):
            self._item_key, self._attr_key = Bind.Args._None, Bind.Args._None
            self._source_id, self._target_id = source_id, target_id
            self._unpack = unpack

        def __getitem__(self, key: str):
            self._item_key = key
            return self

        def __getattr__(self, key: str):
            if key.startswith('__') and key.endswith('__'):
                raise AttributeError(f'Args has no attribute {key}')
            self._attr_key = key
            return self

        def __getstate__(self):
            return self._item_key, self._attr_key, self._source_id, self._target_id

        def __setstate__(self, state):
            self._item_key, self._attr_key, self._source_id, self._target_id = state

        def get_arg(self, source):
            if (not source or self._source_id != source['source']) and self._source_id in globals['bind_args']:
                source = globals['bind_args'][self._source_id]
            if not source or source['source'] != self._source_id:
                raise RuntimeError('Unable to find the bound parameter, possibly due to pipeline.input/output can only '
                                   'be bind in direct member of pipeline! You may solve this by defining the pipeline '
                                   'in a `with lazyllm.save_pipeline_result():` block.')
            input = result = source[self._target_id]
            source = source['source']
            if self._item_key is not Bind.Args._None: result = input[self._item_key]
            elif self._attr_key is not Bind.Args._None: result = getattr(input, self._attr_key)
            if self._unpack and isinstance(result, package): result = Bind.Args.Unpack(result)
            return result

    def __init__(self, __bind_func=_None, *args, **kw):
        self._f = __bind_func() if isinstance(__bind_func, type) and __bind_func is not Bind._None else __bind_func
        self._args = args
        self._kw = kw
        self._has_root = (any([isinstance(a, AttrTree) for a in args])
                          or any([isinstance(v, AttrTree) for v in kw.values()]))

    def __ror__(self, __value: Callable):
        if self._f is not Bind._None: self._args = (self._f,) + self._args
        self._f = __value
        return self

    # _bind_args_source: dict(input=input, args=dict(key=value))
    def __call__(self, *args, _bind_args_source=None, **kw):
        if self._f is None: return None
        keys = set(kw.keys()).intersection(set(self._kw.keys()))
        assert len(keys) == 0, f'Keys `{keys}` are already bind!'
        bind_args = args if len(self._args) == 0 else (
            [args[a.idx] if isinstance(a, Placeholder) else a for a in self._args])
        kwargs = {k: args[v.idx] if isinstance(v, Placeholder) else v for k, v in self._kw.items()}
        bind_args = [a.get_arg(_bind_args_source) if isinstance(a, Bind.Args) else a for a in bind_args]
        bind_args = list(itertools.chain.from_iterable(x if isinstance(x, Bind.Args.Unpack) else [x] for x in bind_args))
        kwargs = {k: v.get_arg(_bind_args_source) if isinstance(v, Bind.Args) else v for k, v in kwargs.items()}
        return self._f(*bind_args, **kwargs, **kw)

    # TODO: modify it
    def __repr__(self) -> str:
        return self._f.__repr__() + '(bind args:{})'.format(
            ', '.join([repr(a) if a is not self else 'self' for a in self._args]))

    def __getattr__(self, name):
        # name will be '_f' in copy.deepcopy
        if name != '_f':
            return getattr(self._f, name)
        return super(__class__, self).__getattr__(name)

    def __setattr__(self, __name: str, __value: Any) -> None:
        if __name not in ('_f', '_args', '_kw', '_has_root'):
            return setattr(self._f, __name, __value)
        return super(__class__, self).__setattr__(__name, __value)

Package

lazyllm.common.package

Bases: tuple

package类用于封装流水线或并行模块的返回值,保证传递给下游模块时自动拆包,从而支持多个值的灵活传递。

Examples:

>>> from lazyllm.common import package
>>> p = package(1, 2, 3)
>>> p
(1, 2, 3)
>>> p[1]
2
>>> p_slice = p[1:]
>>> isinstance(p_slice, package)
True
>>> p2 = package([4, 5])
>>> p + p2
(1, 2, 3, 4, 5)
Source code in lazyllm/common/common.py
class package(tuple):
    """package类用于封装流水线或并行模块的返回值,保证传递给下游模块时自动拆包,从而支持多个值的灵活传递。


Examples:
    >>> from lazyllm.common import package
    >>> p = package(1, 2, 3)
    >>> p
    (1, 2, 3)
    >>> p[1]
    2
    >>> p_slice = p[1:]
    >>> isinstance(p_slice, package)
    True
    >>> p2 = package([4, 5])
    >>> p + p2
    (1, 2, 3, 4, 5)
    """
    def __new__(cls, *args):
        if len(args) == 1 and isinstance(args[0], (tuple, list, types.GeneratorType)):
            return super(__class__, cls).__new__(cls, args[0])
        else:
            return super(__class__, cls).__new__(cls, args)

    def __getitem__(self, key):
        if isinstance(key, slice):
            return package(super(__class__, self).__getitem__(key))
        return super(__class__, self).__getitem__(key)

    def __add__(self, __other):
        return package(super().__add__(__other))

Identity

lazyllm.common.Identity

恒等模块,用于直接返回输入值。

该模块常用于模块拼接结构中占位,无实际处理逻辑。若输入为多个参数,将自动打包为一个整体结构输出。

Parameters:

  • *args

    可选的位置参数,占位用。

  • **kw

    可选的关键字参数,占位用。

Source code in lazyllm/common/common.py
class Identity():
    """恒等模块,用于直接返回输入值。

该模块常用于模块拼接结构中占位,无实际处理逻辑。若输入为多个参数,将自动打包为一个整体结构输出。

Args:
    *args: 可选的位置参数,占位用。
    **kw: 可选的关键字参数,占位用。
"""
    def __init__(self, *args, **kw):
        pass

    def __call__(self, *inputs):
        if len(inputs) == 1:
            return inputs[0]
        return package(*inputs)

    def __repr__(self):
        return make_repr('Module', 'Identity')

Compilation

lazyllm.common.compile_func(func_code, global_env=None)

将一段 python 函数字符串编译成一个可执行函数并返回。

Parameters:

  • func_code (str) –

    包含 python 函数代码的字符串

  • global_env (str, default: None ) –

    在 python 函数中用到的包和全局变量

Examples:

from lazyllm.common import compile_func
code_str = 'def Identity(v): return v'
identity = compile_func(code_str)
assert identity('hello') == 'hello'
Source code in lazyllm/common/utils.py
def compile_func(func_code: str, global_env: Optional[Dict[str, Any]] = None) -> Callable:
    """
将一段 python 函数字符串编译成一个可执行函数并返回。

Args:
    func_code (str): 包含 python 函数代码的字符串
    global_env (str): 在 python 函数中用到的包和全局变量


Examples:

    from lazyllm.common import compile_func
    code_str = 'def Identity(v): return v'
    identity = compile_func(code_str)
    assert identity('hello') == 'hello'
    """
    fname = re.search(r'def\s+(\w+)\s*\(', func_code).group(1)
    module = ast.parse(func_code)
    SecurityVisitor().visit(module)
    func = compile(module, filename="<ast>", mode="exec")
    local_dict = {}
    exec(func, global_env if global_env is not None else local_dict, local_dict)
    return local_dict.pop(fname)

Queue

lazyllm.common.FileSystemQueue

Bases: ABC

基于文件系统的队列抽象基类。

FileSystemQueue是一个抽象基类,提供了基于文件系统的队列操作接口。它支持多种后端实现(如SQLite、Redis),用于在分布式环境中进行消息传递和数据流控制。

该类实现了单例模式,确保每个类名只有一个队列实例,并提供了线程安全的队列操作。

Parameters:

  • klass (str, default: '__default__' ) –

    队列的类名标识符。默认为 '__default__'

Returns:

  • FileSystemQueue: 队列实例(单例模式)
Source code in lazyllm/common/queue.py
class FileSystemQueue(ABC):
    """基于文件系统的队列抽象基类。

FileSystemQueue是一个抽象基类,提供了基于文件系统的队列操作接口。它支持多种后端实现(如SQLite、Redis),用于在分布式环境中进行消息传递和数据流控制。

该类实现了单例模式,确保每个类名只有一个队列实例,并提供了线程安全的队列操作。

Args:
    klass (str, optional): 队列的类名标识符。默认为 ``'__default__'``。

**Returns:**

- FileSystemQueue: 队列实例(单例模式)
"""

    __queue_pool__ = dict()

    def __init__(self, *, klass='__default__'):
        super().__init__()
        self._class = klass

    def __new__(cls, *args, **kw):
        klass = kw.get('klass', '__default__')
        if klass not in __class__.__queue_pool__:
            if cls is __class__:
                __class__.__queue_pool__[klass] = cls.__default_queue__(*args, **kw)
            else:
                __class__.__queue_pool__[klass] = super().__new__(cls)
        return __class__.__queue_pool__[klass]

    @classmethod
    def get_instance(cls, klass):
        assert isinstance(klass, str) and klass != '__default__'
        return cls(klass=klass)

    @classmethod
    def set_default(cls, queue: Type):
        cls.__default_queue__ = queue

    @property
    def sid(self):
        return f'{globals._sid}-{self._class}'

    def enqueue(self, message):
        """将消息加入队列。

此方法将指定的消息添加到队列的尾部,遵循先进先出(FIFO)的原则。

Args:
    message: 要加入队列的消息内容。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
    >>> queue.enqueue(123)
    >>> queue.peek()
    '123'
    """
        return self._enqueue(self.sid, message)
    def dequeue(self, limit=None):
        """从队列中取出消息。

此方法从队列头部取出消息并移除它们,可以指定一次取出的消息数量。

Args:
    limit (int, optional): 一次取出的最大消息数量。如果为None,则取出所有消息。默认为None。

**Returns:**

- list: 取出的消息列表。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
    >>> for i in range(5):
    ...     queue.enqueue(f"Message{i}")
    >>> all_messages = queue.dequeue()
    >>> all_messages
    ['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
    """
        return self._dequeue(self.sid, limit=limit)
    def peek(self):
        """查看队列头部的消息但不移除。

此方法允许查看队列中最早的消息,但不会将其从队列中移除。

**Returns:**

- 队列头部的消息,如果队列为空则返回None。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='peek_test')
    >>> queue.enqueue("First message")
    >>> queue.enqueue("Second message")
    >>> first_message = queue.peek()
    >>> first_message
    'First message'
    >>> queue.peek()
    'First message'
    """
        return self._peek(self.sid)
    def size(self):
        """获取队列大小。

此方法返回当前队列中的消息数量。

**Returns:**

- int: 队列中的消息数量。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='size_test')
    >>> queue.size()
    0
    >>> queue.enqueue("Message1")
    >>> queue.size()
    1
    >>> queue.enqueue("Message2")
    >>> queue.size()
    2
    >>> queue.dequeue()
    ['Message1', 'Message2']
    >>> queue.size()
    0
    """
        return self._size(self.sid)
    def init(self): self.clear()

    def clear(self):
        """清空队列。

此方法移除队列中的所有消息,将队列重置为空状态。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='clear_test')
    >>> for i in range(10):
    ...     queue.enqueue(f"Message{i}")
    >>> queue.size()
    10
    >>> queue.clear()
    >>> queue.size()
    0
    >>> queue.peek() is None
    True
    """
        self._clear(self.sid)

    @abstractmethod
    def _enqueue(self, id, message): pass

    @abstractmethod
    def _dequeue(self, id, limit=None): pass

    @abstractmethod
    def _peek(self, id): pass

    @abstractmethod
    def _size(self, id): pass

    @abstractmethod
    def _clear(self, id): pass

clear()

清空队列。

此方法移除队列中的所有消息,将队列重置为空状态。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='clear_test')
>>> for i in range(10):
...     queue.enqueue(f"Message{i}")
>>> queue.size()
10
>>> queue.clear()
>>> queue.size()
0
>>> queue.peek() is None
True
Source code in lazyllm/common/queue.py
    def clear(self):
        """清空队列。

此方法移除队列中的所有消息,将队列重置为空状态。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='clear_test')
    >>> for i in range(10):
    ...     queue.enqueue(f"Message{i}")
    >>> queue.size()
    10
    >>> queue.clear()
    >>> queue.size()
    0
    >>> queue.peek() is None
    True
    """
        self._clear(self.sid)

dequeue(limit=None)

从队列中取出消息。

此方法从队列头部取出消息并移除它们,可以指定一次取出的消息数量。

Parameters:

  • limit (int, default: None ) –

    一次取出的最大消息数量。如果为None,则取出所有消息。默认为None。

Returns:

  • list: 取出的消息列表。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
>>> for i in range(5):
...     queue.enqueue(f"Message{i}")
>>> all_messages = queue.dequeue()
>>> all_messages
['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
Source code in lazyllm/common/queue.py
    def dequeue(self, limit=None):
        """从队列中取出消息。

此方法从队列头部取出消息并移除它们,可以指定一次取出的消息数量。

Args:
    limit (int, optional): 一次取出的最大消息数量。如果为None,则取出所有消息。默认为None。

**Returns:**

- list: 取出的消息列表。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
    >>> for i in range(5):
    ...     queue.enqueue(f"Message{i}")
    >>> all_messages = queue.dequeue()
    >>> all_messages
    ['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
    """
        return self._dequeue(self.sid, limit=limit)

enqueue(message)

将消息加入队列。

此方法将指定的消息添加到队列的尾部,遵循先进先出(FIFO)的原则。

Parameters:

  • message

    要加入队列的消息内容。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
>>> queue.enqueue(123)
>>> queue.peek()
'123'
Source code in lazyllm/common/queue.py
    def enqueue(self, message):
        """将消息加入队列。

此方法将指定的消息添加到队列的尾部,遵循先进先出(FIFO)的原则。

Args:
    message: 要加入队列的消息内容。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
    >>> queue.enqueue(123)
    >>> queue.peek()
    '123'
    """
        return self._enqueue(self.sid, message)

peek()

查看队列头部的消息但不移除。

此方法允许查看队列中最早的消息,但不会将其从队列中移除。

Returns:

  • 队列头部的消息,如果队列为空则返回None。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='peek_test')
>>> queue.enqueue("First message")
>>> queue.enqueue("Second message")
>>> first_message = queue.peek()
>>> first_message
'First message'
>>> queue.peek()
'First message'
Source code in lazyllm/common/queue.py
    def peek(self):
        """查看队列头部的消息但不移除。

此方法允许查看队列中最早的消息,但不会将其从队列中移除。

**Returns:**

- 队列头部的消息,如果队列为空则返回None。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='peek_test')
    >>> queue.enqueue("First message")
    >>> queue.enqueue("Second message")
    >>> first_message = queue.peek()
    >>> first_message
    'First message'
    >>> queue.peek()
    'First message'
    """
        return self._peek(self.sid)

size()

获取队列大小。

此方法返回当前队列中的消息数量。

Returns:

  • int: 队列中的消息数量。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='size_test')
>>> queue.size()
0
>>> queue.enqueue("Message1")
>>> queue.size()
1
>>> queue.enqueue("Message2")
>>> queue.size()
2
>>> queue.dequeue()
['Message1', 'Message2']
>>> queue.size()
0
Source code in lazyllm/common/queue.py
    def size(self):
        """获取队列大小。

此方法返回当前队列中的消息数量。

**Returns:**

- int: 队列中的消息数量。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='size_test')
    >>> queue.size()
    0
    >>> queue.enqueue("Message1")
    >>> queue.size()
    1
    >>> queue.enqueue("Message2")
    >>> queue.size()
    2
    >>> queue.dequeue()
    ['Message1', 'Message2']
    >>> queue.size()
    0
    """
        return self._size(self.sid)

lazyllm.common.multiprocessing.SpawnProcess

Bases: Process

Source code in lazyllm/common/multiprocessing.py
class SpawnProcess(multiprocessing.Process):
    def start(self):
        """
使用spawn方式启动进程。

此方法在启动进程时强制使用spawn方式,这种方式会创建一个全新的Python解释器进程。spawn方式相比fork更安全,特别是在多线程环境下。

**说明:**
- 使用spawn方式启动新进程,避免了fork可能带来的问题
- 会临时切换启动方式为spawn,执行完后恢复原有启动方式
- 继承自multiprocessing.Process.start()的所有功能


Examples:

    ```python
    from lazyllm.common.multiprocessing import SpawnProcess

    def worker():
        print("Worker process running")

    # Create and start a process using spawn method
    process = SpawnProcess(target=worker)
    process.start()
    process.join()
    ```
    """
        with _ctx('spawn'):
            return super().start()

start()

使用spawn方式启动进程。

此方法在启动进程时强制使用spawn方式,这种方式会创建一个全新的Python解释器进程。spawn方式相比fork更安全,特别是在多线程环境下。

说明: - 使用spawn方式启动新进程,避免了fork可能带来的问题 - 会临时切换启动方式为spawn,执行完后恢复原有启动方式 - 继承自multiprocessing.Process.start()的所有功能

Examples:

```python
from lazyllm.common.multiprocessing import SpawnProcess

def worker():
    print("Worker process running")

# Create and start a process using spawn method
process = SpawnProcess(target=worker)
process.start()
process.join()
```
Source code in lazyllm/common/multiprocessing.py
    def start(self):
        """
使用spawn方式启动进程。

此方法在启动进程时强制使用spawn方式,这种方式会创建一个全新的Python解释器进程。spawn方式相比fork更安全,特别是在多线程环境下。

**说明:**
- 使用spawn方式启动新进程,避免了fork可能带来的问题
- 会临时切换启动方式为spawn,执行完后恢复原有启动方式
- 继承自multiprocessing.Process.start()的所有功能


Examples:

    ```python
    from lazyllm.common.multiprocessing import SpawnProcess

    def worker():
        print("Worker process running")

    # Create and start a process using spawn method
    process = SpawnProcess(target=worker)
    process.start()
    process.join()
    ```
    """
        with _ctx('spawn'):
            return super().start()

lazyllm.common.queue.SQLiteQueue

Bases: FileSystemQueue

基于 SQLite 的持久化文件系统队列。 该类扩展自 FileSystemQueue,使用 SQLite 数据库存储队列数据,通过 position 字段保证先进先出顺序,并支持并发安全的消息入队、出队、查看队头、队列大小查询和清空操作。 队列数据库默认存储在 ~/.lazyllm_filesystem_queue.db,通过文件锁机制确保多进程安全访问。 Args: klass (str): 队列分类名,用于逻辑隔离不同的队列,默认为 'default'。

Source code in lazyllm/common/queue.py
class SQLiteQueue(FileSystemQueue):
    """基于 SQLite 的持久化文件系统队列。
该类扩展自 FileSystemQueue,使用 SQLite 数据库存储队列数据,通过 position 字段保证先进先出顺序,并支持并发安全的消息入队、出队、查看队头、队列大小查询和清空操作。
队列数据库默认存储在 ~/.lazyllm_filesystem_queue.db,通过文件锁机制确保多进程安全访问。
Args:
    klass (str): 队列分类名,用于逻辑隔离不同的队列,默认为 '__default__'。
"""
    def __init__(self, klass='__default__'):
        super(__class__, self).__init__(klass=klass)
        self.db_path = os.path.expanduser(os.path.join(config['home'], '.lazyllm_filesystem_queue.db'))
        self._lock = FileLock(self.db_path + '.lock')
        self._check_same_thread = not sqlite3_check_threadsafety()
        self._initialize_db()

    def _initialize_db(self):
        with self._lock, sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
            cursor = conn.cursor()
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS queue (
                id TEXT NOT NULL,
                position INTEGER NOT NULL,
                message TEXT NOT NULL,
                PRIMARY KEY (id, position)
            )
            ''')
            conn.commit()

    def _enqueue(self, id, message):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                SELECT MAX(position) FROM queue WHERE id = ?
                ''', (id,))
                max_pos = cursor.fetchone()[0]
                next_pos = 0 if max_pos is None else max_pos + 1
                cursor.execute('''
                INSERT INTO queue (id, position, message)
                VALUES (?, ?, ?)
                ''', (id, next_pos, message))
                conn.commit()

    def _dequeue(self, id, limit=None):
        """Retrieve and remove all messages from the queue."""
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                if limit:
                    cursor.execute('SELECT message, position FROM queue WHERE id = ? '
                                   'ORDER BY position ASC LIMIT ?', (id, limit))
                else:
                    cursor.execute('SELECT message, position FROM queue WHERE id = ? '
                                   'ORDER BY position ASC', (id,))

                rows = cursor.fetchall()
                if not rows:
                    return []
                messages = [row[0] for row in rows]
                cursor.execute('DELETE FROM queue WHERE id = ? AND position IN '
                               f'({",".join([str(row[1]) for row in rows])})', (id, ))
                conn.commit()
                return messages

    def _peek(self, id):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                SELECT message FROM queue WHERE id = ? ORDER BY position ASC LIMIT 1
                ''', (id,))
                row = cursor.fetchone()
                if row is None:
                    return None
                return row[0]

    def _size(self, id):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                SELECT COUNT(*) FROM queue WHERE id = ?
                ''', (id,))
                return cursor.fetchone()[0]

    def _clear(self, id):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                DELETE FROM queue WHERE id = ?
                ''', (id,))
                conn.commit()

lazyllm.common.ReadOnlyWrapper

Bases: object

\ 一个轻量级只读包装器,用于包裹任意对象并对外提供只读访问(实际并未完全禁止修改,但复制时不会携带原始对象)。包装器可以动态替换内部对象,并提供判断对象是否为空的辅助方法。 Args: obj (Optional[Any]): 初始被包装的对象,默认为 None。

Source code in lazyllm/common/common.py
class ReadOnlyWrapper(object):
    """\ 
一个轻量级只读包装器,用于包裹任意对象并对外提供只读访问(实际并未完全禁止修改,但复制时不会携带原始对象)。包装器可以动态替换内部对象,并提供判断对象是否为空的辅助方法。
Args:
    obj (Optional[Any]): 初始被包装的对象,默认为 None。
"""
    def __init__(self, obj=None):
        self.obj = obj

    def set(self, obj):
        """\ 
替换当前包装的内部对象。

Args:
    obj (Any): 新的内部对象。
"""
        self.obj = obj

    def __getattr__(self, key):
        # key will be 'obj' in copy.deepcopy
        if key != 'obj' and self.obj is not None:
            return getattr(self.obj, key)
        return super(__class__, self).__getattr__(key)

    # TODO: modify it
    def __repr__(self):
        r = self.obj.__repr__()
        return (f'{r[:-1]}' if r.endswith('>') else f'<{r}') + '(Readonly)>'

    def __deepcopy__(self, memo):
        # drop obj
        return ReadOnlyWrapper()

    def isNone(self):
        """\ 
检查当前包装器是否未持有任何对象。

Args:
    None.

**Returns**

- bool: 如果内部对象为 None 返回 True,否则 False。
"""
        return self.obj is None

isNone()

\ 检查当前包装器是否未持有任何对象。

Returns

  • bool: 如果内部对象为 None 返回 True,否则 False。
Source code in lazyllm/common/common.py
    def isNone(self):
        """\ 
检查当前包装器是否未持有任何对象。

Args:
    None.

**Returns**

- bool: 如果内部对象为 None 返回 True,否则 False。
"""
        return self.obj is None

set(obj)

\ 替换当前包装的内部对象。

Parameters:

  • obj (Any) –

    新的内部对象。

Source code in lazyllm/common/common.py
    def set(self, obj):
        """\ 
替换当前包装的内部对象。

Args:
    obj (Any): 新的内部对象。
"""
        self.obj = obj

lazyllm.common.queue.RedisQueue

Bases: FileSystemQueue

\ 基于 Redis 实现的文件系统队列(继承自 FileSystemQueue),用于跨进程/节点的消息传递与队列管理。内部使用指定的 redis_url 初始化并管理底层存储,同时提供线程安全的初始化逻辑。

Parameters:

  • klass (str, default: '__default__' ) –

    队列的分类名称,用于区分不同队列实例,默认值为 'default'。

Source code in lazyllm/common/queue.py
class RedisQueue(FileSystemQueue):
    """\ 
基于 Redis 实现的文件系统队列(继承自 FileSystemQueue),用于跨进程/节点的消息传递与队列管理。内部使用指定的 redis_url 初始化并管理底层存储,同时提供线程安全的初始化逻辑。

Args:
    klass (str): 队列的分类名称,用于区分不同队列实例,默认值为 '__default__'。
"""
    def __init__(self, klass='__default__'):
        super(__class__, self).__init__(klass=klass)
        self.redis_url = config["fsqredis_url"]
        self._lock = threading.Lock()
        self._initialize_db()

    def _initialize_db(self):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            assert (
                conn.ping()
            ), "Found fsque reids config but can not connect, please check your config `LAZYLLM_FSQREDIS_URL`."
            if not conn.exists(self.sid):
                conn.rpush(self.sid, '<start>')

    def _enqueue(self, id, message):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            conn.rpush(id, message)

    def _dequeue(self, id, limit=None):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            if limit:
                limit = limit + 1
                vals = conn.lrange(id, 1, limit)
                conn.ltrim(id, limit, -1)
            else:
                vals = conn.lrange(id, 1, -1)
                conn.ltrim(id, 0, 0)
            if not vals:
                return []
            return [val.decode('utf-8') for val in vals]

    def _peek(self, id):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            val = conn.lindex(id, 1)
            if val is None:
                return None
            return val.decode('utf-8')

    def _size(self, id):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            rsize = conn.llen(id)
            return rsize - 1  # empty : [ <start> ]

    def _clear(self, id):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            conn.delete(id)

Multiprocessing

lazyllm.common.ForkProcess

Bases: Process

LazyLLM 提供的增强进程类,继承自 Python 标准库的 multiprocessing.Process。此类专门使用 fork 启动方法来创建子进程,并提供了同步/异步执行模式的支持。

Parameters:

  • group

    进程组,默认为 None

  • target

    要在进程中执行的函数,默认为 None

  • name

    进程名称,默认为 None

  • args

    传递给目标函数的参数元组,默认为 ()

  • kwargs

    传递给目标函数的关键字参数字典,默认为 {}

  • daemon

    是否为守护进程,默认为 None

  • sync

    是否为同步模式,默认为 True。在同步模式下,进程执行完目标函数后会自动退出;在异步模式下,进程会持续运行直到被手动终止。

注意: 此类主要用于 LazyLLM 内部的进程管理,特别是在需要长期运行的服务器进程中。

Examples:

>>> import lazyllm
>>> from lazyllm.common import ForkProcess
>>> import time
>>> import os
>>> def simple_task(task_id):
...     print(f"Process {os.getpid()} executing task {task_id}")
...     time.sleep(0.1)  
...     return f"Task {task_id} completed by process {os.getpid()}"
>>> process = ForkProcess(target=simple_task, args=(1,), sync=True)
>>> process.start()
Process 12345 executing task 1
Source code in lazyllm/common/multiprocessing.py
class ForkProcess(multiprocessing.Process):
    """LazyLLM 提供的增强进程类,继承自 Python 标准库的 `multiprocessing.Process`。此类专门使用 fork 启动方法来创建子进程,并提供了同步/异步执行模式的支持。

Args:
    group: 进程组,默认为 ``None``
    target: 要在进程中执行的函数,默认为 ``None``
    name: 进程名称,默认为 ``None``
    args: 传递给目标函数的参数元组,默认为 ``()``
    kwargs: 传递给目标函数的关键字参数字典,默认为 ``{}``
    daemon: 是否为守护进程,默认为 ``None``
    sync: 是否为同步模式,默认为 ``True``。在同步模式下,进程执行完目标函数后会自动退出;在异步模式下,进程会持续运行直到被手动终止。

**注意**: 此类主要用于 LazyLLM 内部的进程管理,特别是在需要长期运行的服务器进程中。


Examples:
    >>> import lazyllm
    >>> from lazyllm.common import ForkProcess
    >>> import time
    >>> import os
    >>> def simple_task(task_id):
    ...     print(f"Process {os.getpid()} executing task {task_id}")
    ...     time.sleep(0.1)  
    ...     return f"Task {task_id} completed by process {os.getpid()}"
    >>> process = ForkProcess(target=simple_task, args=(1,), sync=True)
    >>> process.start()
    Process 12345 executing task 1
    """
    def __init__(self, group=None, target=None, name=None, args=(),
                 kwargs=None, *, daemon=None, sync=True):
        super().__init__(group, ForkProcess.work(target, sync), name, args, kwargs or {}, daemon=daemon)

    @staticmethod
    def work(f, sync):
        def impl(*args, **kw):
            try:
                f(*args, **kw)
                if not sync:
                    while True: time.sleep(1)
            finally:
                atexit._run_exitfuncs()
        return impl

    def start(self):
        """启动 ForkProcess 进程。此方法会使用 fork 启动方法来创建子进程,并开始执行目标函数。

此方法的特点:

- **Fork 启动**: 使用 fork 方法创建子进程,在 Unix/Linux 系统上提供更好的性能
- **上下文管理**: 自动管理进程启动方法的上下文,确保使用正确的启动方式
- **继承父类**: 继承自 `multiprocessing.Process.start()` 的所有功能

**注意**: 此方法会实际创建新的进程并开始执行,调用后进程会立即开始运行。

"""
        with _ctx('fork'):
            return super().start()

start()

启动 ForkProcess 进程。此方法会使用 fork 启动方法来创建子进程,并开始执行目标函数。

此方法的特点:

  • Fork 启动: 使用 fork 方法创建子进程,在 Unix/Linux 系统上提供更好的性能
  • 上下文管理: 自动管理进程启动方法的上下文,确保使用正确的启动方式
  • 继承父类: 继承自 multiprocessing.Process.start() 的所有功能

注意: 此方法会实际创建新的进程并开始执行,调用后进程会立即开始运行。

Source code in lazyllm/common/multiprocessing.py
    def start(self):
        """启动 ForkProcess 进程。此方法会使用 fork 启动方法来创建子进程,并开始执行目标函数。

此方法的特点:

- **Fork 启动**: 使用 fork 方法创建子进程,在 Unix/Linux 系统上提供更好的性能
- **上下文管理**: 自动管理进程启动方法的上下文,确保使用正确的启动方式
- **继承父类**: 继承自 `multiprocessing.Process.start()` 的所有功能

**注意**: 此方法会实际创建新的进程并开始执行,调用后进程会立即开始运行。

"""
        with _ctx('fork'):
            return super().start()

Options

lazyllm.common.Option

Bases: object

LazyLLM 提供的选项管理类,用于管理多个选项值并在它们之间进行迭代。此类主要用于参数网格搜索和超参数调优场景。

Parameters:

  • *obj

    一个或多个选项值,可以是任意类型的对象。如果传入单个列表或元组,会自动展开。

此类的主要特性:

  • 多选项管理: 可以管理多个不同的选项值
  • 迭代支持: 支持标准的 Python 迭代协议,可以遍历所有选项
  • 当前值访问: 始终可以访问当前选中的选项值
  • 深度复制: 支持深度复制当前选中的选项值
  • 多进程兼容: 支持在多进程环境中使用

注意: 此类主要用于 LazyLLM 内部的参数搜索和试验管理,特别是在 TrialModule 中进行参数网格搜索时。

Examples:

>>> import lazyllm
>>> from lazyllm.common.option import Option
>>> learning_rates = Option(0.001, 0.01, 0.1)
>>> print(f"当前学习率: {learning_rates}")
当前学习率: <Option options="(0.001, 0.01, 0.1)" curr="0.001">
>>> print(f"所有选项: {list(learning_rates)}")
所有选项: [0.001, 0.01, 0.1]
Source code in lazyllm/common/option.py
class Option(object):
    """LazyLLM 提供的选项管理类,用于管理多个选项值并在它们之间进行迭代。此类主要用于参数网格搜索和超参数调优场景。

Args:
    *obj: 一个或多个选项值,可以是任意类型的对象。如果传入单个列表或元组,会自动展开。

此类的主要特性:

- **多选项管理**: 可以管理多个不同的选项值
- **迭代支持**: 支持标准的 Python 迭代协议,可以遍历所有选项
- **当前值访问**: 始终可以访问当前选中的选项值
- **深度复制**: 支持深度复制当前选中的选项值
- **多进程兼容**: 支持在多进程环境中使用

**注意**: 此类主要用于 LazyLLM 内部的参数搜索和试验管理,特别是在 TrialModule 中进行参数网格搜索时。



Examples:
    >>> import lazyllm
    >>> from lazyllm.common.option import Option
    >>> learning_rates = Option(0.001, 0.01, 0.1)
    >>> print(f"当前学习率: {learning_rates}")
    当前学习率: <Option options="(0.001, 0.01, 0.1)" curr="0.001">
    >>> print(f"所有选项: {list(learning_rates)}")
    所有选项: [0.001, 0.01, 0.1]
    """
    def __init__(self, *obj):
        if len(obj) == 1 and isinstance(obj[0], (tuple, list)): obj = obj[0]
        assert isinstance(obj, (tuple, list)) and len(obj) > 1, 'More than one option shoule be given'
        self._objs = obj
        self._idx = 0
        self._obj = self._objs[self._idx]

    def _next(self):
        self._idx += 1
        if self._idx == len(self._objs):
            self._idx = 0
            raise StopIteration

    def __setattr__(self, __name: str, __value: Any) -> None:
        object.__setattr__(self, __name, __value)
        if __name == '_idx' and 0 <= self._idx < len(self._objs):
            self._obj = self._objs[self._idx]

    def __deepcopy__(self, *args, **kw):
        return copy.deepcopy(self._obj)

    def __iter__(self):
        return _OptionIterator(self)

    def __repr__(self):
        return f'<Option options="{self._objs}" curr="{self._obj}">'

DynamicDescriptor

lazyllm.common.DynamicDescriptor

Source code in lazyllm/common/common.py
class DynamicDescriptor:
    class Impl:
        def __init__(self, func, instance, owner):
            self._func, self._instance, self._owner = func, instance, owner

        def __call__(self, *args, **kw):
            return self._func(self._instance, *args, **kw) if self._instance else self._func(self._owner, *args, **kw)

        def __repr__(self): return repr(self._func)
        __doc__ = property(lambda self: self._func.__doc__)

        @__doc__.setter
        def __doc__(self, value): self._func.__doc__ = value

    def __init__(self, func):
        self.__func__ = func

    def __get__(self, instance, owner):
        return DynamicDescriptor.Impl(self.__func__, instance, owner)

lazyllm.common.CaseInsensitiveDict

Bases: dict

大小写不敏感的字典类。

CaseInsensitiveDict 继承自 dict,提供大小写不敏感的键值存储和检索功能。所有的键都会被转换为小写形式存储,确保无论使用大写、小写或混合大小写的键名都能访问到相同的值。

特点: - 所有键在存储时自动转换为小写 - 支持标准的字典操作(获取、设置、检查包含关系) - 保持字典的原有功能,只是键名处理方式不同

Parameters:

  • *args

    传递给父类 dict 的位置参数

  • **kwargs

    传递给父类 dict 的关键字参数

Examples:

>>> from lazyllm.common import CaseInsensitiveDict
>>> # 创建大小写不敏感的字典
>>> d = CaseInsensitiveDict({'Name': 'John', 'AGE': 25, 'City': 'New York'})
>>> 
>>> # 使用不同大小写访问相同的键
>>> print(d['name'])      # 使用小写
... 'John'
>>> print(d['NAME'])      # 使用大写
... 'John'
>>> print(d['Name'])      # 使用首字母大写
... 'John'
>>> 
>>> # 设置值时也会转换为小写
>>> d['EMAIL'] = 'john@example.com'
>>> print(d['email'])     # 使用小写访问
... 'john@example.com'
>>> 
>>> # 检查键是否存在(大小写不敏感)
>>> 'AGE' in d
True
>>> 'age' in d
True
>>> 'Age' in d
True
>>> 
>>> # 支持标准字典操作
>>> d['PHONE'] = '123-456-7890'
>>> print(d.get('phone'))
... '123-456-7890'
>>> print(len(d))
... 5
Source code in lazyllm/common/common.py
class CaseInsensitiveDict(dict):
    """大小写不敏感的字典类。

CaseInsensitiveDict 继承自 dict,提供大小写不敏感的键值存储和检索功能。所有的键都会被转换为小写形式存储,确保无论使用大写、小写或混合大小写的键名都能访问到相同的值。

特点:
    - 所有键在存储时自动转换为小写
    - 支持标准的字典操作(获取、设置、检查包含关系)
    - 保持字典的原有功能,只是键名处理方式不同

Args:
    *args: 传递给父类 dict 的位置参数
    **kwargs: 传递给父类 dict 的关键字参数


Examples:
    >>> from lazyllm.common import CaseInsensitiveDict
    >>> # 创建大小写不敏感的字典
    >>> d = CaseInsensitiveDict({'Name': 'John', 'AGE': 25, 'City': 'New York'})
    >>> 
    >>> # 使用不同大小写访问相同的键
    >>> print(d['name'])      # 使用小写
    ... 'John'
    >>> print(d['NAME'])      # 使用大写
    ... 'John'
    >>> print(d['Name'])      # 使用首字母大写
    ... 'John'
    >>> 
    >>> # 设置值时也会转换为小写
    >>> d['EMAIL'] = 'john@example.com'
    >>> print(d['email'])     # 使用小写访问
    ... 'john@example.com'
    >>> 
    >>> # 检查键是否存在(大小写不敏感)
    >>> 'AGE' in d
    True
    >>> 'age' in d
    True
    >>> 'Age' in d
    True
    >>> 
    >>> # 支持标准字典操作
    >>> d['PHONE'] = '123-456-7890'
    >>> print(d.get('phone'))
    ... '123-456-7890'
    >>> print(len(d))
    ... 5
    """
    def __init__(self, *args, **kwargs):
        super().__init__()
        for key, value in dict(*args, **kwargs).items():
            assert isinstance(key, str)
            self[key] = value

    def __getitem__(self, key):
        assert isinstance(key, str)
        return super().__getitem__(key.lower())

    def __setitem__(self, key, value):
        assert isinstance(key, str)
        super().__setitem__(key.lower(), value)

    def __contains__(self, key):
        assert isinstance(key, str)
        return super().__contains__(key.lower())

lazyllm.common.ProcessPoolExecutor

Bases: ProcessPoolExecutor

Source code in lazyllm/common/multiprocessing.py
class ProcessPoolExecutor(PPE):
    def submit(self, fn, /, *args, **kwargs):
        """将任务提交到进程池中执行。

此方法将一个函数及其参数序列化后提交到进程池中执行,返回一个 `Future` 对象,用于获取任务执行结果或状态。

Args:
    fn (Callable): 要执行的函数。
    *args: 传递给函数的位置参数。
    **kwargs: 传递给函数的关键字参数。

Returns:
    concurrent.futures.Future: 表示任务执行状态的 `Future` 对象。


Examples:
    >>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
    >>> import time
    >>> 
    >>> def task(x):
    ...     time.sleep(1)
    ...     return x * 2
    ... 
    >>> with ProcessPoolExecutor(max_workers=2) as executor:
    ...     future = executor.submit(task, 5)
    ...     result = future.result()
    ...     print(result)
    10
    """
        f = dump_obj(functools.partial(fn, *args, **kwargs))
        return super(__class__, self).submit(_worker, f)

submit(fn, /, *args, **kwargs)

将任务提交到进程池中执行。

此方法将一个函数及其参数序列化后提交到进程池中执行,返回一个 Future 对象,用于获取任务执行结果或状态。

Parameters:

  • fn (Callable) –

    要执行的函数。

  • *args

    传递给函数的位置参数。

  • **kwargs

    传递给函数的关键字参数。

Returns:

  • concurrent.futures.Future: 表示任务执行状态的 Future 对象。

Examples:

>>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
>>> import time
>>> 
>>> def task(x):
...     time.sleep(1)
...     return x * 2
... 
>>> with ProcessPoolExecutor(max_workers=2) as executor:
...     future = executor.submit(task, 5)
...     result = future.result()
...     print(result)
10
Source code in lazyllm/common/multiprocessing.py
    def submit(self, fn, /, *args, **kwargs):
        """将任务提交到进程池中执行。

此方法将一个函数及其参数序列化后提交到进程池中执行,返回一个 `Future` 对象,用于获取任务执行结果或状态。

Args:
    fn (Callable): 要执行的函数。
    *args: 传递给函数的位置参数。
    **kwargs: 传递给函数的关键字参数。

Returns:
    concurrent.futures.Future: 表示任务执行状态的 `Future` 对象。


Examples:
    >>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
    >>> import time
    >>> 
    >>> def task(x):
    ...     time.sleep(1)
    ...     return x * 2
    ... 
    >>> with ProcessPoolExecutor(max_workers=2) as executor:
    ...     future = executor.submit(task, 5)
    ...     result = future.result()
    ...     print(result)
    10
    """
        f = dump_obj(functools.partial(fn, *args, **kwargs))
        return super(__class__, self).submit(_worker, f)

Threading

lazyllm.common.Thread

Bases: Thread

LazyLLM 提供的增强线程类,继承自 Python 标准库的 threading.Thread。此类提供了额外的功能,包括会话ID管理、预钩子函数支持和异常处理机制。

Parameters:

  • group

    线程组,默认为 None

  • target

    要在线程中执行的函数,默认为 None

  • name

    线程名称,默认为 None

  • args

    传递给目标函数的参数元组,默认为 ()

  • kwargs

    传递给目标函数的关键字参数字典,默认为 None

  • prehook

    在线程执行前要调用的函数或函数列表,默认为 None

  • daemon

    是否为守护线程,默认为 None

Examples:

>>> import lazyllm
>>> from lazyllm.common.threading import Thread
>>> import time
>>> def simple_task(name):
...     time.sleep(0.1)
...     return f"Hello from {name}"
>>> thread = Thread(target=simple_task, args=("Worker",))
>>> thread.start()
>>> result = thread.get_result()
>>> print(result)
Hello from Worker
>>> def setup_environment():
...     print("Setting up environment...")
...     return "environment_ready"
>>> def validate_input(data):
...     print(f"Validating input: {data}")
...     if not isinstance(data, (int, float)):
...         raise ValueError("Input must be numeric")
>>> def process_data(data):
...     print(f"Processing data: {data}")
...     time.sleep(0.1) 
...     return data * 2
>>> thread = Thread(
...     target=process_data,
...     args=(42,),
...     prehook=[setup_environment, lambda: validate_input(42)]
... )
>>> thread.start()
Setting up environment...
Validating input: 42
Processing data: 42
>>> result = thread.get_result()
>>> print(f"Final result: {result}")
Final result: 84
Source code in lazyllm/common/threading.py
class Thread(threading.Thread):
    """LazyLLM 提供的增强线程类,继承自 Python 标准库的 `threading.Thread`。此类提供了额外的功能,包括会话ID管理、预钩子函数支持和异常处理机制。

Args:
    group: 线程组,默认为 ``None``
    target: 要在线程中执行的函数,默认为 ``None``
    name: 线程名称,默认为 ``None``
    args: 传递给目标函数的参数元组,默认为 ``()``
    kwargs: 传递给目标函数的关键字参数字典,默认为 ``None``
    prehook: 在线程执行前要调用的函数或函数列表,默认为 ``None``
    daemon: 是否为守护线程,默认为 ``None``


Examples:
    >>> import lazyllm
    >>> from lazyllm.common.threading import Thread
    >>> import time
    >>> def simple_task(name):
    ...     time.sleep(0.1)
    ...     return f"Hello from {name}"
    >>> thread = Thread(target=simple_task, args=("Worker",))
    >>> thread.start()
    >>> result = thread.get_result()
    >>> print(result)
    Hello from Worker
    >>> def setup_environment():
    ...     print("Setting up environment...")
    ...     return "environment_ready"
    >>> def validate_input(data):
    ...     print(f"Validating input: {data}")
    ...     if not isinstance(data, (int, float)):
    ...         raise ValueError("Input must be numeric")
    >>> def process_data(data):
    ...     print(f"Processing data: {data}")
    ...     time.sleep(0.1) 
    ...     return data * 2
    >>> thread = Thread(
    ...     target=process_data,
    ...     args=(42,),
    ...     prehook=[setup_environment, lambda: validate_input(42)]
    ... )
    >>> thread.start()
    Setting up environment...
    Validating input: 42
    Processing data: 42
    >>> result = thread.get_result()
    >>> print(f"Final result: {result}")
    Final result: 84
    """
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, prehook=None, daemon=None):
        self.q = Queue()
        if not isinstance(prehook, (tuple, list)): prehook = [prehook] if prehook else []
        prehook.insert(0, functools.partial(_sid_setter, sid=globals._sid))
        super().__init__(group, self.work, name, (prehook, target, args), kwargs, daemon=daemon)

    def work(self, prehook, target, args, **kw):
        """线程的核心工作方法,负责执行预钩子函数、目标函数,并处理异常和结果。

Args:
    prehook: 预钩子函数列表,在线程执行前调用
    target: 要执行的目标函数
    args: 传递给目标函数的参数
    **kw: 传递给目标函数的关键字参数

**注意**: 此方法由 `Thread` 类内部调用,用户通常不需要直接调用此方法。
"""
        [p() for p in prehook]
        try:
            r = target(*args, **kw)
        except Exception as e:
            self.q.put(e)
        else:
            self.q.put(r)

    def get_result(self):
        """获取线程执行结果的方法。此方法会阻塞直到线程执行完成,然后返回执行结果或重新抛出异常。

**Returns:**

- 线程执行的结果。如果目标函数正常执行,返回其返回值;如果发生异常,会重新抛出该异常。

**注意**: 此方法应该在调用 `thread.start()` 之后使用,用于获取线程的执行结果。
"""
        r = self.q.get()
        if isinstance(r, Exception):
            raise r
        return r

get_result()

获取线程执行结果的方法。此方法会阻塞直到线程执行完成,然后返回执行结果或重新抛出异常。

Returns:

  • 线程执行的结果。如果目标函数正常执行,返回其返回值;如果发生异常,会重新抛出该异常。

注意: 此方法应该在调用 thread.start() 之后使用,用于获取线程的执行结果。

Source code in lazyllm/common/threading.py
    def get_result(self):
        """获取线程执行结果的方法。此方法会阻塞直到线程执行完成,然后返回执行结果或重新抛出异常。

**Returns:**

- 线程执行的结果。如果目标函数正常执行,返回其返回值;如果发生异常,会重新抛出该异常。

**注意**: 此方法应该在调用 `thread.start()` 之后使用,用于获取线程的执行结果。
"""
        r = self.q.get()
        if isinstance(r, Exception):
            raise r
        return r

work(prehook, target, args, **kw)

线程的核心工作方法,负责执行预钩子函数、目标函数,并处理异常和结果。

Parameters:

  • prehook

    预钩子函数列表,在线程执行前调用

  • target

    要执行的目标函数

  • args

    传递给目标函数的参数

  • **kw

    传递给目标函数的关键字参数

注意: 此方法由 Thread 类内部调用,用户通常不需要直接调用此方法。

Source code in lazyllm/common/threading.py
    def work(self, prehook, target, args, **kw):
        """线程的核心工作方法,负责执行预钩子函数、目标函数,并处理异常和结果。

Args:
    prehook: 预钩子函数列表,在线程执行前调用
    target: 要执行的目标函数
    args: 传递给目标函数的参数
    **kw: 传递给目标函数的关键字参数

**注意**: 此方法由 `Thread` 类内部调用,用户通常不需要直接调用此方法。
"""
        [p() for p in prehook]
        try:
            r = target(*args, **kw)
        except Exception as e:
            self.q.put(e)
        else:
            self.q.put(r)

LazyLLMCMD

lazyllm.common.LazyLLMCMD

Bases: object

命令行操作封装类,提供安全、灵活的命令行管理功能。

Parameters:

  • cmd (Union[str, List[str], Callable]) –

    命令行指令,支持三种形式:字符串命令,命令列表,可调用对象。

  • return_value (Any, default: None ) –

    预设返回值。

  • checkf(Any)

    命令验证函数。

  • no_displays(Any)

    需要过滤的敏感参数名。

Examples:

>>> from lazyllm.common import LazyLLMCMD
>>> cmd = LazyLLMCMD("run --epochs=50 --batch-size=32")
>>> print(cmd.get_args("epochs"))
50
>>> print(cmd.get_args("batch-size")) 
32
>>> base = LazyLLMCMD("python train.py", checkf=lambda x: True)
>>> new = base.with_cmd("python predict.py")
Source code in lazyllm/common/common.py
class LazyLLMCMD(object):
    """命令行操作封装类,提供安全、灵活的命令行管理功能。

Args:
    cmd (Union[str, List[str], Callable]):命令行指令,支持三种形式:字符串命令,命令列表,可调用对象。
    return_value (Any):预设返回值。
    checkf(Any):命令验证函数。
    no_displays(Any):需要过滤的敏感参数名。


Examples:
    >>> from lazyllm.common import LazyLLMCMD
    >>> cmd = LazyLLMCMD("run --epochs=50 --batch-size=32")
    >>> print(cmd.get_args("epochs"))
    50
    >>> print(cmd.get_args("batch-size")) 
    32
    >>> base = LazyLLMCMD("python train.py", checkf=lambda x: True)
    >>> new = base.with_cmd("python predict.py")

    """
    def __init__(self, cmd, *, return_value=None, checkf=(lambda *a: True), no_displays=None):
        if isinstance(cmd, (tuple, list)):
            cmd = ' && '.join(cmd)
        assert isinstance(cmd, str) or callable(cmd), 'cmd must be func or (list of) bash command str.'
        self.cmd = cmd
        self.return_value = return_value
        self.checkf = checkf
        self.no_displays = no_displays

    def __hash__(self):
        return hash(self.cmd)

    def __str__(self):
        assert not callable(self.cmd), f'Cannot convert cmd function {self.cmd} to str'
        cmd = re.sub(r'\b(LAZYLLM_[A-Z0-9_]*?_(?:API|SECRET)_KEY)=\S+', r'\1=xxxxxx', self.cmd)
        if self.no_displays:
            for item in self.no_displays:
                pattern = r'(-{1,2}' + re.escape(item) + r')(\s|=|)(\S+|)'
                cmd = re.sub(pattern, "", cmd)
            return cmd
        else:
            return cmd

    def with_cmd(self, cmd):
        """创建新命令对象并继承当前配置。

参数:
    cmd: 新的命令内容(类型需与原始命令一致)

"""
        # Attention: Cannot use copy.deepcopy because of class method.
        new_instance = LazyLLMCMD(cmd, return_value=self.return_value,
                                  checkf=self.checkf, no_displays=self.no_displays)
        return new_instance

    def get_args(self, key):
        """从命令字符串中提取指定参数的值。

参数:
    key: 要提取的参数名
"""
        assert not callable(self.cmd), f'Cannot get args from function {self.cmd}'
        pattern = r'*(-{1,2}' + re.escape(key) + r')(\s|=|)(\S+|)*'
        return re.match(pattern, self.cmd)[3]

get_args(key)

从命令字符串中提取指定参数的值。

参数

key: 要提取的参数名

Source code in lazyllm/common/common.py
    def get_args(self, key):
        """从命令字符串中提取指定参数的值。

参数:
    key: 要提取的参数名
"""
        assert not callable(self.cmd), f'Cannot get args from function {self.cmd}'
        pattern = r'*(-{1,2}' + re.escape(key) + r')(\s|=|)(\S+|)*'
        return re.match(pattern, self.cmd)[3]

with_cmd(cmd)

创建新命令对象并继承当前配置。

参数

cmd: 新的命令内容(类型需与原始命令一致)

Source code in lazyllm/common/common.py
    def with_cmd(self, cmd):
        """创建新命令对象并继承当前配置。

参数:
    cmd: 新的命令内容(类型需与原始命令一致)

"""
        # Attention: Cannot use copy.deepcopy because of class method.
        new_instance = LazyLLMCMD(cmd, return_value=self.return_value,
                                  checkf=self.checkf, no_displays=self.no_displays)
        return new_instance