Skip to content

Flow

lazyllm.flow.FlowBase

A base class for creating flow-like structures that can contain various items.

This class provides a way to organize items, which can be instances of FlowBase or other types, into a hierarchical structure. Each item can have a name and the structure can be traversed or modified dynamically.

Parameters:

  • items (iterable, default: () ) –

    An iterable of items to be included in the flow. These can be instances of FlowBase or other objects.

  • item_names (list of str, default: [] ) –

    A list of names corresponding to the items. This allows items to be accessed by name. If not provided, items can only be accessed by index.

Source code in lazyllm/flow/flow.py
class FlowBase(metaclass=_MetaBind):
    """A base class for creating flow-like structures that can contain various items.

This class provides a way to organize items, which can be instances of ``FlowBase`` or other types, into a hierarchical structure. Each item can have a name and the structure can be traversed or modified dynamically.

Items can be supplied in the constructor, or added inside a ``with`` block by assigning public attributes on the flow (``flow.name = item``).

Args:
    items: Positional items to be included in the flow. These can be instances of ``FlowBase`` or other objects. Classes are instantiated and functions are wrapped before being stored.
    item_names (list of str, optional): A list of names corresponding to the items. This allows items to be accessed by name. If not provided, items can only be accessed by index.
    auto_capture (bool, optional): If ``True`` and the flow is used as a context manager, local variables that first appear in the caller's frame inside the ``with`` block are added to the flow as named items on exit. Defaults to ``False``.

"""
    def __init__(self, *items, item_names=[], auto_capture=False) -> None:
        # NOTE: ``item_names`` has a mutable default, but it is only read here, never mutated.
        self._father = None
        # Parallel bookkeeping: stored items, the user-supplied names, and one id per item.
        self._items, self._item_names, self._item_ids = [], [], []
        self._auto_capture = auto_capture
        # ``_add`` is only legal while ``_capture`` is True (during __init__ or inside ``with``).
        self._capture = True
        self._curr_frame = None
        self._flow_id = str(uuid.uuid4().hex)

        # Without names every item is added with ``k=None``; with names, ``zip`` stops at the
        # shorter of the two sequences.
        for k, v in zip(item_names if item_names else [None] * len(items), items):
            self._add(k, v)

        self._capture = False

    # Hook invoked at the end of ``__exit__``; a no-op here, meant to be overridden.
    def __post_init__(self): pass

    def _add(self, k, v):
        """Append item ``v`` to the flow under the optional name ``k``.

        Raises:
            AssertionError: If called outside of ``__init__`` / a ``with`` block, or if ``k``
                duplicates an existing item name.
        """
        assert self._capture, f'_add can only be used in `{self.__class__}.__init__` or `with {self.__class__}()`'
        # Classes are instantiated; functions (per ``_is_function``) and objects that are already
        # present in this flow are wrapped in ``_FuncWrap``; anything else is stored as is.
        # NOTE(review): wrapping repeated objects presumably keeps entries distinguishable for
        # ``id()``, which relies on ``self._items.index`` - confirm.
        self._items.append(v() if isinstance(v, type) else _FuncWrap(v) if _is_function(v) or v in self._items else v)
        # Unnamed items get a random hex id; named items use their name as id.
        self._item_ids.append(k or str(uuid.uuid4().hex))
        if isinstance(v, FlowBase): v._father = self
        if k:
            assert k not in self._item_names, f'Duplicated names {k}'
            self._item_names.append(k)
        # Inside a ``with`` block, also try to expose named sub-flows as variables in the
        # caller's frame, unless a variable of that name already exists there.
        if self._curr_frame and isinstance(v, FlowBase):
            if k:
                if k not in self._curr_frame.f_locals:
                    self._curr_frame.f_locals[k] = v
                else:
                    lazyllm.LOG.warning(f'{k} is already defined in this scope, ignor it')

    def __enter__(self, __frame=None):
        """Start capturing items; only allowed on a flow that has no items yet.

        ``__frame`` lets a caller pass the frame to capture from explicitly; by default the
        frame of the code executing the ``with`` statement is used.
        """
        assert len(self._items) == 0, f'Cannot init {self.__class__} with items if you want to use it by context.'
        self._curr_frame = __frame if __frame else inspect.currentframe().f_back
        if self._auto_capture:
            # Snapshot of the names that already exist, so ``__exit__`` can tell which are new.
            self._frame_keys = list(self._curr_frame.f_locals.keys())
        self._capture = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop capturing, optionally auto-add new locals, then run ``__post_init__``.

        Always returns ``False``, so an exception raised inside the ``with`` block propagates.
        """
        if self._auto_capture:
            # Iterate over a copy: ``_add`` may write into the frame's locals.
            locals = self._curr_frame.f_locals.copy()
            for var, val in locals.items():
                # Skip ``self``, names that existed before the block, and this flow itself
                # (either directly or as the target of a ``bind``).
                if var != 'self' and var not in self._frame_keys and (
                        (val._f if isinstance(val, bind) else val) is not self):
                    self._add(var, val)
        self._capture = False
        self._curr_frame = None
        self.__post_init__()
        return False

    def __setattr__(self, name: str, value):
        """While capturing, treat assignment to a public attribute as adding a named item."""
        # The ``__dict__`` check keeps assignments made before ``_capture`` exists on the
        # normal path; names starting with ``_`` are always regular attributes.
        if '_capture' in self.__dict__ and self._capture and not name.startswith('_'):
            assert name not in self._item_names, f'Duplicated name: {name}'
            self._add(name, value)
        else:
            super(__class__, self).__setattr__(name, value)

    def __getattr__(self, name):
        """Resolve ``flow.<name>`` to the item registered under that name.

        Raises:
            AttributeError: If no item with that name exists.
        """
        # The ``__dict__`` check avoids re-entering ``__getattr__`` before ``_item_names`` is set.
        if '_item_names' in self.__dict__ and name in self._item_names:
            return self._items[self._item_names.index(name)]
        raise AttributeError(f'{self.__class__} object has no attribute {name}')

    def id(self, module=None):
        """Return an id: ``module`` itself if it is a string, the id stored for the item
        ``module`` if one is given, otherwise the id of this flow."""
        if isinstance(module, str): return module
        return self._item_ids[self._items.index(module)] if module else self._flow_id

    @property
    def is_root(self):
        """A property that indicates whether the current flow item is the root of the flow structure.

**Returns:**

- bool: True if the current item has no parent (``_father`` is None), otherwise False.


Examples:
    >>> import lazyllm
    >>> p = lazyllm.pipeline()
    >>> p.is_root
    True
    >>> p2 = lazyllm.pipeline(p)
    >>> p.is_root
    False
    >>> p2.is_root
    True
    """
        return self._father is None

    @property
    def ancestor(self):
        """A property that returns the topmost ancestor of the current flow item.

If the current item is the root, it returns itself.

**Returns:**

- FlowBase: The topmost ancestor flow item.


Examples:
    >>> import lazyllm
    >>> p = lazyllm.pipeline()
    >>> p2 = lazyllm.pipeline(p)
    >>> p.ancestor is p2
    True
    """
        if self.is_root: return self
        return self._father.ancestor

    def for_each(self, filter, action):
        """Performs an action on each item in the flow that matches a given filter.

The method recursively traverses the flow structure, applying the action to each item that passes the filter. Nested flows are only traversed; they are never passed to ``filter`` or ``action`` themselves.

Args:
    filter (callable): A function that takes an item as input and returns True if the item should have the action applied.
    action (callable): A function that takes an item as input and performs some operation on it.

**Returns:**

- None


Examples:
    >>> import lazyllm
    >>> def test1(): print('1')
    ... 
    >>> def test2(): print('2')
    ... 
    >>> def test3(): print('3')
    ... 
    >>> flow = lazyllm.pipeline(test1, lazyllm.pipeline(test2, test3))
    >>> flow.for_each(lambda x: callable(x), lambda x: print(x))
    <Function type=test1>
    <Function type=test2>
    <Function type=test3>
    """
        for item in self._items:
            if isinstance(item, FlowBase):
                item.for_each(filter, action)
            elif filter(item):
                action(item)

ancestor property

A property that returns the topmost ancestor of the current flow item.

If the current item is the root, it returns itself.

Returns:

  • FlowBase: The topmost ancestor flow item.

Examples:

>>> import lazyllm
>>> p = lazyllm.pipeline()
>>> p2 = lazyllm.pipeline(p)
>>> p.ancestor is p2
True

is_root property

A property that indicates whether the current flow item is the root of the flow structure.

Returns:

  • bool: True if the current item has no parent (_father is None), otherwise False.

Examples:

>>> import lazyllm
>>> p = lazyllm.pipeline()
>>> p.is_root
True
>>> p2 = lazyllm.pipeline(p)
>>> p.is_root
False
>>> p2.is_root
True

for_each(filter, action)

Performs an action on each item in the flow that matches a given filter.

The method recursively traverses the flow structure, applying the action to each item that passes the filter.

Parameters:

  • filter (callable) –

    A function that takes an item as input and returns True if the item should have the action applied.

  • action (callable) –

    A function that takes an item as input and performs some operation on it.

Returns:

  • None

Examples:

>>> import lazyllm
>>> def test1(): print('1')
... 
>>> def test2(): print('2')
... 
>>> def test3(): print('3')
... 
>>> flow = lazyllm.pipeline(test1, lazyllm.pipeline(test2, test3))
>>> flow.for_each(lambda x: callable(x), lambda x: print(x))
<Function type=test1>
<Function type=test2>
<Function type=test3>
Source code in lazyllm/flow/flow.py
    def for_each(self, filter, action):
        """Performs an action on each item in the flow that matches a given filter.

The method recursively traverses the flow structure, applying the action to each item that passes the filter.

Args:
    filter (callable): A function that takes an item as input and returns True if the item should have the action applied.
    action (callable): A function that takes an item as input and performs some operation on it.

**Returns:**

- None


Examples:
    >>> import lazyllm
    >>> def test1(): print('1')
    ... 
    >>> def test2(): print('2')
    ... 
    >>> def test3(): print('3')
    ... 
    >>> flow = lazyllm.pipeline(test1, lazyllm.pipeline(test2, test3))
    >>> flow.for_each(lambda x: callable(x), lambda x: print(x))
    <Function type=test1>
    <Function type=test2>
    <Function type=test3>
    """
        for item in self._items:
            if isinstance(item, FlowBase):
                item.for_each(filter, action)
            elif filter(item):
                action(item)

lazyllm.flow.Pipeline

Bases: LazyLLMFlowsBase

A sequential execution model that forms a pipeline of processing stages.

The Pipeline class is a linear sequence of processing stages, where the output of one stage becomes the input to the next. It supports the addition of post-actions that can be performed after the last stage. It is a subclass of LazyLLMFlowsBase which provides a lazy execution model and allows for functions to be wrapped and registered in a lazy manner.

Parameters:

  • args (list of callables or single callable, default: () ) –

    The processing stages of the pipeline. Each element can be a callable function or an instance of LazyLLMFlowsBase.FuncWrap. If a single list or tuple is provided, it is unpacked as the stages of the pipeline.

  • post_action (callable, default: None ) –

    An optional action to perform after the last stage of the pipeline. Defaults to None.

  • kwargs (dict of callables) –

    Named processing stages of the pipeline. Each key-value pair represents a named stage, where the key is the name and the value is the callable stage.

Returns:

  • The output of the last stage of the pipeline.

Examples:

>>> import lazyllm
>>> ppl = lazyllm.pipeline(
...     stage1=lambda x: x+1,
...     stage2=lambda x: f'get {x}'
... )
>>> ppl(1)
'get 2'
>>> ppl.stage2
<Function type=lambda>
Source code in lazyllm/flow/flow.py
class Pipeline(LazyLLMFlowsBase):
    """A sequential execution model that forms a pipeline of processing stages.

The ``Pipeline`` class is a linear sequence of processing stages, where the output of one stage becomes the input to the next. It supports the addition of post-actions that can be performed after the last stage. It is a subclass of ``LazyLLMFlowsBase`` which provides a lazy execution model and allows for functions to be wrapped and registered in a lazy manner.

Args:
    args (list of callables or single callable): The processing stages of the pipeline. Each element can be a callable function or an instance of ``LazyLLMFlowsBase.FuncWrap``. If a single list or tuple is provided, it is unpacked as the stages of the pipeline.
    post_action (callable, optional): An optional action to perform after the last stage of the pipeline. Defaults to None.
    kwargs (dict of callables): Named processing stages of the pipeline. Each key-value pair represents a named stage, where the key is the name and the value is the callable stage.

**Returns:**

- The output of the last stage of the pipeline.


Examples:
    >>> import lazyllm
    >>> ppl = lazyllm.pipeline(
    ...     stage1=lambda x: x+1,
    ...     stage2=lambda x: f'get {x}'
    ... )
    >>> ppl(1)
    'get 2'
    >>> ppl.stage2
    <Function type=lambda>
    """
    # Class-wide switch for keeping per-stage results; copied onto each instance at construction.
    g_save_flow_result = None

    def __init__(self, *args, post_action=None, auto_capture=False, **kw):
        super().__init__(*args, post_action=post_action, auto_capture=auto_capture, **kw)
        self.save_flow_result = __class__.g_save_flow_result

    @property
    def _loop_count(self):
        """Number of passes over the stages made by ``_run``; 1 unless explicitly set."""
        return getattr(self, '_loop_count_var', 1)

    @_loop_count.setter
    def _loop_count(self, count):
        # A single pass is valid (it is also the getter's default), so accept ``count == 1``;
        # the previous ``count > 1`` check contradicted its own message.
        assert count >= 1, 'At least one loop is required!'
        self._loop_count_var = count

    @property
    def _stop_condition(self):
        """Optional callable evaluated after each pass; a truthy result ends the loop early."""
        return getattr(self, '_stop_condition_var', None)

    @_stop_condition.setter
    def _stop_condition(self, cond):
        self._stop_condition_var = cond

    @property
    def _judge_on_full_input(self):
        """Whether the stop condition receives the whole pass output (default ``True``) or
        only its first element."""
        return getattr(self, '_judge_on_full_input_var', True)

    @_judge_on_full_input.setter
    def _judge_on_full_input(self, judge):
        self._judge_on_full_input_var = judge

    # ``bind.Args`` placeholders keyed by this pipeline's id (and, for ``output``, the stage id).
    @property
    def input(self): return bind.Args(self.id())
    @property
    def kwargs(self): return bind.Args(self.id(), 'kwargs')
    def output(self, module): return bind.Args(self.id(), self.id(module))

    def _run(self, __input, **kw):
        """Run all stages in order, ``_loop_count`` times, feeding each output to the next stage.

        Args:
            __input: Input passed to the first stage.
            **kw: Keyword arguments forwarded to the first stage invocation only.

        Returns:
            The output of the last executed stage (without its first element when
            ``_judge_on_full_input`` is ``False``).
        """
        output = __input
        # Record of the pipeline input and every stage output, keyed by stage id.
        bind_args_source = dict(source=self.id(), input=output, kwargs=kw.copy())
        # Publish the record globally when enabled by config, by the class-level flag, or by the
        # instance flag (unless the class-level flag is explicitly False).
        if config['save_flow_result'] or __class__.g_save_flow_result or (
                self.save_flow_result and __class__.g_save_flow_result is not False):
            globals['bind_args'][self.id()] = bind_args_source
        for _ in range(self._loop_count):
            for it in self._items:
                output = self.invoke(it, output, bind_args_source=bind_args_source, **kw)
                # Keyword arguments are consumed by the first invocation only.
                kw.clear()
                bind_args_source[self.id(it)] = output
            exp = output
            if not self._judge_on_full_input:
                # The pass must yield a tuple: its head goes to the stop condition, the rest
                # becomes the output carried forward.
                assert isinstance(output, tuple) and len(output) >= 2
                exp = output[0]
                output = output[1:]
            if callable(self._stop_condition) and self.invoke(self._stop_condition, exp): break
        globals['bind_args'].pop(self.id(), None)
        return output

lazyllm.flow.Parallel

Bases: LazyLLMFlowsBase

A class for managing parallel flows in LazyLLMFlows.

This class inherits from LazyLLMFlowsBase and provides an interface for running operations in parallel or sequentially. It supports concurrent execution using threads and allows for the return of results as a dictionary.

The Parallel class can be visualized as follows:

#       /> module11 -> ... -> module1N -> out1 \
# input -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#       \> module31 -> ... -> module3N -> out3 /

The Parallel.sequential method can be visualized as follows:

# input -> module21 -> ... -> module2N -> out2 -> 

Parameters:

  • _scatter (bool, default: False ) –

    If True, the input is split across the items. If False, the same input is passed to all items. Defaults to False.

  • _concurrent (bool, default: True ) –

    If True, operations will be executed concurrently using threading. If False, operations will be executed sequentially. Defaults to True.

  • args

    Variable length argument list for the base class.

  • kwargs

    Arbitrary keyword arguments for the base class.

asdict property

Tag Parallel so that the return value of each call to Parallel is changed from a tuple to a dict. When using asdict, make sure that the elements of parallel are named, for example: parallel(name=value).

astuple property

Mark Parallel so that the return value of Parallel changes from package to tuple each time it is called.

aslist property

Mark Parallel so that the return value of Parallel changes from package to list each time it is called.

sum property

Mark Parallel so that the return value of Parallel is accumulated each time it is called.

join(self, string)

Mark Parallel so that the return value of Parallel is joined by string each time it is called.

Examples:

>>> import lazyllm
>>> test1 = lambda a: a + 1
>>> test2 = lambda a: a * 4
>>> test3 = lambda a: a / 2
>>> ppl = lazyllm.parallel(test1, test2, test3)
>>> ppl(1)
(2, 4, 0.5)
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3)
>>> ppl(1)
{2, 4, 0.5}
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).asdict
>>> ppl(2)
{'a': 3, 'b': 8, 'c': 1.0}
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).astuple
>>> ppl(-1)
(0, -4, -0.5)
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).aslist
>>> ppl(0)
[1, 0, 0.0]
>>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).join('\n')
>>> ppl(1)
'2\n4\n0.5'
Source code in lazyllm/flow/flow.py
class Parallel(LazyLLMFlowsBase):
    """A class for managing parallel flows in LazyLLMFlows.

This class inherits from LazyLLMFlowsBase and provides an interface for running operations in parallel or sequentially. It supports concurrent execution using threads and allows for the return of results as a dictionary.


The ``Parallel`` class can be visualized as follows:

```text
#       /> module11 -> ... -> module1N -> out1 \\
# input -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#       \\> module31 -> ... -> module3N -> out3 /
```

The ``Parallel.sequential`` method can be visualized as follows:

```text
# input -> module21 -> ... -> module2N -> out2 -> 
```

Args:
    _scatter (bool, optional): If ``True``, the input is split across the items. If ``False``, the same input is passed to all items. Defaults to ``False``.
    _concurrent (bool or int, optional): If ``True``, operations will be executed concurrently using a thread pool of 5 workers; an integer sets the number of workers directly. If ``False`` (or ``0``), operations will be executed sequentially. Defaults to ``True``.
    args: Variable length argument list for the base class.
    kwargs: Arbitrary keyword arguments for the base class.

`asdict property`

Tag ``Parallel`` so that the return value of each call to ``Parallel`` is changed from a tuple to a dict. When using ``asdict``, make sure that the elements of ``parallel`` are named, for example: ``parallel(name=value)``.

`astuple property`

Mark Parallel so that the return value of Parallel changes from package to tuple each time it is called.

`aslist property`

Mark Parallel so that the return value of Parallel changes from package to list each time it is called.

`sum property`

Mark Parallel so that the return value of Parallel is accumulated each time it is called.

`join(self, string)`

Mark Parallel so that the return value of Parallel is joined by ``string`` each time it is called.

Only one of these post-process markers can be set on a given instance.


Examples:
    >>> import lazyllm
    >>> test1 = lambda a: a + 1
    >>> test2 = lambda a: a * 4
    >>> test3 = lambda a: a / 2
    >>> ppl = lazyllm.parallel(test1, test2, test3)
    >>> ppl(1)
    (2, 4, 0.5)
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3)
    >>> ppl(1)
    {2, 4, 0.5}
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).asdict
    >>> ppl(2)
    {'a': 3, 'b': 8, 'c': 1.0}
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).astuple
    >>> ppl(-1)
    (0, -4, -0.5)
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).aslist
    >>> ppl(0)
    [1, 0, 0.0]
    >>> ppl = lazyllm.parallel(a=test1, b=test2, c=test3).join('\\n')
    >>> ppl(1)
    '2\\n4\\n0.5'
    """

    class PostProcessType(Enum):
        # How ``_post_process`` converts the collected outputs.
        NONE = 0
        DICT = 1
        TUPLE = 2
        LIST = 3
        SUM = 4
        JOIN = 5

    def __init__(self, *args, _scatter: bool = False, _concurrent: Union[bool, int] = True,
                 auto_capture: bool = False, **kw):
        super().__init__(*args, **kw, auto_capture=auto_capture)
        self._post_process_type = Parallel.PostProcessType.NONE
        self._post_process_args = None
        # Normalise to a worker count: True -> 5 threads, False -> 0 (sequential), int -> as given.
        self._concurrent = _concurrent if not isinstance(_concurrent, bool) else 5 if _concurrent else 0
        self._scatter = _scatter

    # Declared as a staticmethod so it can be wrapped with ``partial`` in the class body below;
    # calling a staticmethod object directly like this requires Python 3.10+.
    @staticmethod
    def _set_status(self, type, args=None):
        """Record the post-process mode on ``self`` and return ``self`` for chaining.

        Raises:
            AssertionError: If a post-process mode has already been set.
        """
        assert self._post_process_type is Parallel.PostProcessType.NONE, 'Cannor set post process twice'
        self._post_process_type = type
        self._post_process_args = args
        return self

    # Reading one of these properties switches the output format and yields the instance itself.
    asdict = property(partial(_set_status, type=PostProcessType.DICT))
    astuple = property(partial(_set_status, type=PostProcessType.TUPLE))
    aslist = property(partial(_set_status, type=PostProcessType.LIST))
    sum = property(partial(_set_status, type=PostProcessType.SUM))

    def join(self, string=''):
        """Mark the instance so that outputs are stringified and joined with ``string``."""
        assert isinstance(string, str), 'argument of join shoule be str'
        return Parallel._set_status(self, type=Parallel.PostProcessType.JOIN, args=string)

    @classmethod
    def sequential(cls, *args, **kw):
        """Alternate constructor that disables concurrent execution."""
        return cls(*args, _concurrent=False, **kw)

    def _run(self, __input, items=None, **kw):
        """Invoke every item with its input and collect the results into a ``package``.

        Args:
            __input: The flow input. When ``items`` is given, this is used as the already
                split sequence of per-item inputs.
            items: Optional explicit list of items to run instead of ``self._items``.
            **kw: Keyword arguments forwarded to every invocation.
        """
        if items is None:
            items = self._items
            size = len(items)
            if self._scatter:
                # Split by item names when available, otherwise into ``size`` parts.
                inputs = _split_input(__input, self._item_names if self._item_names else size)
            else:
                inputs = [__input] * size
        else:
            inputs = __input

        if self._concurrent:
            def impl(func, barrier, global_data, *args, **kw):
                # Runs in a worker thread: set up a session for this thread, seed it with the
                # caller's global data, and work on a private copy of ``bind_args``.
                lazyllm.globals._init_sid()
                lazyllm.globals._update(global_data)
                lazyllm.globals['bind_args'] = lazyllm.globals['bind_args'].copy()
                _barr.impl = barrier
                r = func(*args, **kw)
                lazyllm.globals.clear()
                return r

            loop, barrier = asyncio.new_event_loop(), threading.Barrier(len(items))
            try:
                with concurrent.futures.ThreadPoolExecutor(max_workers=self._concurrent) as executor:
                    return package(loop.run_until_complete(asyncio.gather(*[loop.run_in_executor(executor, partial(
                        impl, self.invoke, barrier, lazyllm.globals._data, it, inp, **kw))
                        for it, inp in zip(items, inputs)])))
            finally:
                # A fresh loop is created on every call; close it so its resources are
                # released instead of piling up across calls.
                loop.close()
        else:
            return package(self.invoke(it, inp, **kw) for it, inp in zip(items, inputs))

    def _post_process(self, output):
        """Convert ``output`` according to the post-process mode selected on this instance."""
        if self._post_process_type == Parallel.PostProcessType.DICT:
            assert self._item_names, 'Item name should be set when you want to return dict.'
            output = {k: v for k, v in zip(self._item_names, output)}
        elif self._post_process_type == Parallel.PostProcessType.TUPLE:
            output = tuple(output)
        elif self._post_process_type == Parallel.PostProcessType.LIST:
            output = list(output)
        elif self._post_process_type == Parallel.PostProcessType.SUM:
            # Strings are concatenated; other values are summed starting from the zero value
            # of the first element's type.
            output = ''.join([str(i) for i in output]) if isinstance(output[0], str) else sum(output, type(output[0])())
        elif self._post_process_type == Parallel.PostProcessType.JOIN:
            output = self._post_process_args.join([str(i) for i in output])
        return output

lazyllm.flow.Diverter

Bases: Parallel

A flow diverter that routes inputs through different modules in parallel.

The Diverter class is a specialized form of parallel processing where multiple inputs are each processed by a separate sequence of modules in parallel. The outputs are then aggregated and returned as a tuple.

This class is useful when you have distinct data processing pipelines that can be executed concurrently, and you want to manage them within a single flow construct.

#                 /> in1 -> module11 -> ... -> module1N -> out1 \
# (in1, in2, in3) -> in2 -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#                 \> in3 -> module31 -> ... -> module3N -> out3 /

Parameters:

  • args

    Variable length argument list representing the modules to be executed in parallel.

  • _concurrent (bool, default: True ) –

    A flag to control whether the modules should be run concurrently. Defaults to True. You can use Diverter.sequential instead of Diverter to set this variable.

  • kwargs

    Arbitrary keyword arguments representing additional modules, where the key is the name of the module.

Examples:

>>> import lazyllm
>>> div = lazyllm.diverter(lambda x: x+1, lambda x: x*2, lambda x: -x)
>>> div(1, 2, 3)
(2, 4, -3)
>>> div = lazyllm.diverter(a=lambda x: x+1, b=lambda x: x*2, c=lambda x: -x).asdict
>>> div(1, 2, 3)
{'a': 2, 'b': 4, 'c': -3}
>>> div(dict(c=3, b=2, a=1))
{'a': 2, 'b': 4, 'c': -3}
Source code in lazyllm/flow/flow.py
class Diverter(Parallel):
    """A flow diverter that routes inputs through different modules in parallel.

The Diverter class is a specialized form of parallel processing where multiple inputs are each processed by a separate sequence of modules in parallel. The outputs are then aggregated and returned as a tuple.

This class is useful when you have distinct data processing pipelines that can be executed concurrently, and you want to manage them within a single flow construct.

```text
#                 /> in1 -> module11 -> ... -> module1N -> out1 \\
# (in1, in2, in3) -> in2 -> module21 -> ... -> module2N -> out2 -> (out1, out2, out3)
#                 \\> in3 -> module31 -> ... -> module3N -> out3 /
```

Args:
    args : Variable length argument list representing the modules to be executed in parallel.
    _concurrent (bool, optional): A flag to control whether the modules should be run concurrently. Defaults to ``True``. You can use ``Diverter.sequential`` instead of ``Diverter`` to set this variable.
    kwargs : Arbitrary keyword arguments representing additional modules, where the key is the name of the module.



Examples:
    >>> import lazyllm
    >>> div = lazyllm.diverter(lambda x: x+1, lambda x: x*2, lambda x: -x)
    >>> div(1, 2, 3)
    (2, 4, -3)
    >>> div = lazyllm.diverter(a=lambda x: x+1, b=lambda x: x*2, c=lambda x: -x).asdict
    >>> div(1, 2, 3)
    {'a': 2, 'b': 4, 'c': -3}
    >>> div(dict(c=3, b=2, a=1))
    {'a': 2, 'b': 4, 'c': -3}
    """
    def __init__(self, *args, _concurrent: Union[bool, int] = True, auto_capture: bool = False, **kw):
        # A Diverter is a Parallel that always scatters: each item receives its own input part.
        super().__init__(*args, _scatter=True, _concurrent=_concurrent, auto_capture=auto_capture, **kw)

lazyllm.flow.Warp

Bases: Parallel

A flow warp that applies a single module to multiple inputs in parallel.

The Warp class is designed to apply the same processing module to a set of inputs. It effectively 'warps' the single module around the inputs so that each input is processed in parallel. The outputs are collected and returned as a tuple. It is important to note that this class cannot be used for asynchronous tasks, such as training and deployment.

#                 /> in1 \                            /> out1 \
# (in1, in2, in3) -> in2 -> module1 -> ... -> moduleN -> out2 -> (out1, out2, out3)
#                 \> in3 /                            \> out3 /

Parameters:

  • args

    Variable length argument list representing the single module to be applied to all inputs.

  • kwargs

    Arbitrary keyword arguments for future extensions.

Note
  • Only one function is allowed in warp.
  • The Warp flow should not be used for asynchronous tasks such as training and deployment.

Examples:

>>> import lazyllm
>>> warp = lazyllm.warp(lambda x: x * 2)
>>> warp(1, 2, 3, 4)
(2, 4, 6, 8)
>>> warp = lazyllm.warp(lazyllm.pipeline(lambda x: x * 2, lambda x: f'get {x}'))
>>> warp(1, 2, 3, 4)
('get 2', 'get 4', 'get 6', 'get 8')
Source code in lazyllm/flow/flow.py
class Warp(Parallel):
    """A flow warp that applies a single module to multiple inputs in parallel.

The Warp class is designed to apply the same processing module to a set of inputs. It effectively 'warps' the single module around the inputs so that each input is processed in parallel. The outputs are collected and returned as a tuple. It is important to note that this class cannot be used for asynchronous tasks, such as training and deployment.

```text
#                 /> in1 \\                            /> out1 \\
# (in1, in2, in3) -> in2 -> module1 -> ... -> moduleN -> out2 -> (out1, out2, out3)
#                 \\> in3 /                            \\> out3 /
```

Args:
    args: Variable length argument list representing the single module to be applied to all inputs. When several items are given they are chained into one ``Pipeline``.
    kwargs: Arbitrary keyword arguments for future extensions.

Note:
    - Only one function is allowed in warp.
    - The Warp flow should not be used for asynchronous tasks such as training and deployment.
    - ``asdict`` is not supported and raises ``NotImplementedError``.


Examples:
    >>> import lazyllm
    >>> warp = lazyllm.warp(lambda x: x * 2)
    >>> warp(1, 2, 3, 4)
    (2, 4, 6, 8)
    >>> warp = lazyllm.warp(lazyllm.pipeline(lambda x: x * 2, lambda x: f'get {x}'))
    >>> warp(1, 2, 3, 4)
    ('get 2', 'get 4', 'get 6', 'get 8')
    """
    def __init__(self, *args, _scatter: bool = False, _concurrent: Union[bool, int] = True,
                 auto_capture: bool = False, **kw):
        super().__init__(*args, _scatter=_scatter, _concurrent=_concurrent, auto_capture=auto_capture, **kw)
        # Several items passed to the constructor are collapsed into a single Pipeline.
        if len(self._items) > 1: self._items = [Pipeline(*self._items)]

    def __post_init__(self):
        # Same collapsing for items that were added inside a ``with`` block.
        if len(self._items) > 1: self._items = [Pipeline(*self._items)]

    def _run(self, __input, **kw):
        """Split the input and run the single wrapped item once per input part."""
        assert 1 == len(self._items), 'Only one function is enabled in warp'
        inputs = _split_input(__input)
        # Repeat the one item so that the parent pairs it with every input part.
        items = self._items * len(inputs)
        return super(__class__, self)._run(inputs, items, **kw)

    @property
    def asdict(self): raise NotImplementedError

lazyllm.flow.IFS

Bases: LazyLLMFlowsBase

Implements an If-Else functionality within the LazyLLMFlows framework.

The IFS (If-Else Flow Structure) class is designed to conditionally execute one of two provided paths (true path or false path) based on the evaluation of a given condition. After the execution of the selected path, an optional post-action can be applied, and the input can be returned alongside the output if specified.

Parameters:

  • cond (callable) –

    A callable that takes the input and returns a boolean. It determines which path to execute. If cond(input) evaluates to True, tpath is executed; otherwise, fpath is executed.

  • tpath (callable) –

    The path to be executed if the condition is True.

  • fpath (callable) –

    The path to be executed if the condition is False.

  • post_action (callable, default: None ) –

    An optional callable that is executed after the selected path. It can be used to perform cleanup or further processing. Defaults to None.

Returns:

  • The output of the executed path.

Examples:

>>> import lazyllm
>>> cond = lambda x: x > 0
>>> tpath = lambda x: x * 2
>>> fpath = lambda x: -x
>>> ifs_flow = lazyllm.ifs(cond, tpath, fpath)
>>> ifs_flow(10)
20
>>> ifs_flow(-5)
5
Source code in lazyllm/flow/flow.py
class IFS(LazyLLMFlowsBase):
    """If-else flow: run one of two paths depending on a condition.

    The flow holds exactly three items: the condition, the path taken when
    the condition holds, and the path taken when it does not. The selected
    path receives the original input.

    The condition is resolved in this order:

    1. It is first called with no arguments; if that succeeds, its return
       value is the flag.
    2. If the zero-argument call raises, a plain ``bool`` condition is used
       as the flag directly.
    3. Otherwise the condition is invoked with the flow input (and keyword
       arguments) and its result is the flag.

    Args:
        cond (callable or bool): The condition deciding which path runs. A truthy
            flag selects ``tpath``; a falsy flag selects ``fpath``.
        tpath (callable): The path executed when the flag is truthy.
        fpath (callable): The path executed when the flag is falsy.
        post_action (callable, optional): Forwarded to the base-class initializer.
            Defaults to None.

    **Returns:**

    - The output of the executed path.


    Examples:
        >>> import lazyllm
        >>> cond = lambda x: x > 0
        >>> tpath = lambda x: x * 2
        >>> fpath = lambda x: -x
        >>> ifs_flow = lazyllm.ifs(cond, tpath, fpath)
        >>> ifs_flow(10)
        20
        >>> ifs_flow(-5)
        5
    """
    def __init__(self, cond, tpath, fpath, post_action=None):
        # Item order matters: `_run` unpacks the items as (cond, tpath, fpath).
        super().__init__(cond, tpath, fpath, post_action=post_action)

    def _run(self, __input, **kw):
        predicate, on_true, on_false = self._items
        try:
            # Conditions that take no arguments are evaluated directly.
            outcome = predicate()
        except Exception:
            # The zero-argument call failed (this also covers a plain bool,
            # which is not callable): fall back to the value itself or to
            # evaluating the condition on the input.
            if isinstance(predicate, bool):
                outcome = predicate
            else:
                outcome = self.invoke(predicate, __input, **kw)
        chosen = on_true if outcome else on_false
        return self.invoke(chosen, __input, **kw)

lazyllm.flow.Switch

Bases: LazyLLMFlowsBase

A control flow mechanism that selects and executes a flow based on a condition.

The Switch class provides a way to choose between different flows depending on the value of an expression or the truthiness of conditions. It is similar to a switch-case statement found in other programming languages.

# switch(exp):
#     case cond1: input -> module11 -> ... -> module1N -> out; break
#     case cond2: input -> module21 -> ... -> module2N -> out; break
#     case cond3: input -> module31 -> ... -> module3N -> out; break

Parameters:

  • args

    A variable length argument list, alternating between conditions and corresponding flows or functions. Conditions are either callables returning a boolean or values to be compared with the input expression.

  • post_action (callable, default: None ) –

    A function to be called on the output after the selected flow is executed. Defaults to None.

  • judge_on_full_input (bool, default: True ) –

    If set to True, the conditions are evaluated against the whole input of switch. Otherwise the input must be a tuple with at least two elements: the first element is the expression the conditions are evaluated against, and the remaining elements are passed as the actual input to the selected flow.

  • kwargs

    Arbitrary keyword arguments representing named conditions and corresponding flows or functions.

Raises:

  • TypeError

    If an odd number of arguments are provided, or if the first argument is not a dictionary and the conditions are not provided in pairs.

Examples:

>>> import lazyllm
>>> def is_positive(x): return x > 0
...
>>> def is_negative(x): return x < 0
...
>>> switch = lazyllm.switch(is_positive, lambda x: 2 * x, is_negative, lambda x : -x, 'default', lambda x : '000', judge_on_full_input=True)
>>>
>>> switch(1)
2
>>> switch(0)
'000'
>>> switch(-4)
4
>>>
>>> def is_1(x): return True if x == 1 else False
...
>>> def is_2(x): return True if x == 2 else False
...
>>> def is_3(x): return True if x == 3 else False
...
>>> def t1(x): return 2 * x
...
>>> def t2(x): return 3 * x
...
>>> def t3(x): return x
...
>>> with lazyllm.switch(judge_on_full_input=True) as sw:
...     sw.case[is_1::t1]
...     sw.case(is_2, t2)
...     sw.case[is_3, t3]
...
>>> sw(1)
2
>>> sw(2)
6
>>> sw(3)
3
Source code in lazyllm/flow/flow.py
class Switch(LazyLLMFlowsBase):
    """A control flow mechanism that selects and executes a flow based on a condition.

    The ``Switch`` class provides a way to choose between different flows depending on the
    value of an expression or the truthiness of conditions. It is similar to a switch-case
    statement found in other programming languages.

    ```text
    # switch(exp):
    #     case cond1: input -> module11 -> ... -> module1N -> out; break
    #     case cond2: input -> module21 -> ... -> module2N -> out; break
    #     case cond3: input -> module31 -> ... -> module3N -> out; break
    ```

    Cases are tried in the order they were registered and the first matching one is
    executed. A case matches when any of the following holds:

    - the condition is callable and invoking it on the expression returns exactly ``True``;
    - the expression equals the condition;
    - the expression equals ``package((cond,))``;
    - the condition is the string ``'default'`` (so a default case should be registered last).

    If no case matches, ``None`` is returned.

    Args:
        args: Either a single dict mapping conditions to flows, or a flat argument list
            alternating between conditions and their corresponding flows or functions.
        conversion (callable, optional): If given (and truthy), it is applied to the
            expression before the conditions are evaluated. Defaults to ``None``.
        post_action (callable, optional): Forwarded to the base-class initializer.
            Defaults to ``None``.
        judge_on_full_input (bool): If ``True``, the conditions are evaluated against the
            whole input of ``switch``. Otherwise the input must be a tuple with at least two
            elements: the first element is the expression the conditions are evaluated
            against and the remaining elements are the actual input of the selected flow
            (a single remaining element is passed on its own, not as a tuple).
            Defaults to ``True``.

    Raises:
        TypeError: If the positional arguments are not a single dict and their number is odd,
            i.e. the conditions and flows are not provided in pairs.


    Examples:
        >>> import lazyllm
        >>> def is_positive(x): return x > 0
        ...
        >>> def is_negative(x): return x < 0
        ...
        >>> switch = lazyllm.switch(is_positive, lambda x: 2 * x, is_negative, lambda x : -x, 'default', lambda x : '000', judge_on_full_input=True)
        >>>
        >>> switch(1)
        2
        >>> switch(0)
        '000'
        >>> switch(-4)
        4
        >>>
        >>> def is_1(x): return True if x == 1 else False
        ...
        >>> def is_2(x): return True if x == 2 else False
        ...
        >>> def is_3(x): return True if x == 3 else False
        ...
        >>> def t1(x): return 2 * x
        ...
        >>> def t2(x): return 3 * x
        ...
        >>> def t3(x): return x
        ...
        >>> with lazyllm.switch(judge_on_full_input=True) as sw:
        ...     sw.case[is_1::t1]
        ...     sw.case(is_2, t2)
        ...     sw.case[is_3, t3]
        ...
        >>> sw(1)
        2
        >>> sw(2)
        6
        >>> sw(3)
        3
    """
    # Switch({cond1: M1, cond2: M2, ..., condN: MN})
    # Switch(cond1, M1, cond2, M2, ..., condN, MN)
    def __init__(self, *args, conversion=None, post_action=None, judge_on_full_input=True):
        if len(args) == 1 and isinstance(args[0], dict):
            self.conds, items = list(args[0].keys()), list(args[0].values())
        else:
            # An odd count would leave a condition without a flow and only fail later,
            # at run time, with an unrelated IndexError; reject it up front instead.
            if len(args) % 2 != 0:
                raise TypeError('Switch expects a single dict or an even number of positional arguments '
                                f'(cond1, flow1, cond2, flow2, ...), but got {len(args)}')
            self.conds, items = list(args[0::2]), args[1::2]
        super().__init__(*items, post_action=post_action)
        self._judge_on_full_input = judge_on_full_input
        self._set_conversion(conversion)

    def _set_conversion(self, conversion):
        """Store the callable applied to the expression before conditions are evaluated."""
        self._conversion = conversion

    def _run(self, __input, **kw):
        """Execute the first case whose condition matches and return its result.

        Returns ``None`` when no case matches.
        """
        exp = __input
        if not self._judge_on_full_input:
            # The input carries the expression first, followed by the real payload.
            assert isinstance(__input, tuple) and len(__input) >= 2
            exp = __input[0]
            # A single payload element is unwrapped; several stay together as a tuple.
            __input = __input[1] if len(__input) == 2 else __input[1:]
        if self._conversion: exp = self._conversion(exp)

        for idx, cond in enumerate(self.conds):
            # Only an exact `True` from a callable condition counts as a match.
            if (callable(cond) and self.invoke(cond, exp) is True) or (exp == cond) or (
                    exp == package((cond,))) or cond == 'default':
                return self.invoke(self._items[idx], __input, **kw)

    class Case:
        """Helper returned by ``Switch.case`` to register cases with call or index syntax."""

        def __init__(self, m) -> None:
            # `m` is the Switch instance that receives the registered cases.
            self._m = m

        def __call__(self, cond, func):
            """Register a case via ``sw.case(cond, func)``."""
            self._m._add_case(cond, func)

        def __getitem__(self, key):
            """Register a case via ``sw.case[cond::func]``, ``sw.case[cond:func]`` or ``sw.case[cond, func]``.

            Raises:
                RuntimeError: If ``key`` has none of the supported forms.
            """
            if isinstance(key, slice):
                # Compare against None rather than testing truthiness, so that falsy
                # conditions such as 0, '' or False are accepted here just as they are
                # by the tuple and call forms. A slice leaves omitted parts as None.
                if key.start is not None:
                    if callable(key.step) and key.stop is None:
                        return self._m._add_case(key.start, key.step)
                    elif key.step is None and callable(key.stop):
                        return self._m._add_case(key.start, key.stop)
            elif isinstance(key, tuple) and len(key) == 2 and callable(key[1]):
                return self._m._add_case(key[0], key[1])
            raise RuntimeError(f'Only [cond::func], [cond:func] or [cond, func] is allowed in case, but you give {key}')

    @property
    def case(self):
        """A ``Switch.Case`` helper bound to this switch; a new helper is created on each access."""
        return Switch.Case(self)

    def _add_case(self, case, func):
        """Append a condition and register its flow; the two stay aligned by position."""
        self.conds.append(case)
        self._add(None, func)

lazyllm.flow.Loop

Bases: Pipeline

Initializes a Loop flow structure which repeatedly applies a sequence of functions to an input until a stop condition is met or a specified count of iterations is reached.

The Loop structure allows for the definition of a simple control flow where a series of steps are applied in a loop, with an optional stop condition that can be used to exit the loop based on the output of the steps.

Parameters:

  • *item (callable or list of callables, default: () ) –

    The function(s) or callable object(s) that will be applied in the loop.

  • stop_condition (callable, default: None ) –

    A function that takes the output of the last item in the loop as input and returns a boolean. If it returns True, the loop will stop. If None, the loop will continue until count is reached. Defaults to None.

  • count (int, default: maxsize ) –

    The maximum number of iterations to run the loop for. Defaults to sys.maxsize, which is effectively unbounded, so by default the loop continues until stop_condition returns True.

  • post_action (callable, default: None ) –

    A function to be called with the final output after the loop ends. Defaults to None.

  • judge_on_full_input (bool, default: True ) –

    If set to True, the conditional judgment will be performed through the input of stop_condition, otherwise the input will be split into two parts: the judgment condition and the actual input, and only the judgment condition will be judged.

Raises:

  • AssertionError

    If both stop_condition and count are provided or if count is not an integer when provided.

Examples:

>>> import lazyllm
>>> loop = lazyllm.loop(lambda x: x * 2, stop_condition=lambda x: x > 10, judge_on_full_input=True)
>>> loop(1)
16
>>> loop(3)
12
>>>
>>> with lazyllm.loop(stop_condition=lambda x: x > 10, judge_on_full_input=True) as lp:
...    lp.f1 = lambda x: x + 1
...    lp.f2 = lambda x: x * 2
...
>>> lp(0)
14
Source code in lazyllm/flow/flow.py
class Loop(Pipeline):
    """Initializes a Loop flow structure which repeatedly applies a sequence of functions to an input until a stop condition is met or a specified count of iterations is reached.

The Loop structure allows for the definition of a simple control flow where a series of steps are applied in a loop, with an optional stop condition that can be used to exit the loop based on the output of the steps.

Args:
    *item (callable or list of callables): The function(s) or callable object(s) that will be applied in the loop.
    stop_condition (callable, optional): A function that takes the output of the last item in the loop as input and returns a boolean. If it returns ``True``, the loop will stop. If ``None``, the loop will continue until ``count`` is reached. Defaults to ``None``.
    count (int, optional): The maximum number of iterations to run the loop for. If ``None``, the loop will continue indefinitely or until ``stop_condition`` returns ``True``. Defaults to ``None``.
    post_action (callable, optional): A function to be called with the final output after the loop ends. Defaults to ``None``.
    judge_on_full_input(bool): If set to ``True``, the conditional judgment will be performed through the input of ``stop_condition``, otherwise the input will be split into two parts: the judgment condition and the actual input, and only the judgment condition will be judged.

Raises:
    AssertionError: If both ``stop_condition`` and ``count`` are provided or if ``count`` is not an integer when provided.


Examples:
    >>> import lazyllm
    >>> loop = lazyllm.loop(lambda x: x * 2, stop_condition=lambda x: x > 10, judge_on_full_input=True)
    >>> loop(1)
    16
    >>> loop(3)
    12
    >>>
    >>> with lazyllm.loop(stop_condition=lambda x: x > 10, judge_on_full_input=True) as lp:
    ...    lp.f1 = lambda x: x + 1
    ...    lp.f2 = lambda x: x * 2
    ...
    >>> lp(0)
    14
    """
    def __init__(self, *item, stop_condition=None, count=sys.maxsize, post_action=None,
                 auto_capture=False, judge_on_full_input=True, **kw):
        super().__init__(*item, post_action=post_action, auto_capture=auto_capture, **kw)
        assert callable(stop_condition) or stop_condition is None
        self._judge_on_full_input = judge_on_full_input
        self._stop_condition = stop_condition
        self._loop_count = count