Skip to content

通用

Register

lazyllm.common.Register

Bases: object

LazyLLM提供了一套组件注册机制,允许将任意函数注册为LazyLLM的Component。通过注册器提供的分组机制,注册后的函数可在任意位置通过分组索引进行调用,无需显式导入。

lazyllm.components.register(cls, *, rewrite_func)→ 装饰器

该函数调用后返回一个装饰器,将被装饰函数包装为Component并注册到名为cls的分组中。

Parameters:

  • base (type) –

    基类

  • fnames (Union[str, List[str]]) –

    要重写的函数名或函数名列表

  • template (str, default: None ) –

    注册模板字符串,默认为标准注册模板

  • default_group (str, default: None ) –

    默认组名,默认为None

Examples:

>>> import lazyllm
>>> @lazyllm.component_register('mygroup')
... def myfunc(input):
...    return input
...
>>> lazyllm.mygroup.myfunc()(1)
1
>>> @lazyllm.component_register.cmd('mygroup')
... def mycmdfunc(input):
...     return f'echo {input}'
...
>>> lazyllm.mygroup.mycmdfunc()(1)
PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) Command: echo 1
PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) PID: 1
Source code in lazyllm/common/registry.py
class Register(object):
    """LazyLLM提供了一套组件注册机制,允许将任意函数注册为LazyLLM的Component。通过注册器提供的分组机制,注册后的函数可在任意位置通过分组索引进行调用,无需显式导入。

<span style="font-size: 18px;">&ensp;**`lazyllm.components.register(cls, *, rewrite_func)→ 装饰器`**</span>

该函数调用后返回一个装饰器,将被装饰函数包装为Component并注册到名为cls的分组中。

Args:
    base (type): 基类
    fnames (Union[str, List[str]]): 要重写的函数名或函数名列表
    template (str, optional): 注册模板字符串,默认为标准注册模板
    default_group (str, optional): 默认组名,默认为None


Examples:
    >>> import lazyllm
    >>> @lazyllm.component_register('mygroup')
    ... def myfunc(input):
    ...    return input
    ...
    >>> lazyllm.mygroup.myfunc()(1)
    1
    >>> @lazyllm.component_register.cmd('mygroup')
    ... def mycmdfunc(input):
    ...     return f'echo {input}'
    ...
    >>> lazyllm.mygroup.mycmdfunc()(1)
    PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) Command: echo 1
    PID: 2024-06-01 00:00:00 lazyllm INFO: (lazyllm.launcher) PID: 1
    """
    def __init__(self, base, fnames, template: Optional[str] = None, default_group: Optional[str] = None,
                 allowed_parameter: Optional[Union[str, List[str]]] = None):
        if template is not None:
            warnings.warn('The "template" parameter of Register is deprecated and ignored.',
                          DeprecationWarning, stacklevel=2)
        self.basecls = base
        self.fnames = [fnames] if isinstance(fnames, str) else fnames
        self._default_group = default_group
        if isinstance(allowed_parameter, str):
            self._allowed_parameter = {allowed_parameter}
        elif isinstance(allowed_parameter, list):
            assert all(isinstance(p, str) for p in allowed_parameter), 'allowed_parameter must be list of str'
            self._allowed_parameter = set(allowed_parameter)
        elif allowed_parameter is None:
            self._allowed_parameter = set()
        else:
            raise TypeError('allowed_parameter must be str or list[str]')
        assert len(self.fnames) > 0, 'At least one function should be given for overwrite.'

    def _wrap(self, cls, *, rewrite_func=None, **kwargs):
        cls = cls.__name__ if isinstance(cls, type) else cls
        cls = re.match('(LazyLLM)(.*)(Base)', cls.split('.')[-1])[2] \
            if (cls.startswith('LazyLLM') and cls.endswith('Base')) else cls
        base = _get_base_cls_from_registry(cls.lower())
        assert issubclass(base, self.basecls)
        if rewrite_func is None:
            rewrite_func = base.__reg_overwrite__ if getattr(base, '__reg_overwrite__', None) else self.fnames[0]
        assert rewrite_func in self.fnames, f'Invalid function "{rewrite_func}" provived for rewrite.'

        def impl(func, func_name=None):
            if func_name:
                func_for_wrapper = func  # avoid calling recursively

                @functools.wraps(func)
                def wrapper_func(*args, **kwargs):
                    return func_for_wrapper(*args, **kwargs)

                wrapper_func.__name__ = func_name
                func = wrapper_func
            else:
                func_name = func.__name__
            base_cls = LazyLLMRegisterMetaClass.all_clses[cls.lower()].base
            type(func_name + cls.split('.')[-1].capitalize(), (base_cls,), {})
            f = LazyLLMRegisterMetaClass.all_clses[cls.lower()].__getattr__(func_name)

            # Support multiprocessing: Register the class in the module where the function is defined
            if func.__module__ in sys.modules:
                setattr(sys.modules[func.__module__], f.__name__, f)
                f.__module__ = func.__module__

            f.__name__ = func_name
            setattr(f, rewrite_func, bind_to_instance(func))
            for k, v in kwargs.items():
                setattr(f, k, v)
            return func
        return impl

    def __call__(self, f, *, rewrite_func=None, **kwargs):
        if not isinstance(f, (str, type)):
            assert self._default_group, 'default_group is not set, please set it by your register decorator'
            return self._wrap(self._default_group)(f)
        assert all(k in self._allowed_parameter for k in kwargs.keys()), \
            f'Only allowed parameters: {self._allowed_parameter}, but got {kwargs.keys()}'
        return self._wrap(f, rewrite_func=rewrite_func, **kwargs)

    def __getattr__(self, name):
        if name not in self.fnames:
            raise AttributeError(f'class {self.__class__} has no attribute {name}')

        def impl(cls):
            return self(cls, rewrite_func=name)
        return impl

    def new_group(self, group_name):
        """创建一个新的ComponentGroup。新建的group会自动加入到__builtin__中,无需import即可在任一位置访问到该group。

Args:
    group_name (str): 待创建的group的名字
"""
        return type(f'LazyLLM{group_name}Base', (self.basecls,), {})

new_group(group_name)

创建一个新的ComponentGroup。新建的group会自动加入到__builtin__中,无需import即可在任一位置访问到该group。

Parameters:

  • group_name (str) –

    待创建的group的名字

Source code in lazyllm/common/registry.py
    def new_group(self, group_name):
        """创建一个新的ComponentGroup。新建的group会自动加入到__builtin__中,无需import即可在任一位置访问到该group。

Args:
    group_name (str): 待创建的group的名字
"""
        return type(f'LazyLLM{group_name}Base', (self.basecls,), {})

lazyllm.common.registry.LazyDict

Bases: dict

一个为懒惰的程序员设计的特殊字典类。支持多种便捷的访问和操作方式。

特性:

  1. 使用点号代替['str']访问字典元素
  2. 支持首字母小写来使语句更像函数调用
  3. 当字典只有一个元素时支持直接调用
  4. 支持动态默认键
  5. 如果组名出现在名称中,允许省略组名

Parameters:

  • name (str, default: '' ) –

    字典的名称,默认为空字符串。

  • base

    基类引用,默认为None。

  • *args

    位置参数,传递给dict父类。

  • **kw

    关键字参数,传递给dict父类。

Source code in lazyllm/common/registry.py
class LazyDict(dict):
    """一个为懒惰的程序员设计的特殊字典类。支持多种便捷的访问和操作方式。

特性:

1. 使用点号代替['str']访问字典元素 
2. 支持首字母小写来使语句更像函数调用
3. 当字典只有一个元素时支持直接调用
4. 支持动态默认键
5. 如果组名出现在名称中,允许省略组名

Args:
    name (str): 字典的名称,默认为空字符串。
    base: 基类引用,默认为None。
    *args: 位置参数,传递给dict父类。
    **kw: 关键字参数,传递给dict父类。
"""
    def __init__(self, name='', base=None, *args, **kw):
        super(__class__, self).__init__(*args, **kw)
        self._default: Optional[str] = None
        self.name = name.capitalize()
        self.base = base

    def __setitem__(self, key, value):
        key = key.lower()
        assert key != 'default', 'LazyDict do not support key: default'
        if '.' in key:
            grp, key = key.rsplit('.', 1)
            return self[grp].__setitem__(key, value)
        return super().__setitem__(key, value)

    def __getitem__(self, key):
        key = key.lower()
        if '.' in key:
            grp, key = key.split('.', 1)
            return self[grp][key]
        return super().__getitem__(key)

    # default -> self.default
    # key -> Key, keyName, KeyName
    # if self.name ends with 's' or 'es', ignor it
    def _match(self, key: str):
        key = key.lower()
        if key == 'default':
            assert self._default or len(self) > 0, 'No default key set'
            key = self._default or list(self.keys())[0]
        keys = [key, f'{key}{self.name}', f'{key}{self.name.lower()}']
        if self.name.endswith('s'):
            n = 2 if self.name.endswith('es') else 1
            keys.extend([f'{key}{self.name[:-n]}', f'{key}{self.name[:-n].lower()}'])

        for k in set(keys):
            if k in self.keys():
                return k
        raise AttributeError(f'Attr {key} not found in `{self.name}: {self}`, conditates: {keys}')

    def __getattr__(self, key):
        return self[self._match(key)]

    def resolve(self, key):
        """解析键名并返回字典中匹配的值。

Args:
    key (str): 要解析的键。支持与__getattr__相同的键匹配规则,包括默认键、首字母小写和组名省略等特性。

Returns:
    Any: 匹配键对应的值。

注意:
    如果找不到匹配的键,将抛出AttributeError异常。
"""
        return self[self._match(key)]

    def remove(self, key):
        """从字典中移除指定的键值对。

Args:
    key (str): 要移除的键。支持与__getattr__相同的键匹配规则,包括首字母小写和组名省略等特性。

注意:
    如果找不到匹配的键,将抛出AttributeError异常。
"""
        super(__class__, self).pop(self._match(key))

    def __call__(self, *args, **kwargs):
        assert self._default is not None or len(self.keys()) == 1
        return (self.default if self._default else self[list(self.keys())[0]])(*args, **kwargs)

    def set_default(self, key: str):
        """设置字典的默认键。设置后可以通过.default属性访问该键对应的值。

Args:
    key (str): 要设置为默认的键名。

注意:
    - key必须是字符串类型
    - 设置后可以通过.default访问,或在字典只有一个元素时直接调用
"""
        assert isinstance(key, str), 'default key must be str'
        self._default = key.lower()

    def __contains__(self, key):
        try:
            _ = self[self._match(key)]
            return True
        except (AttributeError, KeyError):
            return False

remove(key)

从字典中移除指定的键值对。

Parameters:

  • key (str) –

    要移除的键。支持与__getattr__相同的键匹配规则,包括首字母小写和组名省略等特性。

注意

如果找不到匹配的键,将抛出AttributeError异常。

Source code in lazyllm/common/registry.py
    def remove(self, key):
        """从字典中移除指定的键值对。

Args:
    key (str): 要移除的键。支持与__getattr__相同的键匹配规则,包括首字母小写和组名省略等特性。

注意:
    如果找不到匹配的键,将抛出AttributeError异常。
"""
        super(__class__, self).pop(self._match(key))

set_default(key)

设置字典的默认键。设置后可以通过.default属性访问该键对应的值。

Parameters:

  • key (str) –

    要设置为默认的键名。

注意
  • key必须是字符串类型
  • 设置后可以通过.default访问,或在字典只有一个元素时直接调用
Source code in lazyllm/common/registry.py
    def set_default(self, key: str):
        """设置字典的默认键。设置后可以通过.default属性访问该键对应的值。

Args:
    key (str): 要设置为默认的键名。

注意:
    - key必须是字符串类型
    - 设置后可以通过.default访问,或在字典只有一个元素时直接调用
"""
        assert isinstance(key, str), 'default key must be str'
        self._default = key.lower()

lazyllm.common.common.ResultCollector

Bases: object

结果收集器,用于在流程或任务执行过程中按名称存储和访问结果。
它通过调用自身(传入 name)返回一个可调用的 Impl 对象来收集指定名称的结果。
适用于需要跨步骤共享中间结果的场景。

Source code in lazyllm/common/common.py
class ResultCollector(object):
    """结果收集器,用于在流程或任务执行过程中按名称存储和访问结果。  
它通过调用自身(传入 name)返回一个可调用的 Impl 对象来收集指定名称的结果。  
适用于需要跨步骤共享中间结果的场景。
"""
    class Impl(object):
        def __init__(self, name, value): self._name, self._value = name, value

        def __call__(self, *args, **kw):
            assert (len(args) == 0) ^ (len(kw) == 0), f'args({len(args)}), kwargs({len(kw)})'
            assert self._name is not None
            if len(args) > 0:
                self._value[self._name] = args[0] if len(args) == 1 else package(*args)
                return self._value[self._name]
            else:
                self._value[self._name] = kw
                return kwargs(kw)

    def __init__(self): self._value = dict()
    def __call__(self, name): return ResultCollector.Impl(name, self._value)
    def __getitem__(self, name): return self._value[name]
    def __repr__(self): return repr(self._value)
    def keys(self):
        """获取所有已存储结果的名称。

**Returns:**

- KeysView[str]: 结果名称集合。
"""
        return self._value.keys()
    def items(self):
        """获取所有已存储的 (名称, 值) 对。

**Returns:**

- ItemsView[str, Any]: 结果的键值对集合。
"""
        return self._value.items()

items()

获取所有已存储的 (名称, 值) 对。

Returns:

  • ItemsView[str, Any]: 结果的键值对集合。
Source code in lazyllm/common/common.py
    def items(self):
        """获取所有已存储的 (名称, 值) 对。

**Returns:**

- ItemsView[str, Any]: 结果的键值对集合。
"""
        return self._value.items()

keys()

获取所有已存储结果的名称。

Returns:

  • KeysView[str]: 结果名称集合。
Source code in lazyllm/common/common.py
    def keys(self):
        """获取所有已存储结果的名称。

**Returns:**

- KeysView[str]: 结果名称集合。
"""
        return self._value.keys()

lazyllm.common.common.EnvVarContextManager

环境变量上下文管理器,用于 在代码块执行期间临时设置环境变量,退出时自动恢复原始环境变量。

Parameters:

  • env_vars_dict (dict) –

    需要临时设置的环境变量字典,值为 None 的变量将被忽略。

Source code in lazyllm/common/common.py
class EnvVarContextManager:
    """环境变量上下文管理器,用于 在代码块执行期间临时设置环境变量,退出时自动恢复原始环境变量。

Args:
    env_vars_dict (dict): 需要临时设置的环境变量字典,值为 None 的变量将被忽略。
"""
    def __init__(self, env_vars_dict):
        self.env_vars_dict = {var: value for var, value in env_vars_dict.items() if value is not None}
        self.original_values = {}

    def __enter__(self):
        for var, value in self.env_vars_dict.items():
            if var in os.environ:
                self.original_values[var] = os.environ[var]
            os.environ[var] = value
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for var in self.env_vars_dict:
            if var in self.original_values:
                os.environ[var] = self.original_values[var]
            else:
                del os.environ[var]

Bind

lazyllm.common.bind

Bind

Bases: object

Bind 类用于函数绑定与延迟调用,支持动态参数传入和上下文参数解析,实现灵活的函数组合与流水线式调用。

bind 函数能够将一个函数与固定的位置参数和关键字参数绑定,支持使用占位符(如 _0, _1)引用当前数据流中上游节点的输出,实现数据在流水线中的跳跃传递和灵活组合。

注意事项: - 绑定的参数可以是具体值,也可以是当前数据流中上游节点的输出占位符。 - 参数绑定仅在当前数据流上下文内生效,不能跨数据流绑定或绑定外部变量。

Parameters:

  • __bind_func (Callable 或 type, default: _None ) –

    要绑定的函数或函数类型,传入类型时会自动实例化。

  • *args

    绑定时固定的位置参数,可以包含占位符。

  • **kw

    绑定时固定的关键字参数,可以包含占位符。

Examples:

>>> from lazyllm import bind, _0, _1
>>> def f1(x):
...     return x ** 2
>>> def f21(input1, input2=0):
...     return input1 + input2 + 1
>>> def f22(input1, input2=0):
...     return input1 + input2 - 1
>>> def f3(in1='placeholder1', in2='placeholder2', in3='placeholder3'):
...     return f"get [input:{in1}], [f21:{in2}], [f22:{in3}]"
>>> from lazyllm import pipeline, parallel
>>> with pipeline() as ppl:
...     ppl.f1 = f1
...     with parallel() as ppl.subprl2:
...         ppl.subprl2.path1 = f21
...         ppl.subprl2.path2 = f22
...     ppl.f3 = bind(f3, ppl.input, _0, _1)
...
>>> print(ppl(2))
get [input:2], [f21:5], [f22:3]
>>> # Demonstrate operator '|' overloading for bind
>>> with pipeline() as ppl2:
...     ppl2.f1 = f1
...     with parallel().bind(ppl2.input, _0) as ppl2.subprl2:
...         ppl2.subprl2.path1 = f21
...         ppl2.subprl2.path2 = f22
...     ppl2.f3 = f3 | bind(ppl2.input, _0, _1)
...
>>> print(ppl2(2))
get [input:2], [f21:7], [f22:5]
Source code in lazyllm/common/bind.py
class Bind(object):
    """Bind 类用于函数绑定与延迟调用,支持动态参数传入和上下文参数解析,实现灵活的函数组合与流水线式调用。

bind 函数能够将一个函数与固定的位置参数和关键字参数绑定,支持使用占位符(如 _0, _1)引用当前数据流中上游节点的输出,实现数据在流水线中的跳跃传递和灵活组合。

注意事项:
    - 绑定的参数可以是具体值,也可以是当前数据流中上游节点的输出占位符。
    - 参数绑定仅在当前数据流上下文内生效,不能跨数据流绑定或绑定外部变量。

Args:
    __bind_func (Callable 或 type): 要绑定的函数或函数类型,传入类型时会自动实例化。
    *args: 绑定时固定的位置参数,可以包含占位符。
    **kw: 绑定时固定的关键字参数,可以包含占位符。


Examples:
    >>> from lazyllm import bind, _0, _1
    >>> def f1(x):
    ...     return x ** 2
    >>> def f21(input1, input2=0):
    ...     return input1 + input2 + 1
    >>> def f22(input1, input2=0):
    ...     return input1 + input2 - 1
    >>> def f3(in1='placeholder1', in2='placeholder2', in3='placeholder3'):
    ...     return f"get [input:{in1}], [f21:{in2}], [f22:{in3}]"

    >>> from lazyllm import pipeline, parallel

    >>> with pipeline() as ppl:
    ...     ppl.f1 = f1
    ...     with parallel() as ppl.subprl2:
    ...         ppl.subprl2.path1 = f21
    ...         ppl.subprl2.path2 = f22
    ...     ppl.f3 = bind(f3, ppl.input, _0, _1)
    ...
    >>> print(ppl(2))
    get [input:2], [f21:5], [f22:3]

    >>> # Demonstrate operator '|' overloading for bind
    >>> with pipeline() as ppl2:
    ...     ppl2.f1 = f1
    ...     with parallel().bind(ppl2.input, _0) as ppl2.subprl2:
    ...         ppl2.subprl2.path1 = f21
    ...         ppl2.subprl2.path2 = f22
    ...     ppl2.f3 = f3 | bind(ppl2.input, _0, _1)
    ...
    >>> print(ppl2(2))
    get [input:2], [f21:7], [f22:5]
    """
    class _None: pass

    class Args(object):
        class _None: pass
        class Unpack(package): pass

        def __init__(self, source_id: str, target_id: str = 'input', *, unpack: bool = False):
            self._item_key, self._attr_key = Bind.Args._None, Bind.Args._None
            self._source_id, self._target_id = source_id, target_id
            self._unpack = unpack

        def __getitem__(self, key: str):
            self._item_key = key
            return self

        def __getattr__(self, key: str):
            if key.startswith('__') and key.endswith('__'):
                raise AttributeError(f'Args has no attribute {key}')
            self._attr_key = key
            return self

        def __getstate__(self):
            return self._item_key, self._attr_key, self._source_id, self._target_id

        def __setstate__(self, state):
            self._item_key, self._attr_key, self._source_id, self._target_id = state

        def get_arg(self, source):
            if not source or source['source'] != self._source_id:
                if self._source_id in locals['bind_args']:
                    source = locals['bind_args'][self._source_id]
                    if not source or source['source'] != self._source_id:
                        LOG.error(f'Get source failed, source is {source}, and expect id is {self._source_id}')
                        raise RuntimeError('Internal Error, please report issue to `https://github.com/LazyAGI/LazyLLM`')
                else:
                    LOG.error(f'Get source failed, locals is {locals["bind_args"]} with id {locals._sid}, '
                              f'and expect id is {self._source_id}')
                    raise RuntimeError('Unable to find the bound parameter, possibly due to pipeline.input/output can '
                                       'only be bind in direct member of pipeline! You may solve this by defining the '
                                       'pipeline in a `with lazyllm.save_pipeline_result():` block.')
            input = result = source[self._target_id]
            source = source['source']
            if self._item_key is not Bind.Args._None: result = input[self._item_key]
            elif self._attr_key is not Bind.Args._None: result = getattr(input, self._attr_key)
            if self._unpack and isinstance(result, package): result = Bind.Args.Unpack(result)
            return result

        def __repr__(self):
            return f'<class \'lazyllm.common.bind.Bind.Args\' source={self._source_id}>'

    def __init__(self, __bind_func=_None, *args, **kw):
        object.__setattr__(self, '_hooks', [])
        self._f = __bind_func() if isinstance(__bind_func, type) and __bind_func is not Bind._None else __bind_func
        self._args = args
        self._kw = kw

    def _wraps_plain_callable(self) -> bool:
        inner = getattr(self, '_f', None)
        if inner is None or not callable(inner):
            return False
        return not (hasattr(inner, '_flow_id') or hasattr(inner, '_module_id'))

    def __or__(self, other):
        if isinstance(other, Bind):
            return other.__ror__(self)
        return NotImplementedError('Only support `binded-func | bind()` syntax!')

    def __ror__(self, __value: Callable):
        if self._f is not Bind._None: self._args = (self._f,) + self._args
        self._f = __value
        return self

    # _bind_args_source: dict(input=input, args=dict(key=value))
    def __call__(self, *args, _bind_args_source=None, **kw):
        if self._f is None: return None
        keys = set(kw.keys()).intersection(set(self._kw.keys()))
        assert len(keys) == 0, f'Keys `{keys}` are already bind!'
        bind_args = args if len(self._args) == 0 else (
            [args[a.idx] if isinstance(a, Placeholder) else a for a in self._args])
        kwargs = {k: args[v.idx] if isinstance(v, Placeholder) else v for k, v in self._kw.items()}
        bind_args = [a.get_arg(_bind_args_source) if isinstance(a, Bind.Args) else a for a in bind_args]
        bind_args = list(itertools.chain.from_iterable(x if isinstance(x, Bind.Args.Unpack) else [x] for x in bind_args))
        kwargs = {k: v.get_arg(_bind_args_source) if isinstance(v, Bind.Args) else v for k, v in kwargs.items()}
        if self._hooks:
            from lazyllm.hook import execution_with_hooks
            merged = {**kwargs, **kw}
            return execution_with_hooks(self, *args, **merged)(self._f)(*bind_args, **merged)
        return self._f(*bind_args, **kwargs, **kw)

    # TODO: modify it
    def __repr__(self) -> str:
        return self._f.__repr__() + '(bind args: {})'.format(
            ', '.join([repr(a) if a is not self else 'self' for a in self._args] + [
                f'{k}={repr(v) if v is not self else "self"}' for k, v in self._kw.items()]))

    def __getattr__(self, name):
        # name will be '_f' in copy.deepcopy
        if name != '_f':
            return getattr(self._f, name)
        return super(__class__, self).__getattr__(name)

    def __setattr__(self, __name: str, __value: Any) -> None:
        if __name not in ('_f', '_args', '_kw', '_has_root', '_hooks'):
            return setattr(self._f, __name, __value)
        return super(__class__, self).__setattr__(__name, __value)

    def __eq__(self, value):
        return value == self._f

    def bind(self, *args, **kw):
        return Bind(self, *args, **kw)

Package

lazyllm.common.package

Bases: tuple

package类用于封装流水线或并行模块的返回值,保证传递给下游模块时自动拆包,从而支持多个值的灵活传递。

Examples:

>>> from lazyllm.common import package
>>> p = package(1, 2, 3)
>>> p
(1, 2, 3)
>>> p[1]
2
>>> p_slice = p[1:]
>>> isinstance(p_slice, package)
True
>>> p2 = package([4, 5])
>>> p + p2
(1, 2, 3, 4, 5)
Source code in lazyllm/common/common.py
class package(tuple):
    """package类用于封装流水线或并行模块的返回值,保证传递给下游模块时自动拆包,从而支持多个值的灵活传递。


Examples:
    >>> from lazyllm.common import package
    >>> p = package(1, 2, 3)
    >>> p
    (1, 2, 3)
    >>> p[1]
    2
    >>> p_slice = p[1:]
    >>> isinstance(p_slice, package)
    True
    >>> p2 = package([4, 5])
    >>> p + p2
    (1, 2, 3, 4, 5)
    """
    def __new__(cls, *args):
        if len(args) == 1 and isinstance(args[0], (tuple, list, types.GeneratorType)):
            return super(__class__, cls).__new__(cls, args[0])
        else:
            return super(__class__, cls).__new__(cls, args)

    def __getitem__(self, key):
        if isinstance(key, slice):
            return package(super(__class__, self).__getitem__(key))
        return super(__class__, self).__getitem__(key)

    def __add__(self, __other):
        return package(super().__add__(__other))

Identity

lazyllm.common.Identity

恒等模块,用于直接返回输入值。

该模块常用于模块拼接结构中占位,无实际处理逻辑。若输入为多个参数,将自动打包为一个整体结构输出。

Parameters:

  • *args

    可选的位置参数,占位用。

  • **kw

    可选的关键字参数,占位用。

Source code in lazyllm/common/common.py
class Identity():
    """
恒等模块,用于直接返回输入值。

该模块常用于模块拼接结构中占位,无实际处理逻辑。若输入为多个参数,将自动打包为一个整体结构输出。

Args:
    *args: 可选的位置参数,占位用。
    **kw: 可选的关键字参数,占位用。
"""
    def __init__(self, *args, **kw):
        pass

    def __call__(self, *inputs):
        if len(inputs) == 1:
            return inputs[0]
        return package(*inputs)

    def __repr__(self):
        return make_repr('Module', 'Identity')

Compilation

lazyllm.common.compile_func(func_code, global_env=None)

将一段 python 函数字符串编译成一个可执行函数并返回。

Parameters:

  • func_code (str) –

    包含 python 函数代码的字符串

  • global_env (str, default: None ) –

    在 python 函数中用到的包和全局变量

Examples:

from lazyllm.common import compile_func
code_str = 'def Identity(v): return v'
identity = compile_func(code_str)
assert identity('hello') == 'hello'
Source code in lazyllm/common/utils.py
def compile_func(func_code: str, global_env: Optional[Dict[str, Any]] = None) -> Callable:
    """
将一段 python 函数字符串编译成一个可执行函数并返回。

Args:
    func_code (str): 包含 python 函数代码的字符串
    global_env (str): 在 python 函数中用到的包和全局变量


Examples:

    from lazyllm.common import compile_func
    code_str = 'def Identity(v): return v'
    identity = compile_func(code_str)
    assert identity('hello') == 'hello'
    """
    fname = re.search(r'def\s+(\w+)\s*\(', func_code).group(1)
    module = ast.parse(func_code)
    SecurityVisitor().visit(module)
    func = compile(module, filename='<ast>', mode='exec')
    local_dict = {}
    exec(func, global_env if global_env is not None else local_dict, local_dict)
    return local_dict.pop(fname)

Queue

lazyllm.common.FileSystemQueue

Bases: ABC

基于文件系统的队列抽象基类。

FileSystemQueue是一个抽象基类,提供了基于文件系统的队列操作接口。它支持多种后端实现(如SQLite、Redis),用于在分布式环境中进行消息传递和数据流控制。

该类实现了单例模式,确保每个类名只有一个队列实例,并提供了线程安全的队列操作。

Parameters:

  • klass (str, default: '__default__' ) –

    队列的类名标识符。默认为 '__default__'

Returns:

  • FileSystemQueue: 队列实例(单例模式)
Source code in lazyllm/common/queue.py
class FileSystemQueue(ABC):
    """基于文件系统的队列抽象基类。

FileSystemQueue是一个抽象基类,提供了基于文件系统的队列操作接口。它支持多种后端实现(如SQLite、Redis),用于在分布式环境中进行消息传递和数据流控制。

该类实现了单例模式,确保每个类名只有一个队列实例,并提供了线程安全的队列操作。

Args:
    klass (str, optional): 队列的类名标识符。默认为 ``'__default__'``。

**Returns:**

- FileSystemQueue: 队列实例(单例模式)
"""

    __queue_pool__ = dict()
    __queue_pool_lock__ = threading.RLock()

    def __init__(self, *, klass='__default__'):
        super().__init__()
        self._class = klass

    def __new__(cls, *args, **kw):
        klass = kw.get('klass', '__default__')
        if klass not in __class__.__queue_pool__:
            with __class__.__queue_pool_lock__:
                if klass not in __class__.__queue_pool__:
                    if cls is __class__:
                        __class__.__queue_pool__[klass] = cls.__default_queue__(*args, **kw)
                    else:
                        __class__.__queue_pool__[klass] = super().__new__(cls)
        return __class__.__queue_pool__[klass]

    @classmethod
    def get_instance(cls, klass):
        """获取指定类名对应的队列实例。

此方法会根据类名标识符返回对应的队列对象。如果该类名尚未注册,会触发自动初始化。

Args:
    klass (str): 队列类名标识符,不能为 ``'__default__'``。

**Returns:**

- FileSystemQueue: 与指定类名绑定的队列实例。
"""
        assert isinstance(klass, str) and klass != '__default__'
        return cls(klass=klass)

    @classmethod
    def set_default(cls, queue: Type):
        """设置默认的队列实现。

此方法用于指定默认的队列类,作为未传入 `klass` 参数时的后端实现。

Args:
    queue (Type): 默认队列类。
"""
        cls.__default_queue__ = queue

    @property
    def sid(self):
        return f'{globals._sid}-{self._class}'

    def enqueue(self, message):
        """将消息加入队列。

此方法将指定的消息添加到队列的尾部,遵循先进先出(FIFO)的原则。

Args:
    message: 要加入队列的消息内容。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
    >>> queue.enqueue(123)
    >>> queue.peek()
    '123'
    """
        return self._enqueue(self.sid, message)
    def dequeue(self, limit=None):
        """从队列中取出消息。

此方法从队列头部取出消息并移除它们,可以指定一次取出的消息数量。

Args:
    limit (int, optional): 一次取出的最大消息数量。如果为None,则取出所有消息。默认为None。

**Returns:**

- list: 取出的消息列表。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
    >>> for i in range(5):
    ...     queue.enqueue(f"Message{i}")
    >>> all_messages = queue.dequeue()
    >>> all_messages
    ['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
    """
        return self._dequeue(self.sid, limit=limit)
    def peek(self):
        """获取队列中的下一个消息,但不移除。

**Returns:**

- Any: 队列中下一个可用的消息;如果队列为空,返回 ``None``。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='peek_test')
    >>> queue.enqueue("First message")
    >>> queue.enqueue("Second message")
    >>> first_message = queue.peek()
    >>> first_message
    'First message'
    >>> queue.peek()
    'First message'
    """
        return self._peek(self.sid)
    def size(self):
        """获取队列中的消息数量。

**Returns:**

- int: 队列中当前消息的数量。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='size_test')
    >>> queue.size()
    0
    >>> queue.enqueue("Message1")
    >>> queue.size()
    1
    >>> queue.enqueue("Message2")
    >>> queue.size()
    2
    >>> queue.dequeue()
    ['Message1', 'Message2']
    >>> queue.size()
    0
    """
        return self._size(self.sid)
    def init(self):
        """初始化队列。

该方法会清空当前队列中的所有消息,相当于调用 ``clear()``。
"""
        self.clear()

    def clear(self):
        """清空队列。

移除队列中的所有消息,使其恢复为空状态。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='clear_test')
    >>> for i in range(10):
    ...     queue.enqueue(f"Message{i}")
    >>> queue.size()
    10
    >>> queue.clear()
    >>> queue.size()
    0
    >>> queue.peek() is None
    True
    """
        self._clear(self.sid)

    @abstractmethod
    def _enqueue(self, id, message): pass

    @abstractmethod
    def _dequeue(self, id, limit=None): pass

    @abstractmethod
    def _peek(self, id): pass

    @abstractmethod
    def _size(self, id): pass

    @abstractmethod
    def _clear(self, id): pass

clear()

清空队列。

移除队列中的所有消息,使其恢复为空状态。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='clear_test')
>>> for i in range(10):
...     queue.enqueue(f"Message{i}")
>>> queue.size()
10
>>> queue.clear()
>>> queue.size()
0
>>> queue.peek() is None
True
Source code in lazyllm/common/queue.py
    def clear(self):
        """清空队列。

移除队列中的所有消息,使其恢复为空状态。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='clear_test')
    >>> for i in range(10):
    ...     queue.enqueue(f"Message{i}")
    >>> queue.size()
    10
    >>> queue.clear()
    >>> queue.size()
    0
    >>> queue.peek() is None
    True
    """
        self._clear(self.sid)

dequeue(limit=None)

从队列中取出消息。

此方法从队列头部取出消息并移除它们,可以指定一次取出的消息数量。

Parameters:

  • limit (int, default: None ) –

    一次取出的最大消息数量。如果为None,则取出所有消息。默认为None。

Returns:

  • list: 取出的消息列表。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
>>> for i in range(5):
...     queue.enqueue(f"Message{i}")
>>> all_messages = queue.dequeue()
>>> all_messages
['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
Source code in lazyllm/common/queue.py
    def dequeue(self, limit=None):
        """从队列中取出消息。

此方法从队列头部取出消息并移除它们,可以指定一次取出的消息数量。

Args:
    limit (int, optional): 一次取出的最大消息数量。如果为None,则取出所有消息。默认为None。

**Returns:**

- list: 取出的消息列表。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='dequeue_test')
    >>> for i in range(5):
    ...     queue.enqueue(f"Message{i}")
    >>> all_messages = queue.dequeue()
    >>> all_messages
    ['Message0', 'Message1', 'Message2', 'Message3', 'Message4']
    """
        return self._dequeue(self.sid, limit=limit)

enqueue(message)

将消息加入队列。

此方法将指定的消息添加到队列的尾部,遵循先进先出(FIFO)的原则。

Parameters:

  • message

    要加入队列的消息内容。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
>>> queue.enqueue(123)
>>> queue.peek()
'123'
Source code in lazyllm/common/queue.py
    def enqueue(self, message):
        """将消息加入队列。

此方法将指定的消息添加到队列的尾部,遵循先进先出(FIFO)的原则。

Args:
    message: 要加入队列的消息内容。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='enqueue_test')
    >>> queue.enqueue(123)
    >>> queue.peek()
    '123'
    """
        return self._enqueue(self.sid, message)

get_instance(klass) classmethod

获取指定类名对应的队列实例。

此方法会根据类名标识符返回对应的队列对象。如果该类名尚未注册,会触发自动初始化。

Parameters:

  • klass (str) –

    队列类名标识符,不能为 '__default__'

Returns:

  • FileSystemQueue: 与指定类名绑定的队列实例。
Source code in lazyllm/common/queue.py
    @classmethod
    def get_instance(cls, klass):
        """获取指定类名对应的队列实例。

此方法会根据类名标识符返回对应的队列对象。如果该类名尚未注册,会触发自动初始化。

Args:
    klass (str): 队列类名标识符,不能为 ``'__default__'``。

**Returns:**

- FileSystemQueue: 与指定类名绑定的队列实例。
"""
        assert isinstance(klass, str) and klass != '__default__'
        return cls(klass=klass)

init()

初始化队列。

该方法会清空当前队列中的所有消息,相当于调用 clear()

Source code in lazyllm/common/queue.py
    def init(self):
        """初始化队列。

该方法会清空当前队列中的所有消息,相当于调用 ``clear()``。
"""
        self.clear()

peek()

获取队列中的下一个消息,但不移除。

Returns:

  • Any: 队列中下一个可用的消息;如果队列为空,返回 None

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='peek_test')
>>> queue.enqueue("First message")
>>> queue.enqueue("Second message")
>>> first_message = queue.peek()
>>> first_message
'First message'
>>> queue.peek()
'First message'
Source code in lazyllm/common/queue.py
    def peek(self):
        """获取队列中的下一个消息,但不移除。

**Returns:**

- Any: 队列中下一个可用的消息;如果队列为空,返回 ``None``。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='peek_test')
    >>> queue.enqueue("First message")
    >>> queue.enqueue("Second message")
    >>> first_message = queue.peek()
    >>> first_message
    'First message'
    >>> queue.peek()
    'First message'
    """
        return self._peek(self.sid)

set_default(queue) classmethod

设置默认的队列实现。

此方法用于指定默认的队列类,作为未传入 klass 参数时的后端实现。

Parameters:

  • queue (Type) –

    默认队列类。

Source code in lazyllm/common/queue.py
    @classmethod
    def set_default(cls, queue: Type):
        """设置默认的队列实现。

此方法用于指定默认的队列类,作为未传入 `klass` 参数时的后端实现。

Args:
    queue (Type): 默认队列类。
"""
        cls.__default_queue__ = queue

size()

获取队列中的消息数量。

Returns:

  • int: 队列中当前消息的数量。

Examples:

>>> import lazyllm
>>> queue = lazyllm.FileSystemQueue(klass='size_test')
>>> queue.size()
0
>>> queue.enqueue("Message1")
>>> queue.size()
1
>>> queue.enqueue("Message2")
>>> queue.size()
2
>>> queue.dequeue()
['Message1', 'Message2']
>>> queue.size()
0
Source code in lazyllm/common/queue.py
    def size(self):
        """获取队列中的消息数量。

**Returns:**

- int: 队列中当前消息的数量。


Examples:
    >>> import lazyllm
    >>> queue = lazyllm.FileSystemQueue(klass='size_test')
    >>> queue.size()
    0
    >>> queue.enqueue("Message1")
    >>> queue.size()
    1
    >>> queue.enqueue("Message2")
    >>> queue.size()
    2
    >>> queue.dequeue()
    ['Message1', 'Message2']
    >>> queue.size()
    0
    """
        return self._size(self.sid)

lazyllm.common.multiprocessing.SpawnProcess

Bases: Process

Source code in lazyllm/common/multiprocessing.py
class SpawnProcess(multiprocessing.Process):
    def start(self):
        """
使用spawn方式启动进程。

此方法在启动进程时强制使用spawn方式,这种方式会创建一个全新的Python解释器进程。spawn方式相比fork更安全,特别是在多线程环境下。

**说明:**
- 使用spawn方式启动新进程,避免了fork可能带来的问题
- 会临时切换启动方式为spawn,执行完后恢复原有启动方式
- 继承自multiprocessing.Process.start()的所有功能


Examples:

    ```python
    from lazyllm.common.multiprocessing import SpawnProcess

    def worker():
        print("Worker process running")

    # Create and start a process using spawn method
    process = SpawnProcess(target=worker)
    process.start()
    process.join()
    ```
    """
        with _ctx('spawn'):
            return super().start()

start()

使用spawn方式启动进程。

此方法在启动进程时强制使用spawn方式,这种方式会创建一个全新的Python解释器进程。spawn方式相比fork更安全,特别是在多线程环境下。

说明: - 使用spawn方式启动新进程,避免了fork可能带来的问题 - 会临时切换启动方式为spawn,执行完后恢复原有启动方式 - 继承自multiprocessing.Process.start()的所有功能

Examples:

```python
from lazyllm.common.multiprocessing import SpawnProcess

def worker():
    print("Worker process running")

# Create and start a process using spawn method
process = SpawnProcess(target=worker)
process.start()
process.join()
```
Source code in lazyllm/common/multiprocessing.py
    def start(self):
        """
使用spawn方式启动进程。

此方法在启动进程时强制使用spawn方式,这种方式会创建一个全新的Python解释器进程。spawn方式相比fork更安全,特别是在多线程环境下。

**说明:**
- 使用spawn方式启动新进程,避免了fork可能带来的问题
- 会临时切换启动方式为spawn,执行完后恢复原有启动方式
- 继承自multiprocessing.Process.start()的所有功能


Examples:

    ```python
    from lazyllm.common.multiprocessing import SpawnProcess

    def worker():
        print("Worker process running")

    # Create and start a process using spawn method
    process = SpawnProcess(target=worker)
    process.start()
    process.join()
    ```
    """
        with _ctx('spawn'):
            return super().start()

lazyllm.common.queue.SQLiteQueue

Bases: FileSystemQueue

基于 SQLite 的持久化文件系统队列。 该类扩展自 FileSystemQueue,使用 SQLite 数据库存储队列数据,通过 position 字段保证先进先出顺序,并支持并发安全的消息入队、出队、查看队头、队列大小查询和清空操作。 队列数据库默认存储在 ~/.lazyllm_filesystem_queue.db,通过文件锁机制确保多进程安全访问。

Parameters:

  • klass (str, default: '__default__' ) –

    队列分类名,用于逻辑隔离不同的队列,默认为 'default'。

Source code in lazyllm/common/queue.py
class SQLiteQueue(FileSystemQueue):
    """基于 SQLite 的持久化文件系统队列。
该类扩展自 FileSystemQueue,使用 SQLite 数据库存储队列数据,通过 position 字段保证先进先出顺序,并支持并发安全的消息入队、出队、查看队头、队列大小查询和清空操作。
队列数据库默认存储在 ~/.lazyllm_filesystem_queue.db,通过文件锁机制确保多进程安全访问。

Args:
    klass (str): 队列分类名,用于逻辑隔离不同的队列,默认为 '__default__'。
"""
    _init_lock = threading.Lock()

    def __init__(self, klass='__default__'):
        if getattr(self, '_initialized', False):
            return
        with self._init_lock:
            if getattr(self, '_initialized', False):
                return
            super(__class__, self).__init__(klass=klass)
            self.db_path = os.path.expanduser(os.path.join(config['home'], '.lazyllm_filesystem_queue.db'))
            lock_kwargs = {}
            if 'is_singleton' in inspect.signature(SoftFileLock).parameters:
                lock_kwargs['is_singleton'] = True
            self._lock = SoftFileLock(self.db_path + '.lock', **lock_kwargs)
            self._check_same_thread = not sqlite3_check_threadsafety()
            try:
                self._initialize_db()
            except Exception:
                with FileSystemQueue.__queue_pool_lock__:
                    if FileSystemQueue.__queue_pool__.get(klass) is self:
                        FileSystemQueue.__queue_pool__.pop(klass, None)
                raise
            self._initialized = True

    def _initialize_db(self):
        with self._lock, sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
            cursor = conn.cursor()
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS queue (
                id TEXT NOT NULL,
                position INTEGER NOT NULL,
                message TEXT NOT NULL,
                PRIMARY KEY (id, position)
            )
            ''')
            conn.commit()

    def _enqueue(self, id, message):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                SELECT MAX(position) FROM queue WHERE id = ?
                ''', (id,))
                max_pos = cursor.fetchone()[0]
                next_pos = 0 if max_pos is None else max_pos + 1
                cursor.execute('''
                INSERT INTO queue (id, position, message)
                VALUES (?, ?, ?)
                ''', (id, next_pos, message))
                conn.commit()

    def _dequeue(self, id, limit=None):
        """Retrieve and remove all messages from the queue."""
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                if limit:
                    cursor.execute('SELECT message, position FROM queue WHERE id = ? '
                                   'ORDER BY position ASC LIMIT ?', (id, limit))
                else:
                    cursor.execute('SELECT message, position FROM queue WHERE id = ? '
                                   'ORDER BY position ASC', (id,))

                rows = cursor.fetchall()
                if not rows:
                    return []
                messages = [row[0] for row in rows]
                cursor.execute('DELETE FROM queue WHERE id = ? AND position IN '
                               f'({",".join([str(row[1]) for row in rows])})', (id, ))
                conn.commit()
                return messages

    def _peek(self, id):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                SELECT message FROM queue WHERE id = ? ORDER BY position ASC LIMIT 1
                ''', (id,))
                row = cursor.fetchone()
                if row is None:
                    return None
                return row[0]

    def _size(self, id):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                SELECT COUNT(*) FROM queue WHERE id = ?
                ''', (id,))
                return cursor.fetchone()[0]

    def _clear(self, id):
        with self._lock:
            with sqlite3.connect(self.db_path, check_same_thread=self._check_same_thread) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                DELETE FROM queue WHERE id = ?
                ''', (id,))
                conn.commit()

lazyllm.common.ReadOnlyWrapper

Bases: object

一个轻量级只读包装器,用于包裹任意对象并对外提供只读访问(实际并未完全禁止修改,但复制时不会携带原始对象)。包装器可以动态替换内部对象,并提供判断对象是否为空的辅助方法。

Parameters:

  • obj (Optional[Any], default: None ) –

    初始被包装的对象,默认为 None。

Source code in lazyllm/common/common.py
class ReadOnlyWrapper(object):
    """
一个轻量级只读包装器,用于包裹任意对象并对外提供只读访问(实际并未完全禁止修改,但复制时不会携带原始对象)。包装器可以动态替换内部对象,并提供判断对象是否为空的辅助方法。

Args:
    obj (Optional[Any]): 初始被包装的对象,默认为 None。
"""
    def __init__(self, obj=None):
        self.obj = obj

    def set(self, obj):
        """
替换当前包装的内部对象。

Args:
    obj (Any): 新的内部对象。
"""
        self.obj = obj

    def __getattr__(self, key):
        # key will be 'obj' in copy.deepcopy
        if key != 'obj' and self.obj is not None:
            return getattr(self.obj, key)
        return super(__class__, self).__getattr__(key)

    # TODO: modify it
    def __repr__(self):
        r = self.obj.__repr__()
        return (f'{r[:-1]}' if r.endswith('>') else f'<{r}') + '(Readonly)>'

    def __deepcopy__(self, memo):
        # drop obj
        return ReadOnlyWrapper()

    def isNone(self):
        """
检查当前包装器是否未持有任何对象。

Args:
    None.

**Returns:**

- bool: 如果内部对象为 None 返回 True,否则 False。
"""
        return self.obj is None

isNone()

检查当前包装器是否未持有任何对象。

Returns:

  • bool: 如果内部对象为 None 返回 True,否则 False。
Source code in lazyllm/common/common.py
    def isNone(self):
        """
检查当前包装器是否未持有任何对象。

Args:
    None.

**Returns:**

- bool: 如果内部对象为 None 返回 True,否则 False。
"""
        return self.obj is None

set(obj)

替换当前包装的内部对象。

Parameters:

  • obj (Any) –

    新的内部对象。

Source code in lazyllm/common/common.py
    def set(self, obj):
        """
替换当前包装的内部对象。

Args:
    obj (Any): 新的内部对象。
"""
        self.obj = obj

lazyllm.common.queue.RedisQueue

Bases: FileSystemQueue

基于 Redis 实现的文件系统队列(继承自 FileSystemQueue),用于跨进程/节点的消息传递与队列管理。内部使用指定的 redis_url 初始化并管理底层存储,同时提供线程安全的初始化逻辑。

Parameters:

  • klass (str, default: '__default__' ) –

    队列的分类名称,用于区分不同队列实例,默认值为 'default'。

Source code in lazyllm/common/queue.py
class RedisQueue(FileSystemQueue):
    """
基于 Redis 实现的文件系统队列(继承自 FileSystemQueue),用于跨进程/节点的消息传递与队列管理。内部使用指定的 redis_url 初始化并管理底层存储,同时提供线程安全的初始化逻辑。

Args:
    klass (str): 队列的分类名称,用于区分不同队列实例,默认值为 '__default__'。
"""
    def __init__(self, klass='__default__'):
        super(__class__, self).__init__(klass=klass)
        self.redis_url = config['fsqredis_url']
        self._lock = threading.Lock()
        self._initialize_db()

    def _initialize_db(self):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            assert (
                conn.ping()
            ), 'Found fsque reids config but can not connect, please check your config `LAZYLLM_FSQREDIS_URL`.'
            if not conn.exists(self.sid):
                conn.rpush(self.sid, '<start>')

    def _enqueue(self, id, message):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            conn.rpush(id, message)

    def _dequeue(self, id, limit=None):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            if limit:
                limit = limit + 1
                vals = conn.lrange(id, 1, limit)
                conn.ltrim(id, limit, -1)
            else:
                vals = conn.lrange(id, 1, -1)
                conn.ltrim(id, 0, 0)
            if not vals:
                return []
            return [val.decode('utf-8') for val in vals]

    def _peek(self, id):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            val = conn.lindex(id, 1)
            if val is None:
                return None
            return val.decode('utf-8')

    def _size(self, id):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            rsize = conn.llen(id)
            return rsize - 1  # empty : [ <start> ]

    def _clear(self, id):
        with self._lock:
            conn = redis.Redis.from_url(self.redis_url)
            conn.delete(id)

Multiprocessing

lazyllm.common.ForkProcess

Bases: Process

LazyLLM 提供的增强进程类,继承自 Python 标准库的 multiprocessing.Process。此类专门使用 fork 启动方法来创建子进程,并提供了同步/异步执行模式的支持。

Parameters:

  • group

    进程组,默认为 None

  • target

    要在进程中执行的函数,默认为 None

  • name

    进程名称,默认为 None

  • args

    传递给目标函数的参数元组,默认为 ()

  • kwargs

    传递给目标函数的关键字参数字典,默认为 {}

  • daemon

    是否为守护进程,默认为 None

  • sync

    是否为同步模式,默认为 True。在同步模式下,进程执行完目标函数后会自动退出;在异步模式下,进程会持续运行直到被手动终止。

注意: 此类主要用于 LazyLLM 内部的进程管理,特别是在需要长期运行的服务器进程中。

Examples:

>>> import lazyllm
>>> from lazyllm.common import ForkProcess
>>> import time
>>> import os
>>> def simple_task(task_id):
...     print(f"Process {os.getpid()} executing task {task_id}")
...     time.sleep(0.1)  
...     return f"Task {task_id} completed by process {os.getpid()}"
>>> process = ForkProcess(target=simple_task, args=(1,), sync=True)
>>> process.start()
Process 12345 executing task 1
Source code in lazyllm/common/multiprocessing.py
class ForkProcess(multiprocessing.Process):
    """LazyLLM 提供的增强进程类,继承自 Python 标准库的 `multiprocessing.Process`。此类专门使用 fork 启动方法来创建子进程,并提供了同步/异步执行模式的支持。

Args:
    group: 进程组,默认为 ``None``
    target: 要在进程中执行的函数,默认为 ``None``
    name: 进程名称,默认为 ``None``
    args: 传递给目标函数的参数元组,默认为 ``()``
    kwargs: 传递给目标函数的关键字参数字典,默认为 ``{}``
    daemon: 是否为守护进程,默认为 ``None``
    sync: 是否为同步模式,默认为 ``True``。在同步模式下,进程执行完目标函数后会自动退出;在异步模式下,进程会持续运行直到被手动终止。

**注意**: 此类主要用于 LazyLLM 内部的进程管理,特别是在需要长期运行的服务器进程中。


Examples:

    >>> import lazyllm
    >>> from lazyllm.common import ForkProcess
    >>> import time
    >>> import os
    >>> def simple_task(task_id):
    ...     print(f"Process {os.getpid()} executing task {task_id}")
    ...     time.sleep(0.1)  
    ...     return f"Task {task_id} completed by process {os.getpid()}"
    >>> process = ForkProcess(target=simple_task, args=(1,), sync=True)
    >>> process.start()
    Process 12345 executing task 1
    """
    def __init__(self, group=None, target=None, name=None, args=(),
                 kwargs=None, *, daemon=None, sync=True):
        super().__init__(group, ForkProcess.work(target, sync), name, args, kwargs or {}, daemon=daemon)

    @staticmethod
    def work(f, sync):
        """
ForkProcess 的核心工作方法,负责包装目标函数并处理同步/异步执行逻辑。

Args:
    f: 要执行的目标函数
    sync: 是否为同步模式。在同步模式下,执行完目标函数后进程会退出;在异步模式下,进程会持续运行。
"""
        def impl(*args, **kw):
            try:
                f(*args, **kw)
                if not sync:
                    while True: time.sleep(1)
            finally:
                atexit._run_exitfuncs()
        return impl

    def start(self):
        """
启动 ForkProcess 进程。此方法会使用 fork 启动方法来创建子进程,并开始执行目标函数。

此方法的特点:

- **Fork 启动**: 使用 fork 方法创建子进程,在 Unix/Linux 系统上提供更好的性能
- **上下文管理**: 自动管理进程启动方法的上下文,确保使用正确的启动方式
- **继承父类**: 继承自 `multiprocessing.Process.start()` 的所有功能

**注意**: 此方法会实际创建新的进程并开始执行,调用后进程会立即开始运行。

"""
        with _ctx('fork'):
            return super().start()

start()

启动 ForkProcess 进程。此方法会使用 fork 启动方法来创建子进程,并开始执行目标函数。

此方法的特点:

  • Fork 启动: 使用 fork 方法创建子进程,在 Unix/Linux 系统上提供更好的性能
  • 上下文管理: 自动管理进程启动方法的上下文,确保使用正确的启动方式
  • 继承父类: 继承自 multiprocessing.Process.start() 的所有功能

注意: 此方法会实际创建新的进程并开始执行,调用后进程会立即开始运行。

Source code in lazyllm/common/multiprocessing.py
    def start(self):
        """
启动 ForkProcess 进程。此方法会使用 fork 启动方法来创建子进程,并开始执行目标函数。

此方法的特点:

- **Fork 启动**: 使用 fork 方法创建子进程,在 Unix/Linux 系统上提供更好的性能
- **上下文管理**: 自动管理进程启动方法的上下文,确保使用正确的启动方式
- **继承父类**: 继承自 `multiprocessing.Process.start()` 的所有功能

**注意**: 此方法会实际创建新的进程并开始执行,调用后进程会立即开始运行。

"""
        with _ctx('fork'):
            return super().start()

work(f, sync) staticmethod

ForkProcess 的核心工作方法,负责包装目标函数并处理同步/异步执行逻辑。

Parameters:

  • f

    要执行的目标函数

  • sync

    是否为同步模式。在同步模式下,执行完目标函数后进程会退出;在异步模式下,进程会持续运行。

Source code in lazyllm/common/multiprocessing.py
    @staticmethod
    def work(f, sync):
        """
ForkProcess 的核心工作方法,负责包装目标函数并处理同步/异步执行逻辑。

Args:
    f: 要执行的目标函数
    sync: 是否为同步模式。在同步模式下,执行完目标函数后进程会退出;在异步模式下,进程会持续运行。
"""
        def impl(*args, **kw):
            try:
                f(*args, **kw)
                if not sync:
                    while True: time.sleep(1)
            finally:
                atexit._run_exitfuncs()
        return impl

Options

lazyllm.common.Option

Bases: object

LazyLLM 提供的选项管理类,用于管理多个选项值并在它们之间进行迭代。此类主要用于参数网格搜索和超参数调优场景。

Parameters:

  • *obj

    一个或多个选项值,可以是任意类型的对象。如果传入单个列表或元组,会自动展开。至少需要提供两个选项。

主要特性:

  • 多选项管理: 可以管理多个不同的选项值。
  • 迭代支持: 支持标准 Python 迭代协议,可以遍历所有选项。
  • 当前值访问: 可直接访问当前选中的选项值。
  • 深度复制: 支持获取当前选中选项值的深拷贝。
  • 循环迭代: 可在所有选项之间循环迭代。

注意: 此类主要用于 LazyLLM 内部的参数搜索和实验管理,尤其在 TrialModule 中进行参数网格搜索时。

Examples:

>>> import lazyllm
>>> from lazyllm.common.option import Option
>>> learning_rates = Option(0.001, 0.01, 0.1)
>>> print(f"当前学习率: {learning_rates}")
当前学习率: <Option options="(0.001, 0.01, 0.1)" curr="0.001">
>>> print(f"所有选项: {list(learning_rates)}")
所有选项: [0.001, 0.01, 0.1]
Source code in lazyllm/common/option.py
class Option(object):
    """
LazyLLM 提供的选项管理类,用于管理多个选项值并在它们之间进行迭代。此类主要用于参数网格搜索和超参数调优场景。

Args:
    *obj: 一个或多个选项值,可以是任意类型的对象。如果传入单个列表或元组,会自动展开。至少需要提供两个选项。

主要特性:

- **多选项管理**: 可以管理多个不同的选项值。
- **迭代支持**: 支持标准 Python 迭代协议,可以遍历所有选项。
- **当前值访问**: 可直接访问当前选中的选项值。
- **深度复制**: 支持获取当前选中选项值的深拷贝。
- **循环迭代**: 可在所有选项之间循环迭代。

**注意**: 此类主要用于 LazyLLM 内部的参数搜索和实验管理,尤其在 `TrialModule` 中进行参数网格搜索时。


Examples:
    >>> import lazyllm
    >>> from lazyllm.common.option import Option
    >>> learning_rates = Option(0.001, 0.01, 0.1)
    >>> print(f"当前学习率: {learning_rates}")
    当前学习率: <Option options="(0.001, 0.01, 0.1)" curr="0.001">
    >>> print(f"所有选项: {list(learning_rates)}")
    所有选项: [0.001, 0.01, 0.1]
    """
    def __init__(self, *obj):
        if len(obj) == 1 and isinstance(obj[0], (tuple, list)): obj = obj[0]
        assert isinstance(obj, (tuple, list)) and len(obj) > 1, 'More than one option shoule be given'
        self._objs = obj
        self._idx = 0
        self._obj = self._objs[self._idx]

    def _next(self):
        self._idx += 1
        if self._idx == len(self._objs):
            self._idx = 0
            raise StopIteration

    def __setattr__(self, __name: str, __value: Any) -> None:
        object.__setattr__(self, __name, __value)
        if __name == '_idx' and 0 <= self._idx < len(self._objs):
            self._obj = self._objs[self._idx]

    def __deepcopy__(self, *args, **kw):
        return copy.deepcopy(self._obj)

    def __iter__(self):
        return _OptionIterator(self)

    def __repr__(self):
        return f'<Option options="{self._objs}" curr="{self._obj}">'

DynamicDescriptor

lazyllm.common.DynamicDescriptor

动态描述符类,用于创建支持实例和类级别调用的描述符。

Parameters:

  • func (callable) –

    要包装的函数或方法

Source code in lazyllm/common/common.py
class DynamicDescriptor:
    """动态描述符类,用于创建支持实例和类级别调用的描述符。

Args:
    func (callable): 要包装的函数或方法
"""
    class Impl:
        def __init__(self, func, instance, owner):
            self._func, self._instance, self._owner = func, instance, owner

        def __call__(self, *args, **kw):
            return self._func(self._instance, *args, **kw) if self._instance else self._func(self._owner, *args, **kw)

        def __repr__(self): return repr(self._func)
        __doc__ = property(lambda self: self._func.__doc__)

        @__doc__.setter
        def __doc__(self, value): self._func.__doc__ = value

    def __init__(self, func):
        self.__func__ = func

    def __get__(self, instance, owner):
        return DynamicDescriptor.Impl(self.__func__, instance, owner)

lazyllm.common.CaseInsensitiveDict

Bases: dict

大小写不敏感的字典类。

CaseInsensitiveDict 继承自 dict,提供大小写不敏感的键值存储和检索功能。所有的键都会被转换为小写形式存储,确保无论使用大写、小写或混合大小写的键名都能访问到相同的值。

特点: - 所有键在存储时自动转换为小写 - 支持标准的字典操作(获取、设置、检查包含关系) - 保持字典的原有功能,只是键名处理方式不同

Parameters:

  • *args

    传递给父类 dict 的位置参数

  • **kwargs

    传递给父类 dict 的关键字参数

Examples:

>>> from lazyllm.common import CaseInsensitiveDict
>>> # 创建大小写不敏感的字典
>>> d = CaseInsensitiveDict({'Name': 'John', 'AGE': 25, 'City': 'New York'})
>>> 
>>> # 使用不同大小写访问相同的键
>>> print(d['name'])      # 使用小写
... 'John'
>>> print(d['NAME'])      # 使用大写
... 'John'
>>> print(d['Name'])      # 使用首字母大写
... 'John'
>>> 
>>> # 设置值时也会转换为小写
>>> d['EMAIL'] = 'john@example.com'
>>> print(d['email'])     # 使用小写访问
... 'john@example.com'
>>> 
>>> # 检查键是否存在(大小写不敏感)
>>> 'AGE' in d
True
>>> 'age' in d
True
>>> 'Age' in d
True
>>> 
>>> # 支持标准字典操作
>>> d['PHONE'] = '123-456-7890'
>>> print(d.get('phone'))
... '123-456-7890'
>>> print(len(d))
... 5
Source code in lazyllm/common/common.py
class CaseInsensitiveDict(dict):
    """大小写不敏感的字典类。

CaseInsensitiveDict 继承自 dict,提供大小写不敏感的键值存储和检索功能。所有的键都会被转换为小写形式存储,确保无论使用大写、小写或混合大小写的键名都能访问到相同的值。

特点:
    - 所有键在存储时自动转换为小写
    - 支持标准的字典操作(获取、设置、检查包含关系)
    - 保持字典的原有功能,只是键名处理方式不同

Args:
    *args: 传递给父类 dict 的位置参数
    **kwargs: 传递给父类 dict 的关键字参数


Examples:
    >>> from lazyllm.common import CaseInsensitiveDict
    >>> # 创建大小写不敏感的字典
    >>> d = CaseInsensitiveDict({'Name': 'John', 'AGE': 25, 'City': 'New York'})
    >>> 
    >>> # 使用不同大小写访问相同的键
    >>> print(d['name'])      # 使用小写
    ... 'John'
    >>> print(d['NAME'])      # 使用大写
    ... 'John'
    >>> print(d['Name'])      # 使用首字母大写
    ... 'John'
    >>> 
    >>> # 设置值时也会转换为小写
    >>> d['EMAIL'] = 'john@example.com'
    >>> print(d['email'])     # 使用小写访问
    ... 'john@example.com'
    >>> 
    >>> # 检查键是否存在(大小写不敏感)
    >>> 'AGE' in d
    True
    >>> 'age' in d
    True
    >>> 'Age' in d
    True
    >>> 
    >>> # 支持标准字典操作
    >>> d['PHONE'] = '123-456-7890'
    >>> print(d.get('phone'))
    ... '123-456-7890'
    >>> print(len(d))
    ... 5
    """
    def __init__(self, *args, **kwargs):
        super().__init__()
        for key, value in dict(*args, **kwargs).items():
            assert isinstance(key, str)
            self[key] = value

    def __getitem__(self, key):
        assert isinstance(key, str)
        return super().__getitem__(key.lower())

    def __setitem__(self, key, value):
        assert isinstance(key, str)
        super().__setitem__(key.lower(), value)

    def __contains__(self, key):
        assert isinstance(key, str)
        return super().__contains__(key.lower())

lazyllm.common.ProcessPoolExecutor

Bases: ProcessPoolExecutor

Source code in lazyllm/common/multiprocessing.py
class ProcessPoolExecutor(PPE):
    def submit(self, fn, /, *args, **kwargs):
        """
将任务提交到进程池中执行。

此方法将一个函数及其参数序列化后提交到进程池中执行,返回一个 `Future` 对象,用于获取任务执行结果或状态。

Args:
    fn (Callable): 要执行的函数。
    *args: 传递给函数的位置参数。
    **kwargs: 传递给函数的关键字参数。

**Returns:**

- concurrent.futures.Future: 表示任务执行状态的 `Future` 对象。


Examples:

    >>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
    >>> import time
    >>> 
    >>> def task(x):
    ...     time.sleep(1)
    ...     return x * 2
    ... 
    >>> with ProcessPoolExecutor(max_workers=2) as executor:
    ...     future = executor.submit(task, 5)
    ...     result = future.result()
    ...     print(result)
    10
    """
        f = dump_obj(functools.partial(fn, *args, **kwargs))
        return super(__class__, self).submit(_worker, f)

submit(fn, /, *args, **kwargs)

将任务提交到进程池中执行。

此方法将一个函数及其参数序列化后提交到进程池中执行,返回一个 Future 对象,用于获取任务执行结果或状态。

Parameters:

  • fn (Callable) –

    要执行的函数。

  • *args

    传递给函数的位置参数。

  • **kwargs

    传递给函数的关键字参数。

Returns:

  • concurrent.futures.Future: 表示任务执行状态的 Future 对象。

Examples:

>>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
>>> import time
>>> 
>>> def task(x):
...     time.sleep(1)
...     return x * 2
... 
>>> with ProcessPoolExecutor(max_workers=2) as executor:
...     future = executor.submit(task, 5)
...     result = future.result()
...     print(result)
10
Source code in lazyllm/common/multiprocessing.py
    def submit(self, fn, /, *args, **kwargs):
        """
将任务提交到进程池中执行。

此方法将一个函数及其参数序列化后提交到进程池中执行,返回一个 `Future` 对象,用于获取任务执行结果或状态。

Args:
    fn (Callable): 要执行的函数。
    *args: 传递给函数的位置参数。
    **kwargs: 传递给函数的关键字参数。

**Returns:**

- concurrent.futures.Future: 表示任务执行状态的 `Future` 对象。


Examples:

    >>> from lazyllm.common.multiprocessing import ProcessPoolExecutor
    >>> import time
    >>> 
    >>> def task(x):
    ...     time.sleep(1)
    ...     return x * 2
    ... 
    >>> with ProcessPoolExecutor(max_workers=2) as executor:
    ...     future = executor.submit(task, 5)
    ...     result = future.result()
    ...     print(result)
    10
    """
        f = dump_obj(functools.partial(fn, *args, **kwargs))
        return super(__class__, self).submit(_worker, f)

lazyllm.common.ArgsDict

Bases: dict

参数字典类,用于管理和验证命令行参数。

Parameters:

  • *args

    传递给父类dict的 positional arguments

  • **kwargs

    传递给父类dict的 keyword arguments

Returns:

  • ArgsDict实例,提供参数检查和格式化功能
Source code in lazyllm/common/common.py
class ArgsDict(dict):
    """参数字典类,用于管理和验证命令行参数。

Args:
    *args: 传递给父类dict的 positional arguments
    **kwargs: 传递给父类dict的 keyword arguments

**Returns:**

- ArgsDict实例,提供参数检查和格式化功能
"""
    def __init__(self, *args, with_line=True, **kwargs):
        super(ArgsDict, self).__init__(*args, **kwargs)
        self._with_line = with_line

    def check_and_update(self, kw):
        """检查并更新参数字典。

Args:
    kw (dict): 要更新的参数字典
"""
        if not kw.pop('skip_check', config['deploy_skip_check_kw']):
            assert set(kw.keys()).issubset(set(self)), f'unexpected keys: {set(kw.keys()) - set(self)}'
        self.update(kw)

    def parse_kwargs(self):
        """将参数字典解析为命令行参数字符串。
"""
        string = []
        for k, v in self.items():
            if type(v) is dict:
                v = json.dumps(v).replace('\"', '\\\"')
            if self._with_line:
                string.append(f'--{k}={v}' if type(v) is not str else f'--{k}=\"{v}\"')
            else:
                string.append(f'{k}={v}' if type(v) is not str else f'{k}=\"{v}\"')
        string = ' '.join(string)
        return string

check_and_update(kw)

检查并更新参数字典。

Parameters:

  • kw (dict) –

    要更新的参数字典

Source code in lazyllm/common/common.py
    def check_and_update(self, kw):
        """检查并更新参数字典。

Args:
    kw (dict): 要更新的参数字典
"""
        if not kw.pop('skip_check', config['deploy_skip_check_kw']):
            assert set(kw.keys()).issubset(set(self)), f'unexpected keys: {set(kw.keys()) - set(self)}'
        self.update(kw)

parse_kwargs()

将参数字典解析为命令行参数字符串。

Source code in lazyllm/common/common.py
    def parse_kwargs(self):
        """将参数字典解析为命令行参数字符串。
"""
        string = []
        for k, v in self.items():
            if type(v) is dict:
                v = json.dumps(v).replace('\"', '\\\"')
            if self._with_line:
                string.append(f'--{k}={v}' if type(v) is not str else f'--{k}=\"{v}\"')
            else:
                string.append(f'{k}={v}' if type(v) is not str else f'{k}=\"{v}\"')
        string = ' '.join(string)
        return string

Threading

lazyllm.common.Thread

Bases: Thread

LazyLLM 提供的增强线程类,继承自 Python 标准库的 threading.Thread。此类提供了额外的功能,包括会话ID管理、预钩子函数支持和异常处理机制。

Parameters:

  • group

    线程组,默认为 None

  • target

    要在线程中执行的函数,默认为 None

  • name

    线程名称,默认为 None

  • args

    传递给目标函数的参数元组,默认为 ()

  • kwargs

    传递给目标函数的关键字参数字典,默认为 None

  • prehook

    在线程执行前要调用的函数或函数列表,默认为 None

  • daemon

    是否为守护线程,默认为 None

Examples:

>>> import lazyllm
>>> from lazyllm.common.threading import Thread
>>> import time
>>> def simple_task(name):
...     time.sleep(0.1)
...     return f"Hello from {name}"
>>> thread = Thread(target=simple_task, args=("Worker",))
>>> thread.start()
>>> result = thread.get_result()
>>> print(result)
Hello from Worker
>>> def setup_environment():
...     print("Setting up environment...")
...     return "environment_ready"
>>> def validate_input(data):
...     print(f"Validating input: {data}")
...     if not isinstance(data, (int, float)):
...         raise ValueError("Input must be numeric")
>>> def process_data(data):
...     print(f"Processing data: {data}")
...     time.sleep(0.1) 
...     return data * 2
>>> thread = Thread(
...     target=process_data,
...     args=(42,),
...     prehook=[setup_environment, lambda: validate_input(42)]
... )
>>> thread.start()
Setting up environment...
Validating input: 42
Processing data: 42
>>> result = thread.get_result()
>>> print(f"Final result: {result}")
Final result: 84
Source code in lazyllm/common/threading.py
class Thread(threading.Thread):
    """LazyLLM 提供的增强线程类,继承自 Python 标准库的 `threading.Thread`。此类提供了额外的功能,包括会话ID管理、预钩子函数支持和异常处理机制。

Args:
    group: 线程组,默认为 ``None``
    target: 要在线程中执行的函数,默认为 ``None``
    name: 线程名称,默认为 ``None``
    args: 传递给目标函数的参数元组,默认为 ``()``
    kwargs: 传递给目标函数的关键字参数字典,默认为 ``None``
    prehook: 在线程执行前要调用的函数或函数列表,默认为 ``None``
    daemon: 是否为守护线程,默认为 ``None``


Examples:
    >>> import lazyllm
    >>> from lazyllm.common.threading import Thread
    >>> import time
    >>> def simple_task(name):
    ...     time.sleep(0.1)
    ...     return f"Hello from {name}"
    >>> thread = Thread(target=simple_task, args=("Worker",))
    >>> thread.start()
    >>> result = thread.get_result()
    >>> print(result)
    Hello from Worker
    >>> def setup_environment():
    ...     print("Setting up environment...")
    ...     return "environment_ready"
    >>> def validate_input(data):
    ...     print(f"Validating input: {data}")
    ...     if not isinstance(data, (int, float)):
    ...         raise ValueError("Input must be numeric")
    >>> def process_data(data):
    ...     print(f"Processing data: {data}")
    ...     time.sleep(0.1) 
    ...     return data * 2
    >>> thread = Thread(
    ...     target=process_data,
    ...     args=(42,),
    ...     prehook=[setup_environment, lambda: validate_input(42)]
    ... )
    >>> thread.start()
    Setting up environment...
    Validating input: 42
    Processing data: 42
    >>> result = thread.get_result()
    >>> print(f"Final result: {result}")
    Final result: 84
    """
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, prehook=None, daemon=None):
        self.q = Queue()
        if not isinstance(prehook, (tuple, list)): prehook = [prehook] if prehook else []
        prehook.insert(0, functools.partial(_sid_setter, sid=globals._sid))
        super().__init__(group, self.work, name, (prehook, target, args), kwargs, daemon=daemon)

    def work(self, prehook, target, args, **kw):
        """线程的核心工作方法,负责执行预钩子函数、目标函数,并处理异常和结果。

Args:
    prehook: 预钩子函数列表,在线程执行前调用
    target: 要执行的目标函数
    args: 传递给目标函数的参数
    **kw: 传递给目标函数的关键字参数

**注意**: 此方法由 `Thread` 类内部调用,用户通常不需要直接调用此方法。
"""
        [p() for p in prehook]
        try:
            r = target(*args, **kw)
        except Exception as e:
            self.q.put(e)
        else:
            self.q.put(r)

    def get_result(self):
        """获取线程执行结果的方法。此方法会阻塞直到线程执行完成,然后返回执行结果或重新抛出异常。

**Returns:**

- 线程执行的结果。如果目标函数正常执行,返回其返回值;如果发生异常,会重新抛出该异常。

**注意**: 此方法应该在调用 `thread.start()` 之后使用,用于获取线程的执行结果。
"""
        r = self.q.get()
        if isinstance(r, Exception):
            raise r
        return r

get_result()

获取线程执行结果的方法。此方法会阻塞直到线程执行完成,然后返回执行结果或重新抛出异常。

Returns:

  • 线程执行的结果。如果目标函数正常执行,返回其返回值;如果发生异常,会重新抛出该异常。

注意: 此方法应该在调用 thread.start() 之后使用,用于获取线程的执行结果。

Source code in lazyllm/common/threading.py
    def get_result(self):
        """获取线程执行结果的方法。此方法会阻塞直到线程执行完成,然后返回执行结果或重新抛出异常。

**Returns:**

- 线程执行的结果。如果目标函数正常执行,返回其返回值;如果发生异常,会重新抛出该异常。

**注意**: 此方法应该在调用 `thread.start()` 之后使用,用于获取线程的执行结果。
"""
        r = self.q.get()
        if isinstance(r, Exception):
            raise r
        return r

work(prehook, target, args, **kw)

线程的核心工作方法,负责执行预钩子函数、目标函数,并处理异常和结果。

Parameters:

  • prehook

    预钩子函数列表,在线程执行前调用

  • target

    要执行的目标函数

  • args

    传递给目标函数的参数

  • **kw

    传递给目标函数的关键字参数

注意: 此方法由 Thread 类内部调用,用户通常不需要直接调用此方法。

Source code in lazyllm/common/threading.py
    def work(self, prehook, target, args, **kw):
        """线程的核心工作方法,负责执行预钩子函数、目标函数,并处理异常和结果。

Args:
    prehook: 预钩子函数列表,在线程执行前调用
    target: 要执行的目标函数
    args: 传递给目标函数的参数
    **kw: 传递给目标函数的关键字参数

**注意**: 此方法由 `Thread` 类内部调用,用户通常不需要直接调用此方法。
"""
        [p() for p in prehook]
        try:
            r = target(*args, **kw)
        except Exception as e:
            self.q.put(e)
        else:
            self.q.put(r)

LazyLLMCMD

lazyllm.common.LazyLLMCMD

Bases: object

命令行操作封装类,提供安全、灵活的命令行管理功能。

Parameters:

  • cmd (Union[str, List[str], Callable]) –

    命令行指令,支持三种形式:字符串命令,命令列表,可调用对象。

  • return_value (Any, default: None ) –

    预设返回值。

  • checkf (Any, default: lambda *a: True ) –

    命令验证函数。

  • no_displays (Any, default: None ) –

    需要过滤的敏感参数名。

Examples:

>>> from lazyllm.common import LazyLLMCMD
>>> cmd = LazyLLMCMD("run --epochs=50 --batch-size=32")
>>> print(cmd.get_args("epochs"))
50
>>> print(cmd.get_args("batch-size")) 
32
>>> base = LazyLLMCMD("python train.py", checkf=lambda x: True)
>>> new = base.with_cmd("python predict.py")
Source code in lazyllm/common/common.py
class LazyLLMCMD(object):
    """命令行操作封装类,提供安全、灵活的命令行管理功能。

Args:
    cmd (Union[str, List[str], Callable]):命令行指令,支持三种形式:字符串命令,命令列表,可调用对象。
    return_value (Any):预设返回值。
    checkf(Any):命令验证函数。
    no_displays(Any):需要过滤的敏感参数名。


Examples:
    >>> from lazyllm.common import LazyLLMCMD
    >>> cmd = LazyLLMCMD("run --epochs=50 --batch-size=32")
    >>> print(cmd.get_args("epochs"))
    50
    >>> print(cmd.get_args("batch-size")) 
    32
    >>> base = LazyLLMCMD("python train.py", checkf=lambda x: True)
    >>> new = base.with_cmd("python predict.py")

    """
    def __init__(self, cmd, *, return_value=None, checkf=(lambda *a: True), no_displays=None):
        if isinstance(cmd, (tuple, list)):
            cmd = ' && '.join(cmd)
        assert isinstance(cmd, str) or callable(cmd), 'cmd must be func or (list of) bash command str.'
        self.cmd = cmd
        self.return_value = return_value
        self.checkf = checkf
        self.no_displays = no_displays

    def __hash__(self):
        return hash(self.cmd)

    def __str__(self):
        assert not callable(self.cmd), f'Cannot convert cmd function {self.cmd} to str'
        cmd = re.sub(r'\b(LAZYLLM_[A-Z0-9_]*?_(?:API|SECRET)_KEY)=\S+', r'\1=xxxxxx', self.cmd)
        if self.no_displays:
            for item in self.no_displays:
                pattern = r'(-{1,2}' + re.escape(item) + r')(\s|=|)(\S+|)'
                cmd = re.sub(pattern, '', cmd)
            return cmd
        else:
            return cmd

    def with_cmd(self, cmd):
        """创建新命令对象并继承当前配置。

Args:
    cmd: 新的命令内容(类型需与原始命令一致)

"""
        # Attention: Cannot use copy.deepcopy because of class method.
        new_instance = LazyLLMCMD(cmd, return_value=self.return_value,
                                  checkf=self.checkf, no_displays=self.no_displays)
        return new_instance

    def get_args(self, key):
        """从命令字符串中提取指定参数的值。

Args:
    key: 要提取的参数名
"""
        assert not callable(self.cmd), f'Cannot get args from function {self.cmd}'
        pattern = r'*(-{1,2}' + re.escape(key) + r')(\s|=|)(\S+|)*'
        return re.match(pattern, self.cmd)[3]

get_args(key)

从命令字符串中提取指定参数的值。

Parameters:

  • key

    要提取的参数名

Source code in lazyllm/common/common.py
    def get_args(self, key):
        """从命令字符串中提取指定参数的值。

Args:
    key: 要提取的参数名
"""
        assert not callable(self.cmd), f'Cannot get args from function {self.cmd}'
        pattern = r'*(-{1,2}' + re.escape(key) + r')(\s|=|)(\S+|)*'
        return re.match(pattern, self.cmd)[3]

with_cmd(cmd)

创建新命令对象并继承当前配置。

Parameters:

  • cmd

    新的命令内容(类型需与原始命令一致)

Source code in lazyllm/common/common.py
    def with_cmd(self, cmd):
        """创建新命令对象并继承当前配置。

Args:
    cmd: 新的命令内容(类型需与原始命令一致)

"""
        # Attention: Cannot use copy.deepcopy because of class method.
        new_instance = LazyLLMCMD(cmd, return_value=self.return_value,
                                  checkf=self.checkf, no_displays=self.no_displays)
        return new_instance

lazyllm.common.utils.SecurityVisitor

Bases: NodeVisitor

AST-based security analyzer to detect unsafe operations in Python code.

IMPORTANT: Method names within this class (e.g., visit_Call, visit_Import) should not be renamed to lowercase. These method names are part of the NodeVisitor pattern from the ast module and must remain consistant with this naming convention to function correctly.

Source code in lazyllm/common/utils.py
class SecurityVisitor(ast.NodeVisitor):  # noqa C901
    """
    AST-based security analyzer to detect unsafe operations in Python code.

    IMPORTANT: Method names within this class (e.g., `visit_Call`, `visit_Import`) **should not**
    be renamed to lowercase. These method names are part of the `NodeVisitor` pattern from the `ast`
    module and must remain consistant with this naming convention to function correctly.
    """

    # **Dangerous built-in functions**
    DANGEROUS_BUILTINS = {'exec', 'eval', 'open', 'compile', 'getattr',
                          'setattr', '__import__', 'globals', 'locals', 'vars'}

    # **Dangerous os operations**
    DANGEROUS_OS_CALLS = {'system', 'popen', 'remove', 'rmdir', 'unlink', 'rename'}

    # **Dangerous sys operations**
    DANGEROUS_SYS_CALLS = {'exit', 'modules'}

    # **Dangerous modules**
    DANGEROUS_MODULES = {'pickle', 'subprocess', 'socket', 'shutil', 'requests', 'inspect', 'tempfile'}

    def visit_Call(self, node):  # noqa C901
        """Check function calls"""
        # Direct calls to dangerous built-in functions
        if isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_BUILTINS:
            raise ValueError(f'⚠️ Detected dangerous function call: {node.func.id}')

        # Check for __import__ calls with string arguments
        if isinstance(node.func, ast.Name) and node.func.id == '__import__':
            if node.args and isinstance(node.args[0], ast.Str):
                module_name = node.args[0].s
                if module_name in self.DANGEROUS_MODULES:
                    raise ValueError(f'⚠️ Detected dangerous module import via __import__: {module_name}')

        # Check for indirect __import__ calls (function calls that might return __import__)
        if isinstance(node.func, ast.Call):
            # Check if this is a call to a function that might return __import__
            if isinstance(node.func.func, ast.Name):
                func_name = node.func.func.id
                if func_name in ['get_import', 'import_func']:  # Common patterns
                    raise ValueError(f'⚠️ Detected suspicious function call that might return __import__: {func_name}')

        # Check for attribute access that might lead to __import__
        if isinstance(node.func, ast.Attribute):
            if isinstance(node.func.value, ast.Name) and node.func.value.id == 'ImportHelper':
                if node.func.attr == 'get_import':
                    raise ValueError('⚠️ Detected suspicious method call: ImportHelper.get_import')

        # os / sys related calls
        if isinstance(node.func, ast.Attribute) and isinstance(node.func.value, ast.Name):
            if node.func.value.id == 'os' and node.func.attr in self.DANGEROUS_OS_CALLS:
                raise ValueError(f'⚠️ Detected dangerous os call: os.{node.func.attr}')
            if node.func.value.id == 'sys' and node.func.attr in self.DANGEROUS_SYS_CALLS:
                raise ValueError(f'⚠️ Detected dangerous sys call: sys.{node.func.attr}')

        self.generic_visit(node)

    def visit_Import(self, node):
        """Check import statements"""
        for alias in node.names:
            if alias.name in self.DANGEROUS_MODULES:
                raise ValueError(f'⚠️ Detected dangerous module import: {alias.name}')

    def visit_ImportFrom(self, node):
        """Check from ... import statements"""
        if node.module in self.DANGEROUS_MODULES:
            raise ValueError(f'⚠️ Detected dangerous module import: {node.module}')

    def visit_Attribute(self, node):
        """Check os.environ and tempfile usage"""
        if isinstance(node.value, ast.Name):
            if node.value.id == 'os' and node.attr == 'environ':
                raise ValueError('⚠️ Detected dangerous access: os.environ')
            if node.value.id == 'tempfile':
                raise ValueError(f'⚠️ Detected dangerous usage of tempfile: tempfile.{node.attr}')

        self.generic_visit(node)

    def visit_Lambda(self, node):
        """Check lambda functions that might return __import__"""
        # Check if lambda body returns __import__
        if isinstance(node.body, ast.Name) and node.body.id == '__import__':
            raise ValueError('⚠️ Detected lambda function returning __import__')
        self.generic_visit(node)

    def visit_ListComp(self, node):
        """Check list comprehensions that might contain __import__"""
        # Check if the expression in list comprehension is __import__
        if isinstance(node.elt, ast.Name) and node.elt.id == '__import__':
            raise ValueError('⚠️ Detected list comprehension containing __import__')
        self.generic_visit(node)

    def visit_FunctionDef(self, node):
        """Check function definitions that might return __import__"""
        # Check if function returns __import__
        for stmt in node.body:
            if isinstance(stmt, ast.Return):
                if isinstance(stmt.value, ast.Name) and stmt.value.id == '__import__':
                    raise ValueError(f'⚠️ Detected function {node.name} returning __import__')
        self.generic_visit(node)

visit_Attribute(node)

Check os.environ and tempfile usage

Source code in lazyllm/common/utils.py
def visit_Attribute(self, node):
    """Check os.environ and tempfile usage"""
    if isinstance(node.value, ast.Name):
        if node.value.id == 'os' and node.attr == 'environ':
            raise ValueError('⚠️ Detected dangerous access: os.environ')
        if node.value.id == 'tempfile':
            raise ValueError(f'⚠️ Detected dangerous usage of tempfile: tempfile.{node.attr}')

    self.generic_visit(node)

visit_Call(node)

Check function calls

Source code in lazyllm/common/utils.py
def visit_Call(self, node):  # noqa C901
    """Check function calls"""
    # Direct calls to dangerous built-in functions
    if isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_BUILTINS:
        raise ValueError(f'⚠️ Detected dangerous function call: {node.func.id}')

    # Check for __import__ calls with string arguments
    if isinstance(node.func, ast.Name) and node.func.id == '__import__':
        if node.args and isinstance(node.args[0], ast.Str):
            module_name = node.args[0].s
            if module_name in self.DANGEROUS_MODULES:
                raise ValueError(f'⚠️ Detected dangerous module import via __import__: {module_name}')

    # Check for indirect __import__ calls (function calls that might return __import__)
    if isinstance(node.func, ast.Call):
        # Check if this is a call to a function that might return __import__
        if isinstance(node.func.func, ast.Name):
            func_name = node.func.func.id
            if func_name in ['get_import', 'import_func']:  # Common patterns
                raise ValueError(f'⚠️ Detected suspicious function call that might return __import__: {func_name}')

    # Check for attribute access that might lead to __import__
    if isinstance(node.func, ast.Attribute):
        if isinstance(node.func.value, ast.Name) and node.func.value.id == 'ImportHelper':
            if node.func.attr == 'get_import':
                raise ValueError('⚠️ Detected suspicious method call: ImportHelper.get_import')

    # os / sys related calls
    if isinstance(node.func, ast.Attribute) and isinstance(node.func.value, ast.Name):
        if node.func.value.id == 'os' and node.func.attr in self.DANGEROUS_OS_CALLS:
            raise ValueError(f'⚠️ Detected dangerous os call: os.{node.func.attr}')
        if node.func.value.id == 'sys' and node.func.attr in self.DANGEROUS_SYS_CALLS:
            raise ValueError(f'⚠️ Detected dangerous sys call: sys.{node.func.attr}')

    self.generic_visit(node)

visit_FunctionDef(node)

Check function definitions that might return import

Source code in lazyllm/common/utils.py
def visit_FunctionDef(self, node):
    """Check function definitions that might return __import__"""
    # Check if function returns __import__
    for stmt in node.body:
        if isinstance(stmt, ast.Return):
            if isinstance(stmt.value, ast.Name) and stmt.value.id == '__import__':
                raise ValueError(f'⚠️ Detected function {node.name} returning __import__')
    self.generic_visit(node)

visit_Import(node)

Check import statements

Source code in lazyllm/common/utils.py
def visit_Import(self, node):
    """Check import statements"""
    for alias in node.names:
        if alias.name in self.DANGEROUS_MODULES:
            raise ValueError(f'⚠️ Detected dangerous module import: {alias.name}')

visit_ImportFrom(node)

Check from ... import statements

Source code in lazyllm/common/utils.py
def visit_ImportFrom(self, node):
    """Check from ... import statements"""
    if node.module in self.DANGEROUS_MODULES:
        raise ValueError(f'⚠️ Detected dangerous module import: {node.module}')

visit_Lambda(node)

Check lambda functions that might return import

Source code in lazyllm/common/utils.py
def visit_Lambda(self, node):
    """Check lambda functions that might return __import__"""
    # Check if lambda body returns __import__
    if isinstance(node.body, ast.Name) and node.body.id == '__import__':
        raise ValueError('⚠️ Detected lambda function returning __import__')
    self.generic_visit(node)

visit_ListComp(node)

Check list comprehensions that might contain import

Source code in lazyllm/common/utils.py
def visit_ListComp(self, node):
    """Check list comprehensions that might contain __import__"""
    # Check if the expression in list comprehension is __import__
    if isinstance(node.elt, ast.Name) and node.elt.id == '__import__':
        raise ValueError('⚠️ Detected list comprehension containing __import__')
    self.generic_visit(node)

lazyllm.common.common.Finalizer

Bases: object

终结器类,用于管理资源的清理和释放操作。可以作为上下文管理器使用,或通过对象销毁时自动触发清理。

Parameters:

  • func1 (Callable) –

    主要的清理函数。如果提供了func2,则func1会立即执行,func2作为清理函数。

  • func2 (Optional[Callable], default: None ) –

    可选的清理函数,默认为None。

  • condition (Callable, default: lambda: True ) –

    条件函数,返回True时才执行清理函数,默认总是返回True。

用途: 1. 可以作为上下文管理器使用(with语句) 2. 可以通过对象销毁时自动触发清理 3. 支持条件性清理 4. 支持两阶段初始化和清理

注意: - 当提供func2时,func1会在初始化时立即执行 - 清理函数只会执行一次 - 清理操作会在对象销毁或退出上下文时执行

Source code in lazyllm/common/common.py
class Finalizer(object):
    """终结器类,用于管理资源的清理和释放操作。可以作为上下文管理器使用,或通过对象销毁时自动触发清理。

Args:
    func1 (Callable): 主要的清理函数。如果提供了func2,则func1会立即执行,func2作为清理函数。
    func2 (Optional[Callable]): 可选的清理函数,默认为None。
    condition (Callable): 条件函数,返回True时才执行清理函数,默认总是返回True。

用途:
1. 可以作为上下文管理器使用(with语句)
2. 可以通过对象销毁时自动触发清理
3. 支持条件性清理
4. 支持两阶段初始化和清理

注意:
    - 当提供func2时,func1会在初始化时立即执行
    - 清理函数只会执行一次
    - 清理操作会在对象销毁或退出上下文时执行
"""
    def __init__(self, func1: Callable, func2: Optional[Callable] = None, *, condition: Callable = lambda: True):
        if func2:
            func1()
            func1 = func2
        self._func = func1
        self._condition = condition

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.__del__()

    def __del__(self):
        if self._func:
            if self._condition(): self._func()
            self._func = None

lazyllm.common.FlatList.absorb(item)

添加元素到列表中。

Parameters:

  • item

    要添加的元素,可以是单个元素或列表

Source code in lazyllm/common/common.py
    def absorb(self, item):
        """添加元素到列表中。

Args:
    item: 要添加的元素,可以是单个元素或列表
"""
        if isinstance(item, list):
            self.extend(item)
        elif item is not None:
            self.append(item)