Module

lazyllm.module.ModuleBase

Bases: object

Module is the top-level component in LazyLLM, possessing four key capabilities: training, deployment, inference, and evaluation. Each module can choose to implement some or all of these capabilities, and each capability can be composed of one or more components. ModuleBase itself cannot be instantiated directly; subclasses that inherit and implement the forward function can be used as a functor. Similar to PyTorch's Module, when a Module A holds an instance of another Module B as a member variable, B will be automatically added to A's submodules. If you need the following capabilities, please have your custom class inherit from ModuleBase:

  1. Combine some or all of the training, deployment, inference, and evaluation capabilities. For example, an Embedding model requires training and inference (see the sketch after this list).

  2. Give the member variables some or all of the capabilities for training, deployment, and evaluation, so that these members can be trained, deployed, and evaluated through the start, update, eval, and other methods of the Module's root node.

  3. Pass user-set parameters directly to your custom module from the outermost layer (refer to WebModule).

  4. Make it usable by the parameter grid search module (refer to TrialModule).
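
The sketch below (a hypothetical class, modeled on the examples in this page) combines several of these capabilities in one custom module; the root module's update() then trains, deploys, and evaluates it together with any other submodules:

```python
import lazyllm

class MyEmbedding(lazyllm.module.ModuleBase):
    # training task: picked up by update() on this module or on its root module
    def _get_train_tasks(self):
        return lazyllm.pipeline(lambda: 'finetuned', lambda x: print(x))

    # deployment task: picked up by start()/update()
    def _get_deploy_tasks(self):
        return lazyllm.pipeline(lambda: print('deployed'))

    # inference: lets the instance be called as a functor
    def forward(self, text):
        return [0.0] * 4   # placeholder embedding

class MyApp(lazyllm.module.ModuleBase):
    def __init__(self):
        super().__init__()
        self.embed = MyEmbedding()   # automatically registered as a submodule

    def forward(self, text):
        return self.embed(text)

# MyApp().update() also runs MyEmbedding's training and deployment tasks,
# because update() recurses over submodules by default.
```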

Examples:

>>> import lazyllm
>>> class Module(lazyllm.module.ModuleBase):
...     pass
... 
>>> class Module2(lazyllm.module.ModuleBase):
...     def __init__(self):
...         super(__class__, self).__init__()
...         self.m = Module()
... 
>>> m = Module2()
>>> m.submodules
[<__main__.Module object at 0x7f3dc3bb16f0>]
>>> m.m3 = Module()
>>> m.submodules
[<__main__.Module object at 0x7f3dc3bb16f0>, <__main__.Module object at 0x7f3dc3bb0be0>]
Source code in lazyllm/module/module.py
class ModuleBase(object):
    """Module is the top-level component in LazyLLM, possessing four key capabilities: training, deployment, inference, and evaluation. Each module can choose to implement some or all of these capabilities, and each capability can be composed of one or more components.
ModuleBase itself cannot be instantiated directly; subclasses that inherit and implement the forward function can be used as a functor.
Similar to PyTorch's Module, when a Module A holds an instance of another Module B as a member variable, B will be automatically added to A's submodules.
If you need the following capabilities, please have your custom class inherit from ModuleBase:

1. Combine some or all of the training, deployment, inference, and evaluation capabilities. For example, an Embedding model requires training and inference.

2. Give the member variables some or all of the capabilities for training, deployment, and evaluation, so that these members can be trained, deployed, and evaluated through the start, update, eval, and other methods of the Module's root node.

3. Pass user-set parameters directly to your custom module from the outermost layer (refer to WebModule).

4. Make it usable by the parameter grid search module (refer to TrialModule).


Examples:
    >>> import lazyllm
    >>> class Module(lazyllm.module.ModuleBase):
    ...     pass
    ... 
    >>> class Module2(lazyllm.module.ModuleBase):
    ...     def __init__(self):
    ...         super(__class__, self).__init__()
    ...         self.m = Module()
    ... 
    >>> m = Module2()
    >>> m.submodules
    [<__main__.Module object at 0x7f3dc3bb16f0>]
    >>> m.m3 = Module()
    >>> m.submodules
    [<__main__.Module object at 0x7f3dc3bb16f0>, <__main__.Module object at 0x7f3dc3bb0be0>]
    """
    builder_keys = []  # keys in builder support Option by default

    def __new__(cls, *args, **kw):
        sig = inspect.signature(cls.__init__)
        paras = sig.parameters
        values = list(paras.values())[1:]  # paras.value()[0] is self
        for i, p in enumerate(args):
            if isinstance(p, Option):
                ann = values[i].annotation
                assert ann == Option or (isinstance(ann, (tuple, list)) and Option in ann), \
                    f'{values[i].name} cannot accept Option'
        for k, v in kw.items():
            if isinstance(v, Option):
                ann = paras[k].annotation
                assert ann == Option or (isinstance(ann, (tuple, list)) and Option in ann), \
                    f'{k} cannot accept Option'
        return object.__new__(cls)

    def __init__(self, *, return_trace=False):
        self._submodules = []
        self._evalset = None
        self._return_trace = return_trace
        self.mode_list = ('train', 'server', 'eval')
        self._set_mid()
        self._module_name = None
        self._options = []
        self.eval_result = None

    def __setattr__(self, name: str, value):
        if isinstance(value, ModuleBase):
            self._submodules.append(value)
        elif isinstance(value, Option):
            self._options.append(value)
        elif name.endswith('_args') and isinstance(value, dict):
            for v in value.values():
                if isinstance(v, Option):
                    self._options.append(v)
        return super().__setattr__(name, value)

    def __getattr__(self, key):
        def _setattr(v, *, _return_value=self, **kw):
            k = key[:-7] if key.endswith('_method') else key
            if isinstance(v, tuple) and len(v) == 2 and isinstance(v[1], dict):
                kw.update(v[1])
                v = v[0]
            if len(kw) > 0:
                setattr(self, f'_{k}_args', kw)
            setattr(self, f'_{k}', v)
            if hasattr(self, f'_{k}_setter_hook'): getattr(self, f'_{k}_setter_hook')()
            return _return_value
        keys = self.__class__.builder_keys
        if key in keys:
            return _setattr
        elif key.startswith('_') and key[1:] in keys:
            return None
        elif key.startswith('_') and key.endswith('_args') and (key[1:-5] in keys or f'{key[1:-4]}method' in keys):
            return dict()
        raise AttributeError(f'{self.__class__} object has no attribute {key}')

    def __call__(self, *args, **kw):
        try:
            kw.update(globals['global_parameters'].get(self._module_id, dict()))
            if (history := globals['chat_history'].get(self._module_id)) is not None: kw['llm_chat_history'] = history
            r = self.forward(**args[0], **kw) if args and isinstance(args[0], kwargs) else self.forward(*args, **kw)
            if self._return_trace:
                lazyllm.FileSystemQueue.get_instance('lazy_trace').enqueue(str(r))
        except Exception as e:
            raise RuntimeError(f'\nAn error occurred in {self.__class__} with name {self.name}.\n'
                               f'Args:\n{args}\nKwargs:\n{kw}\nError messages:\n{e}\n')
        return r

    # interfaces
    def forward(self, *args, **kw):
        """Define computation steps executed each time, all subclasses of ModuleBase need to override.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def forward(self, input):
    ...         return input + 1
    ... 
    >>> MyModule()(1)
    2   
    """
        raise NotImplementedError
    def _get_train_tasks(self):
        """Define a training task. This function returns a training pipeline. Subclasses that override this function can be trained or fine-tuned during the update phase.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def _get_train_tasks(self):
    ...         return lazyllm.pipeline(lambda : 1, lambda x: print(x))
    ... 
    >>> MyModule().update()
    1
    """
        return None
    def _get_deploy_tasks(self):
        """Define a deployment task. This function returns a deployment pipeline. Subclasses that override this function can be deployed during the update/start phase.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def _get_deploy_tasks(self):
    ...         return lazyllm.pipeline(lambda : 1, lambda x: print(x))
    ... 
    >>> MyModule().start()
    1
    """
        return None
    def _get_post_process_tasks(self): return None

    def _set_mid(self, mid=None):
        self._module_id = mid if mid else str(uuid.uuid4().hex)
        return self

    _url_id = property(lambda self: self._module_id)

    @property
    def name(self): return self._module_name
    @name.setter
    def name(self, name): self._module_name = name

    @property
    def submodules(self): return self._submodules

    def evalset(self, evalset, load_f=None, collect_f=lambda x: x):
        """during update or eval, and the results will be stored in the eval_result variable.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.module.TrainableModule().deploy_method(lazyllm.deploy.dummy).finetune_method(lazyllm.finetune.dummy).mode("finetune").prompt(None)
    >>> m.evalset([1, 2, 3])
    >>> m.update()
    INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
    >>> print(m.eval_result)
    ["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
    """
        if isinstance(evalset, str) and os.path.exists(evalset):
            with open(evalset) as f:
                assert callable(load_f)
                self._evalset = load_f(f)
        else:
            self._evalset = evalset
        self.eval_result_collet_f = collect_f

    # TODO: add lazyllm.eval
    def _get_eval_tasks(self):
        def set_result(x): self.eval_result = x

        def parallel_infer():
            with ThreadPoolExecutor(max_workers=100) as executor:
                results = list(executor.map(lambda item: self(**item)
                                            if isinstance(item, dict) else self(item), self._evalset))
            return results
        if self._evalset:
            return Pipeline(parallel_infer,
                            lambda x: self.eval_result_collet_f(x),
                            set_result)
        return None

    # update module(train or finetune),
    def _update(self, *, mode=None, recursive=True):  # noqa C901
        if not mode: mode = list(self.mode_list)
        if type(mode) is not list: mode = [mode]
        for item in mode:
            assert item in self.mode_list, f"Cannot find {item} in mode list: {self.mode_list}"
        # dfs to get all train tasks
        train_tasks, deploy_tasks, eval_tasks, post_process_tasks = FlatList(), FlatList(), FlatList(), FlatList()
        stack, visited = [(self, iter(self.submodules if recursive else []))], set()
        while len(stack) > 0:
            try:
                top = next(stack[-1][1])
                stack.append((top, iter(top.submodules)))
            except StopIteration:
                top = stack.pop()[0]
                if top._module_id in visited: continue
                visited.add(top._module_id)
                if 'train' in mode: train_tasks.absorb(top._get_train_tasks())
                if 'server' in mode: deploy_tasks.absorb(top._get_deploy_tasks())
                if 'eval' in mode: eval_tasks.absorb(top._get_eval_tasks())
                post_process_tasks.absorb(top._get_post_process_tasks())

        if 'train' in mode and len(train_tasks) > 0:
            Parallel(*train_tasks).set_sync(True)()
        if 'server' in mode and len(deploy_tasks) > 0:
            if redis_client:
                Parallel(*deploy_tasks).set_sync(False)()
            else:
                Parallel.sequential(*deploy_tasks)()
        if 'eval' in mode and len(eval_tasks) > 0:
            Parallel.sequential(*eval_tasks)()
        Parallel.sequential(*post_process_tasks)()
        return self

    def update(self, *, recursive=True):
        """Update the module (and all its submodules). The module will be updated when the ``_get_train_tasks`` method is overridden.

Args:
    recursive (bool): Whether to recursively update all submodules, default is True.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.module.TrainableModule().finetune_method(lazyllm.finetune.dummy).deploy_method(lazyllm.deploy.dummy).mode('finetune').prompt(None)
    >>> m.evalset([1, 2, 3])
    >>> m.update()
    INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
    >>> print(m.eval_result)
    ["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
    """
        return self._update(mode=['train', 'server', 'eval'], recursive=recursive)
    def update_server(self, *, recursive=True): return self._update(mode=['server'], recursive=recursive)
    def eval(self, *, recursive=True):
        """Evaluate the module (and all its submodules). This function takes effect after the module has been set with an evaluation set using 'evalset'.

Args:
    recursive (bool): Whether to recursively evaluate all submodules. Defaults to True.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def forward(self, input):
    ...         return f'reply for input'
    ... 
    >>> m = MyModule()
    >>> m.evalset([1, 2, 3])
    >>> m.eval().eval_result
    ['reply for input', 'reply for input', 'reply for input']
    """
        return self._update(mode=['eval'], recursive=recursive)
    def start(self):
        """Deploy the module and all its submodules.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.TrainableModule().deploy_method(lazyllm.deploy.dummy).prompt(None)
    >>> m.start()
    <Module type=Trainable mode=None basemodel= target= stream=False return_trace=False>
    >>> m(1)
    "reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
    """
        return self._update(mode=['server'], recursive=True)
    def restart(self):
        """Re-deploy the module and all its submodules.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.TrainableModule().deploy_method(lazyllm.deploy.dummy).prompt(None)
    >>> m.restart()
    <Module type=Trainable mode=None basemodel= target= stream=False return_trace=False>
    >>> m(1)
    "reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
    """
        return self.start()
    def wait(self): pass

    @property
    def options(self):
        options = self._options.copy()
        for m in self.submodules:
            options += m.options
        return options

    def _overwrote(self, f):
        return getattr(self.__class__, f) is not getattr(__class__, f)

    def __repr__(self):
        return lazyllm.make_repr('Module', self.__class__, name=self.name)

_get_deploy_tasks()

Define a deployment task. This function returns a deployment pipeline. Subclasses that override this function can be deployed during the update/start phase.

Examples:

>>> import lazyllm
>>> class MyModule(lazyllm.module.ModuleBase):
...     def _get_deploy_tasks(self):
...         return lazyllm.pipeline(lambda : 1, lambda x: print(x))
... 
>>> MyModule().start()
1
Source code in lazyllm/module/module.py
    def _get_deploy_tasks(self):
        """Define a deployment task. This function returns a deployment pipeline. Subclasses that override this function can be deployed during the update/start phase.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def _get_deploy_tasks(self):
    ...         return lazyllm.pipeline(lambda : 1, lambda x: print(x))
    ... 
    >>> MyModule().start()
    1
    """
        return None

_get_train_tasks()

Define a training task. This function returns a training pipeline. Subclasses that override this function can be trained or fine-tuned during the update phase.

Examples:

>>> import lazyllm
>>> class MyModule(lazyllm.module.ModuleBase):
...     def _get_train_tasks(self):
...         return lazyllm.pipeline(lambda : 1, lambda x: print(x))
... 
>>> MyModule().update()
1
Source code in lazyllm/module/module.py
    def _get_train_tasks(self):
        """Define a training task. This function returns a training pipeline. Subclasses that override this function can be trained or fine-tuned during the update phase.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def _get_train_tasks(self):
    ...         return lazyllm.pipeline(lambda : 1, lambda x: print(x))
    ... 
    >>> MyModule().update()
    1
    """
        return None

eval(*, recursive=True)

Evaluate the module (and all its submodules). This function takes effect after the module has been set with an evaluation set using 'evalset'.

Parameters:

  • recursive (bool, default: True ) –

    Whether to recursively evaluate all submodules. Defaults to True.

Examples:

>>> import lazyllm
>>> class MyModule(lazyllm.module.ModuleBase):
...     def forward(self, input):
...         return f'reply for input'
... 
>>> m = MyModule()
>>> m.evalset([1, 2, 3])
>>> m.eval().eval_result
['reply for input', 'reply for input', 'reply for input']
Source code in lazyllm/module/module.py
    def eval(self, *, recursive=True):
        """Evaluate the module (and all its submodules). This function takes effect after the module has been set with an evaluation set using 'evalset'.

Args:
    recursive (bool): Whether to recursively evaluate all submodules. Defaults to True.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def forward(self, input):
    ...         return f'reply for input'
    ... 
    >>> m = MyModule()
    >>> m.evalset([1, 2, 3])
    >>> m.eval().eval_result
    ['reply for input', 'reply for input', 'reply for input']
    """
        return self._update(mode=['eval'], recursive=recursive)

evalset(evalset, load_f=None, collect_f=lambda x: x)

Set the evaluation set for the Module. Modules that have been set with an evaluation set will be evaluated during update or eval, and the results will be stored in the eval_result variable.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(lazyllm.deploy.dummy).finetune_method(lazyllm.finetune.dummy).mode("finetune").prompt(None)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> print(m.eval_result)
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
Source code in lazyllm/module/module.py
    def evalset(self, evalset, load_f=None, collect_f=lambda x: x):
        """during update or eval, and the results will be stored in the eval_result variable.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.module.TrainableModule().deploy_method(lazyllm.deploy.dummy).finetune_method(lazyllm.finetune.dummy).mode("finetune").prompt(None)
    >>> m.evalset([1, 2, 3])
    >>> m.update()
    INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
    >>> print(m.eval_result)
    ["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
    """
        if isinstance(evalset, str) and os.path.exists(evalset):
            with open(evalset) as f:
                assert callable(load_f)
                self._evalset = load_f(f)
        else:
            self._evalset = evalset
        self.eval_result_collet_f = collect_f

forward(*args, **kw)

Defines the computation steps executed on each call. All subclasses of ModuleBase need to override this method.

Examples:

>>> import lazyllm
>>> class MyModule(lazyllm.module.ModuleBase):
...     def forward(self, input):
...         return input + 1
... 
>>> MyModule()(1)
2
Source code in lazyllm/module/module.py
    def forward(self, *args, **kw):
        """Define computation steps executed each time, all subclasses of ModuleBase need to override.


Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...     def forward(self, input):
    ...         return input + 1
    ... 
    >>> MyModule()(1)
    2   
    """
        raise NotImplementedError

start()

Deploy the module and all its submodules.

Examples:

>>> import lazyllm
>>> m = lazyllm.TrainableModule().deploy_method(lazyllm.deploy.dummy).prompt(None)
>>> m.start()
<Module type=Trainable mode=None basemodel= target= stream=False return_trace=False>
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
Source code in lazyllm/module/module.py
    def start(self):
        """Deploy the module and all its submodules.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.TrainableModule().deploy_method(lazyllm.deploy.dummy).prompt(None)
    >>> m.start()
    <Module type=Trainable mode=None basemodel= target= stream=False return_trace=False>
    >>> m(1)
    "reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
    """
        return self._update(mode=['server'], recursive=True)

restart()

Re-deploy the module and all its submodules.

Examples:

>>> import lazyllm
>>> m = lazyllm.TrainableModule().deploy_method(lazyllm.deploy.dummy).prompt(None)
>>> m.restart()
<Module type=Trainable mode=None basemodel= target= stream=False return_trace=False>
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
Source code in lazyllm/module/module.py
    def restart(self):
        """Re-deploy the module and all its submodules.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.TrainableModule().deploy_method(lazyllm.deploy.dummy).prompt(None)
    >>> m.restart()
    <Module type=Trainable mode=None basemodel= target= stream=False return_trace=False>
    >>> m(1)
    "reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
    """
        return self.start()

update(*, recursive=True)

Update the module (and all its submodules). The module will be updated when the _get_train_tasks method is overridden.

Parameters:

  • recursive (bool, default: True ) –

    Whether to recursively update all submodules, default is True.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().finetune_method(lazyllm.finetune.dummy).deploy_method(lazyllm.deploy.dummy).mode('finetune').prompt(None)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> print(m.eval_result)
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
Source code in lazyllm/module/module.py
    def update(self, *, recursive=True):
        """Update the module (and all its submodules). The module will be updated when the ``_get_train_tasks`` method is overridden.

Args:
    recursive (bool): Whether to recursively update all submodules, default is True.


Examples:
    >>> import lazyllm
    >>> m = lazyllm.module.TrainableModule().finetune_method(lazyllm.finetune.dummy).deploy_method(lazyllm.deploy.dummy).mode('finetune').prompt(None)
    >>> m.evalset([1, 2, 3])
    >>> m.update()
    INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
    >>> print(m.eval_result)
    ["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
    """
        return self._update(mode=['train', 'server', 'eval'], recursive=recursive)

lazyllm.module.ActionModule

Bases: ModuleBase

Used to wrap a Module around functions, modules, flows, Module, and other callable objects. The wrapped Module (including the Module within the flow) will become a submodule of this Module.

Parameters:

  • action (Callable | list[Callable], default: () ) –

    The object to be wrapped, which is one or a set of callable objects.

Examples:

>>> import lazyllm
>>> def myfunc(input): return input + 1
... 
>>> class MyModule1(lazyllm.module.ModuleBase):
...     def forward(self, input): return input * 2
... 
>>> class MyModule2(lazyllm.module.ModuleBase):
...     def _get_deploy_tasks(self): return lazyllm.pipeline(lambda : print('MyModule2 deployed!'))
...     def forward(self, input): return input * 4
... 
>>> class MyModule3(lazyllm.module.ModuleBase):
...     def _get_deploy_tasks(self): return lazyllm.pipeline(lambda : print('MyModule3 deployed!'))
...     def forward(self, input): return f'get {input}'
... 
>>> m = lazyllm.ActionModule(myfunc, lazyllm.pipeline(MyModule1(), MyModule2), MyModule3())
>>> print(m(1))
get 16
>>> 
>>> m.evalset([1, 2, 3])
>>> m.update()
MyModule2 deployed!
MyModule3 deployed!
>>> print(m.eval_result)
['get 16', 'get 24', 'get 32']

evalset(evalset, load_f=None, collect_f=<function ModuleBase.<lambda>>)

Set the evaluation set for the Module. Modules that have been set with an evaluation set will be evaluated during update or eval, and the evaluation results will be stored in the eval_result variable.

evalset(evalset, collect_f=lambda x: ...) → None

Parameters:

  • evalset (list) –

    Evaluation set

  • collect_f (Callable) –

    Post-processing method for evaluation results, no post-processing by default.

evalset(evalset, load_f=None, collect_f=lambda x: ...) → None

Parameters:

  • evalset (str) –

    Path to the evaluation set

  • load_f (Callable) –

    Method for loading the evaluation set, including parsing file formats and converting to a list

  • collect_f (Callable) –

    Post-processing method for evaluation results, no post-processing by default (see the sketch below).
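
A small sketch of collect_f post-processing on an ActionModule; the wrapped function and the halving step are purely illustrative:

```python
import lazyllm

def add_one(x): return x + 1

class Doubler(lazyllm.module.ModuleBase):
    def forward(self, x): return x * 2

m = lazyllm.ActionModule(add_one, Doubler())
# collect_f post-processes the raw list of outputs before it is stored
m.evalset([1, 2, 3], collect_f=lambda results: [r // 2 for r in results])
m.eval()
print(m.eval_result)   # expected: [2, 3, 4]
```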

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
Source code in lazyllm/module/module.py
class ActionModule(ModuleBase):
    """Used to wrap a Module around functions, modules, flows, Module, and other callable objects. The wrapped Module (including the Module within the flow) will become a submodule of this Module.

Args:
    action (Callable|list[Callable]): The object to be wrapped, which is one or a set of callable objects.

**Examples:**

```python
>>> import lazyllm
>>> def myfunc(input): return input + 1
... 
>>> class MyModule1(lazyllm.module.ModuleBase):
...     def forward(self, input): return input * 2
... 
>>> class MyModule2(lazyllm.module.ModuleBase):
...     def _get_deploy_tasks(self): return lazyllm.pipeline(lambda : print('MyModule2 deployed!'))
...     def forward(self, input): return input * 4
... 
>>> class MyModule3(lazyllm.module.ModuleBase):
...     def _get_deploy_tasks(self): return lazyllm.pipeline(lambda : print('MyModule3 deployed!'))
...     def forward(self, input): return f'get {input}'
... 
>>> m = lazyllm.ActionModule(myfunc, lazyllm.pipeline(MyModule1(), MyModule2), MyModule3())
>>> print(m(1))
get 16
>>> 
>>> m.evalset([1, 2, 3])
>>> m.update()
MyModule2 deployed!
MyModule3 deployed!
>>> print(m.eval_result)
['get 16', 'get 24', 'get 32']
```


<span style="font-size: 20px;">**`evalset(evalset, load_f=None, collect_f=<function ModuleBase.<lambda>>)`**</span>

Set the evaluation set for the Module. Modules that have been set with an evaluation set will be evaluated during ``update`` or ``eval``, and the evaluation results will be stored in the eval_result variable. 


<span style="font-size: 18px;">&ensp;**`evalset(evalset, collect_f=lambda x: ...)→ None `**</span>


Args:
    evalset (list) :Evaluation set
    collect_f (Callable) :Post-processing method for evaluation results, no post-processing by default.



<span style="font-size: 18px;">&ensp;**`evalset(evalset, load_f=None, collect_f=lambda x: ...)→ None`**</span>


Args:
    evalset (str) :Path to the evaluation set
    load_f (Callable) :Method for loading the evaluation set, including parsing file formats and converting to a list
    collect_f (Callable) :Post-processing method for evaluation results, no post-processing by default.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
```


"""
    def __init__(self, *action, return_trace=False):
        super().__init__(return_trace=return_trace)
        if len(action) == 1 and isinstance(action[0], FlowBase): action = action[0]
        if isinstance(action, (tuple, list)):
            action = Pipeline(*action)
        assert isinstance(action, FlowBase), f'Invalid action type {type(action)}'
        self.action = action

    def forward(self, *args, **kw):
        return self.action(*args, **kw)

    @property
    def submodules(self):
        if isinstance(self.action, FlowBase):
            submodule = []
            self.action.for_each(lambda x: isinstance(x, ModuleBase), lambda x: submodule.append(x))
            return submodule
        return super().submodules

    def __repr__(self):
        return lazyllm.make_repr('Module', 'Action', subs=[repr(self.action)],
                                 name=self._module_name, return_trace=self._return_trace)

lazyllm.module.TrainableModule

Bases: UrlModule

Trainable module, all models (including LLM, Embedding, etc.) are served through TrainableModule

TrainableModule(base_model='', target_path='', *, stream=False, return_trace=False)

Parameters:

  • base_model (str, default: '' ) –

    Name or path of the base model. If the model is not available locally, it will be automatically downloaded from the model source.

  • target_path (str, default: '' ) –

    Path to save the fine-tuning task. Can be left empty if only performing inference.

  • source (str) –

    Model source, optional values are huggingface or modelscope. If not set, it will read the value from the environment variable LAZYLLM_MODEL_SOURCE.

  • stream (bool, default: False ) –

    Whether to output stream. If the inference engine used does not support streaming, this parameter will be ignored.

  • return_trace (bool, default: False ) –

    Whether to record the results in trace.
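
As an illustration of the constructor arguments above, a minimal inference-only sketch (the model name is a placeholder for any model resolvable through the configured model source):

```python
import lazyllm

# base_model may be a local path or a downloadable model name; target_path is
# left empty because no fine-tuning output is produced in this setup.
m = lazyllm.TrainableModule('your-base-model-name', stream=True)
m.start()          # deploy with the automatically selected deployment backend
print(m('hello'))
```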

TrainableModule.trainset(v):

Set the training set for TrainableModule

Parameters:

  • v (str) –

    Path to the training/fine-tuning dataset.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().finetune_method(finetune.dummy).trainset('/file/to/path').deploy_method(None).mode('finetune')
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}

TrainableModule.train_method(v, **kw):

Set the training method for TrainableModule. Continued pre-training is not supported yet, expected to be available in the next version.

Parameters:

  • v (LazyLLMTrainBase) –

    Training method, options include train.auto etc.

  • kw (**dict) –

    Parameters required by the training method, corresponding to v.

TrainableModule.finetune_method(v, **kw):

Set the fine-tuning method and its parameters for TrainableModule.

Parameters:

  • v (LazyLLMFinetuneBase) –

    Fine-tuning method, options include finetune.auto / finetune.alpacalora / finetune.collie etc.

  • kw (**dict) –

    Parameters required by the fine-tuning method, corresponding to v.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().finetune_method(finetune.dummy).deploy_method(None).mode('finetune')
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}                

TrainableModule.deploy_method(v, **kw):

Set the deployment method and its parameters for TrainableModule.

Parameters:

  • v (LazyLLMDeployBase) –

    Deployment method, options include deploy.auto / deploy.lightllm / deploy.vllm etc.

  • kw (**dict) –

    Parameters required by the deployment method, corresponding to v.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy).mode('finetune')
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]

TrainableModule.mode(v):

Set whether to execute training or fine-tuning during update for TrainableModule.

Parameters:

  • v (str) –

    Sets whether to execute training or fine-tuning during update, options are 'finetune' and 'train', default is 'finetune'.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().finetune_method(finetune.dummy).deploy_method(None).mode('finetune')
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
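
Putting the builder methods above together, a minimal end-to-end sketch (illustrative only) that reuses the dummy backends and the placeholder dataset path from the examples above:

```python
import lazyllm

# each builder method returns the module itself, so the calls can be chained
m = (lazyllm.module.TrainableModule()
     .trainset('/file/to/path')               # placeholder dataset path
     .finetune_method(lazyllm.finetune.dummy)
     .deploy_method(lazyllm.deploy.dummy)
     .mode('finetune')
     .prompt(None))
m.evalset([1, 2, 3])   # inputs for the evaluation stage of update()
m.update()             # finetune -> deploy -> evaluate
print(m.eval_result)
```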

eval(*, recursive=True)

Evaluate the module (and all its submodules). This function takes effect after the module has set an evaluation set through evalset.

Parameters:

  • recursive (bool) –

    Whether to recursively evaluate all submodules, default is True.

evalset(evalset, load_f=None, collect_f=<function ModuleBase.<lambda>>)

Set the evaluation set for the Module. Modules that have been set with an evaluation set will be evaluated during update or eval, and the evaluation results will be stored in the eval_result variable.

evalset(evalset, collect_f=lambda x: ...) → None

Parameters:

  • evalset (list) –

    Evaluation set

  • collect_f (Callable) –

    Post-processing method for evaluation results, no post-processing by default.

evalset(evalset, load_f=None, collect_f=lambda x: ...) → None

Parameters:

  • evalset (str) –

    Path to the evaluation set

  • load_f (Callable) –

    Method for loading the evaluation set, including parsing file formats and converting to a list

  • collect_f (Callable) –

    Post-processing method for evaluation results, no post-processing by default.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]

restart()

Restart the module and all its submodules.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.restart()
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"

start()

Deploy the module and all its submodules.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.start()
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
Source code in lazyllm/module/module.py
class TrainableModule(UrlModule):
    """Trainable module, all models (including LLM, Embedding, etc.) are served through TrainableModule

<span style="font-size: 20px;">**`TrainableModule(base_model='', target_path='', *, stream=False, return_trace=False)`**</span>


Args:
    base_model (str): Name or path of the base model. If the model is not available locally, it will be automatically downloaded from the model source.
    target_path (str): Path to save the fine-tuning task. Can be left empty if only performing inference.
    source (str): Model source, optional values are huggingface or modelscope. If not set, it will read the value from the environment variable LAZYLLM_MODEL_SOURCE.
    stream (bool): Whether to output stream. If the inference engine used does not support streaming, this parameter will be ignored.
    return_trace (bool): Whether to record the results in trace.

<span style="font-size: 20px;">**`TrainableModule.trainset(v):`**</span>

Set the training set for TrainableModule


Args:
    v (str): Path to the training/fine-tuning dataset.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().finetune_method(finetune.dummy).trainset('/file/to/path').deploy_method(None).mode('finetune')
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
```

<span style="font-size: 20px;">**`TrainableModule.train_method(v, **kw):`**</span>

Set the training method for TrainableModule. Continued pre-training is not supported yet, expected to be available in the next version.

Args:
    v (LazyLLMTrainBase): Training method, options include ``train.auto`` etc.
    kw (**dict): Parameters required by the training method, corresponding to v.

<span style="font-size: 20px;">**`TrainableModule.finetune_method(v, **kw):`**</span>

Set the fine-tuning method and its parameters for TrainableModule.

Args:
    v (LazyLLMFinetuneBase): Fine-tuning method, options include ``finetune.auto`` / ``finetune.alpacalora`` / ``finetune.collie`` etc.
    kw (**dict): Parameters required by the fine-tuning method, corresponding to v.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().finetune_method(finetune.dummy).deploy_method(None).mode('finetune')
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}                
```

<span style="font-size: 20px;">**`TrainableModule.deploy_method(v, **kw):`**</span>

Set the deployment method and its parameters for TrainableModule.

Args:
    v (LazyLLMDeployBase): Deployment method, options include ``deploy.auto`` / ``deploy.lightllm`` / ``deploy.vllm`` etc.
    kw (**dict): Parameters required by the deployment method, corresponding to v.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy).mode('finetune')
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
```                


<span style="font-size: 20px;">**`TrainableModule.mode(v):`**</span>

Set whether to execute training or fine-tuning during update for TrainableModule.

Args:
    v (str): Sets whether to execute training or fine-tuning during update, options are 'finetune' and 'train', default is 'finetune'.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().finetune_method(finetune.dummy).deploy_method(None).mode('finetune')
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
```    

<span style="font-size: 20px;">**`eval(*, recursive=True)`**</span>
Evaluate the module (and all its submodules). This function takes effect after the module has set an evaluation set through evalset.

Args:
    recursive (bool) :Whether to recursively evaluate all submodules, default is True.                         

<span style="font-size: 20px;">**`evalset(evalset, load_f=None, collect_f=<function ModuleBase.<lambda>>)`**</span>

Set the evaluation set for the Module. Modules that have been set with an evaluation set will be evaluated during ``update`` or ``eval``, and the evaluation results will be stored in the eval_result variable. 


<span style="font-size: 18px;">&ensp;**`evalset(evalset, collect_f=lambda x: ...)→ None `**</span>


Args:
    evalset (list) :Evaluation set
    collect_f (Callable) :Post-processing method for evaluation results, no post-processing by default.



<span style="font-size: 18px;">&ensp;**`evalset(evalset, load_f=None, collect_f=lambda x: ...)→ None`**</span>


Args:
    evalset (str) :Path to the evaluation set
    load_f (Callable) :Method for loading the evaluation set, including parsing file formats and converting to a list
    collect_f (Callable) :Post-processing method for evaluation results, no post-processing by default.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
```

<span style="font-size: 20px;">**`restart() `**</span>

Restart the module and all its submodules.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.restart()
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
```

<span style="font-size: 20px;">**`start() `**</span> 

Deploy the module and all its submodules.

**Examples:**

```python
import lazyllm
m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
m.start()
m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
```                                  
"""
    builder_keys = _TrainableModuleImpl.builder_keys

    def __init__(self, base_model: Option = '', target_path='', *, stream=False, return_trace=False):
        super().__init__(url=None, stream=stream, return_trace=return_trace)
        self._impl = _TrainableModuleImpl(base_model, target_path, stream,
                                          None, lazyllm.finetune.auto, lazyllm.deploy.auto)
        self._impl._add_father(self)
        self.prompt()

    base_model = property(lambda self: self._impl._base_model)
    target_path = property(lambda self: self._impl._target_path)
    _url_id = property(lambda self: self._impl._module_id)

    @property
    def series(self):
        return re.sub(r'\d+$', '', ModelManager.get_model_name(self.base_model).split('-')[0].upper())

    @property
    def type(self):
        return ModelManager.get_model_type(self.base_model).upper()

    @property
    def _deploy_type(self):
        if self._impl._deploy is not lazyllm.deploy.AutoDeploy:
            return self._impl._deploy
        elif self._impl._deployer:
            return type(self._impl._deployer)
        else:
            return lazyllm.deploy.AutoDeploy

    def wait(self):
        # TODO(wangzhihong): Split finetune launcher and deploy launcher; Only one deploy launcher is allowed.
        for launcher in self._impl._launchers:
            launcher.wait()

    # modify default value to ''
    def prompt(self, prompt=''):
        if self.base_model != '' and prompt == '' and ModelManager.get_model_type(self.base_model) != 'llm':
            prompt = None
        prompt = super(__class__, self).prompt(prompt)._prompt
        self._tools = getattr(prompt, "_tools", None)
        keys = ModelManager.get_model_prompt_keys(self.base_model)
        if keys:
            prompt._set_model_configs(**keys)
            for key in ["tool_start_token", "tool_args_token", "tool_end_token"]:
                if key in keys: setattr(self, f"_{key}", keys[key])
        return self

    def _loads_str(self, text: str) -> Union[str, Dict]:
        try:
            ret = json.loads(text)
            return self._loads_str(ret) if isinstance(ret, str) else ret
        except Exception:
            LOG.error(f"{text} is not a valid json string.")
            return text

    def _parse_arguments_with_args_token(self, output: str) -> tuple[str, dict]:
        items = output.split(self._tool_args_token)
        func_name = items[0].strip()
        if len(items) == 1:
            return func_name.split(self._tool_end_token)[0].strip() if getattr(self, "_tool_end_token", None)\
                else func_name, {}
        args = (items[1].split(self._tool_end_token)[0].strip() if getattr(self, "_tool_end_token", None)
                else items[1].strip())
        return func_name, self._loads_str(args) if isinstance(args, str) else args

    def _parse_arguments_without_args_token(self, output: str) -> tuple[str, dict]:
        items = output.split(self._tool_end_token)[0] if getattr(self, "_tool_end_token", None) else output
        func_name = ""
        args = {}
        try:
            items = json.loads(items.strip())
            func_name = items.get('name', '')
            args = items.get("parameters", items.get("arguments", {}))
        except Exception:
            LOG.error(f"tool calls info {items} parse error")

        return func_name, self._loads_str(args) if isinstance(args, str) else args

    def _parse_arguments_with_tools(self, output: Dict[str, Any], tools: List[str]) -> bool:
        func_name = ''
        args = {}
        is_tc = False
        tc = {}
        if output.get('name', '') in tools:
            is_tc = True
            func_name = output.get('name', '')
            args = output.get("parameters", output.get("arguments", {}))
            tc = {'name': func_name, 'arguments': self._loads_str(args) if isinstance(args, str) else args}
            return is_tc, tc
        return is_tc, tc

    def _parse_tool_start_token(self, output: str) -> tuple[str, List[Dict]]:
        tool_calls = []
        segs = output.split(self._tool_start_token)
        content = segs[0]
        for seg in segs[1:]:
            func_name, arguments = self._parse_arguments_with_args_token(seg.strip())\
                if getattr(self, "_tool_args_token", None)\
                else self._parse_arguments_without_args_token(seg.strip())
            if func_name:
                tool_calls.append({"name": func_name, "arguments": arguments})

        return content, tool_calls

    def _parse_tools(self, output: str) -> tuple[str, List[Dict]]:
        tool_calls = []
        tools = {tool['function']['name'] for tool in self._tools}
        lines = output.strip().split("\n")
        content = []
        is_tool_call = False
        for idx, line in enumerate(lines):
            if line.startswith("{") and idx > 0:
                func_name = lines[idx - 1].strip()
                if func_name in tools:
                    is_tool_call = True
                    if func_name == content[-1].strip():
                        content.pop()
                    arguments = "\n".join(lines[idx:]).strip()
                    tool_calls.append({'name': func_name, "arguments": arguments})
                    continue
            if "{" in line and 'name' in line:
                try:
                    items = json.loads(line.strip())
                    items = [items] if isinstance(items, dict) else items
                    if isinstance(items, list):
                        for item in items:
                            is_tool_call, tc = self._parse_arguments_with_tools(item, tools)
                            if is_tool_call:
                                tool_calls.append(tc)
                except Exception:
                    LOG.error(f"tool calls info {line} parse error")
            if not is_tool_call:
                content.append(line)
        content = "\n".join(content) if len(content) > 0 else ''
        return content, tool_calls

    def _extract_tool_calls(self, output: str) -> tuple[str, List[Dict]]:
        tool_calls = []
        content = ''
        if getattr(self, "_tool_start_token", None) and self._tool_start_token in output:
            content, tool_calls = self._parse_tool_start_token(output)
        elif self._tools:
            content, tool_calls = self._parse_tools(output)
        else:
            content = output

        return content, tool_calls

    def _build_response(self, content: str, tool_calls: List[Dict[str, str]]) -> str:
        tc = [{'id': str(uuid.uuid4().hex), 'type': 'function', 'function': tool_call} for tool_call in tool_calls]
        if content and tc:
            return globals["tool_delimiter"].join([content, json.dumps(tc, ensure_ascii=False)])
        elif not content and tc:
            return globals["tool_delimiter"] + json.dumps(tc, ensure_ascii=False)
        else:
            return content

    def _extract_and_format(self, output: str) -> str:
        """
        1.extract tool calls information;
            a. If 'tool_start_token' exists, the boundary of tool_calls can be found according to 'tool_start_token',
               and then the function name and arguments of tool_calls can be extracted according to 'tool_args_token'
               and 'tool_end_token'.
            b. If 'tool_start_token' does not exist, the text is segmented using '\n' according to the incoming tools
               information, and then processed according to the rules.
        """
        content, tool_calls = self._extract_tool_calls(output)
        return self._build_response(content, tool_calls)

    def __repr__(self):
        return lazyllm.make_repr('Module', 'Trainable', mode=self._impl._mode, basemodel=self.base_model,
                                 target=self.target_path, name=self._module_name,
                                 stream=self._stream, return_trace=self._return_trace)

    def __getattr__(self, key):
        if key in self.__class__.builder_keys:
            return functools.partial(getattr(self._impl, key), _return_value=self)
        raise AttributeError(f'{__class__} object has no attribute {key}')

    def share(self, prompt=None, format=None):
        new = copy.copy(self)
        new._set_mid()
        if prompt is not None: new.prompt(prompt)
        if format is not None: new.formatter(format)
        new._impl._add_father(new)
        return new

lazyllm.module.UrlModule

Bases: ModuleBase, UrlTemplate

The URL obtained from deploying the ServerModule can be wrapped into a Module. When calling __call__, it will access the service.

Parameters:

  • url (str, default: '' ) –

    The URL of the service to be wrapped.

  • stream (bool, default: False ) –

    Whether to request and output in streaming mode, default is non-streaming.

  • return_trace (bool, default: False ) –

    Whether to record the results in trace, default is False.

Examples:

>>> import lazyllm
>>> def demo(input): return input * 2
... 
>>> s = lazyllm.ServerModule(demo, launcher=lazyllm.launchers.empty(sync=False))
>>> s.start()
INFO:     Uvicorn running on http://0.0.0.0:35485
>>> u = lazyllm.UrlModule(url=s._url)
>>> print(u(1))
2
Source code in lazyllm/module/module.py
class UrlModule(ModuleBase, UrlTemplate):
    """The URL obtained from deploying the ServerModule can be wrapped into a Module. When calling ``__call__`` , it will access the service.

Args:
    url (str): The URL of the service to be wrapped.
    stream (bool): Whether to request and output in streaming mode, default is non-streaming.
    return_trace (bool): Whether to record the results in trace, default is False.


Examples:
    >>> import lazyllm
    >>> def demo(input): return input * 2
    ... 
    >>> s = lazyllm.ServerModule(demo, launcher=lazyllm.launchers.empty(sync=False))
    >>> s.start()
    INFO:     Uvicorn running on http://0.0.0.0:35485
    >>> u = lazyllm.UrlModule(url=s._url)
    >>> print(u(1))
    2
    """
    def __init__(self, *, url='', stream=False, return_trace=False):
        super().__init__(return_trace=return_trace)
        self.__url = url
        self._stream = stream
        # Set for request by specific deploy:
        UrlTemplate.__init__(self)
        self._extract_result_func = lambda x: x
        self._stream_parse_parameters = {}
        self._stream_url_suffix = ''
        __class__.prompt(self)
        __class__.formatter(self)

    @property
    def _url(self):
        if redis_client:
            try:
                while not self.__url:
                    self.__url = get_redis(self._url_id)
                    if self.__url: break
                    time.sleep(lazyllm.config["redis_recheck_delay"])
            except Exception as e:
                LOG.error(f"Error accessing Redis: {e}")
                raise
        return self.__url

    def _set_url(self, url):
        if redis_client:
            redis_client.set(self._module_id, url)
        LOG.debug(f'url: {url}')
        self.__url = url

    # Cannot modify or add any attribute of self
    # prompt keys (excluding history) are in __input (ATTENTION: dict, not kwargs)
    # deploy parameters keys are in **kw
    def forward(self, __input=package(), *, llm_chat_history=None, tools=None, stream_output=False, **kw):  # noqa C901
        """Defines the computation steps to be executed each time. All subclasses of ModuleBase need to override this function.



Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...    def forward(self, input):
    ...        return input + 1
    ...
    >>> MyModule()(1)
    2
    """
        assert self._url is not None, f'Please start {self.__class__} first'
        stream_output = stream_output or self._stream
        url = self._url

        files = []
        if self.template_message and globals['global_parameters'].get("lazyllm-files"):
            files = globals['global_parameters']["lazyllm-files"]['files']
        query = __input
        __input = self._prompt.generate_prompt(query, llm_chat_history, tools)
        headers = {'Content-Type': 'application/json'}

        if isinstance(self, ServerModule):
            assert llm_chat_history is None and tools is None
            headers['Global-Parameters'] = encode_request(globals._data)
            headers['Session-ID'] = encode_request(globals._sid)
            data = encode_request((__input, kw))
        elif self.template_message:
            data = self._modify_parameters(copy.deepcopy(self.template_message), kw)
            assert 'inputs' in self.keys_name_handle
            data[self.keys_name_handle['inputs']] = __input
            if 'image' in self.keys_name_handle and files:
                data[self.keys_name_handle['image']] = files
            elif 'audio' in self.keys_name_handle and files:
                data[self.keys_name_handle['audio']] = files
        else:
            if len(kw) != 0: raise NotImplementedError(f'kwargs ({kw}) are not allowed in UrlModule')
            data = __input

        if stream_output:
            if self._stream_url_suffix and not url.endswith(self._stream_url_suffix):
                url += self._stream_url_suffix
            if "stream" in data: data['stream'] = stream_output
        parse_parameters = self._stream_parse_parameters if stream_output else {"delimiter": b"<|lazyllm_delimiter|>"}

        token = getattr(self, "_tool_start_token", '')
        cache = ""

        # context bug with httpx, so we use requests
        with requests.post(url, json=data, stream=True, headers=headers) as r:
            if r.status_code == 200:
                messages = ''
                for line in r.iter_lines(**parse_parameters):
                    if not line: continue
                    try:
                        line = pickle.loads(codecs.decode(line, "base64"))
                    except Exception:
                        line = line.decode('utf-8')
                    chunk = self._prompt.get_response(self._extract_result_func(line))
                    if isinstance(chunk, str):
                        if chunk.startswith(messages): chunk = chunk[len(messages):]
                        messages += chunk
                    else:
                        messages = chunk

                    if not stream_output: continue
                    if not cache:
                        if token.startswith(chunk.lstrip('\n') if not token.startswith('\n') else chunk) \
                           or token in chunk: cache = chunk
                        else: FileSystemQueue().enqueue(chunk)
                    elif token in cache:
                        stream_output = False
                        if not cache.startswith(token): FileSystemQueue().enqueue(cache.split(token)[0])
                    else:
                        cache += chunk
                        if not (token.startswith(cache.lstrip('\n') if not token.startswith('\n') else cache)
                                or token in cache):
                            FileSystemQueue().enqueue(cache)
                            cache = ""
            else:
                raise requests.RequestException('\n'.join([c.decode('utf-8') for c in r.iter_content(None)]))
            return self._formatter.format(self._extract_and_format(messages))

    def prompt(self, prompt=None):
        if prompt is None:
            self._prompt = EmptyPrompter()
        elif isinstance(prompt, PrompterBase):
            self._prompt = prompt
        elif isinstance(prompt, (str, dict)):
            self._prompt = ChatPrompter(prompt)
        return self

    def _extract_and_format(self, output: str) -> str:
        return output

    def formatter(self, format: FormatterBase = None):
        if isinstance(format, FormatterBase):
            self._formatter = format
        elif format is None:
            self._formatter = EmptyFormatter()
        else:
            raise TypeError("format must be a FormatterBase")
        return self

    def _modify_parameters(self, paras, kw):
        for key, value in paras.items():
            if key == self.keys_name_handle['inputs']:
                continue
            elif isinstance(value, dict):
                if key in kw:
                    assert set(kw[key].keys()).issubset(set(value.keys()))
                    value.update(kw.pop(key))
                for k in value.keys():
                    if k in kw: value[k] = kw.pop(k)
            else:
                if key in kw: paras[key] = kw.pop(key)
        return paras

    def set_default_parameters(self, **kw):
        self._modify_parameters(self.template_message, kw)

    def __repr__(self):
        return lazyllm.make_repr('Module', 'Url', name=self._module_name, url=self._url,
                                 stream=self._stream, return_trace=self._return_trace)

forward(__input=package(), *, llm_chat_history=None, tools=None, stream_output=False, **kw)

Defines the computation steps to be executed each time. All subclasses of ModuleBase need to override this function.

Examples:

>>> import lazyllm
>>> class MyModule(lazyllm.module.ModuleBase):
...    def forward(self, input):
...        return input + 1
...
>>> MyModule()(1)
2
Source code in lazyllm/module/module.py
    def forward(self, __input=package(), *, llm_chat_history=None, tools=None, stream_output=False, **kw):  # noqa C901
        """Defines the computation steps to be executed each time. All subclasses of ModuleBase need to override this function.



Examples:
    >>> import lazyllm
    >>> class MyModule(lazyllm.module.ModuleBase):
    ...    def forward(self, input):
    ...        return input + 1
    ...
    >>> MyModule()(1)
    2
    """
        assert self._url is not None, f'Please start {self.__class__} first'
        stream_output = stream_output or self._stream
        url = self._url

        files = []
        if self.template_message and globals['global_parameters'].get("lazyllm-files"):
            files = globals['global_parameters']["lazyllm-files"]['files']
        query = __input
        __input = self._prompt.generate_prompt(query, llm_chat_history, tools)
        headers = {'Content-Type': 'application/json'}

        if isinstance(self, ServerModule):
            assert llm_chat_history is None and tools is None
            headers['Global-Parameters'] = encode_request(globals._data)
            headers['Session-ID'] = encode_request(globals._sid)
            data = encode_request((__input, kw))
        elif self.template_message:
            data = self._modify_parameters(copy.deepcopy(self.template_message), kw)
            assert 'inputs' in self.keys_name_handle
            data[self.keys_name_handle['inputs']] = __input
            if 'image' in self.keys_name_handle and files:
                data[self.keys_name_handle['image']] = files
            elif 'audio' in self.keys_name_handle and files:
                data[self.keys_name_handle['audio']] = files
        else:
            if len(kw) != 0: raise NotImplementedError(f'kwargs ({kw}) are not allowed in UrlModule')
            data = __input

        if stream_output:
            if self._stream_url_suffix and not url.endswith(self._stream_url_suffix):
                url += self._stream_url_suffix
            if "stream" in data: data['stream'] = stream_output
        parse_parameters = self._stream_parse_parameters if stream_output else {"delimiter": b"<|lazyllm_delimiter|>"}

        token = getattr(self, "_tool_start_token", '')
        cache = ""

        # context bug with httpx, so we use requests
        with requests.post(url, json=data, stream=True, headers=headers) as r:
            if r.status_code == 200:
                messages = ''
                for line in r.iter_lines(**parse_parameters):
                    if not line: continue
                    try:
                        line = pickle.loads(codecs.decode(line, "base64"))
                    except Exception:
                        line = line.decode('utf-8')
                    chunk = self._prompt.get_response(self._extract_result_func(line))
                    if isinstance(chunk, str):
                        if chunk.startswith(messages): chunk = chunk[len(messages):]
                        messages += chunk
                    else:
                        messages = chunk

                    if not stream_output: continue
                    if not cache:
                        if token.startswith(chunk.lstrip('\n') if not token.startswith('\n') else chunk) \
                           or token in chunk: cache = chunk
                        else: FileSystemQueue().enqueue(chunk)
                    elif token in cache:
                        stream_output = False
                        if not cache.startswith(token): FileSystemQueue().enqueue(cache.split(token)[0])
                    else:
                        cache += chunk
                        if not (token.startswith(cache.lstrip('\n') if not token.startswith('\n') else cache)
                                or token in cache):
                            FileSystemQueue().enqueue(cache)
                            cache = ""
            else:
                raise requests.RequestException('\n'.join([c.decode('utf-8') for c in r.iter_content(None)]))
            return self._formatter.format(self._extract_and_format(messages))

lazyllm.module.ServerModule

Bases: UrlModule

Using FastAPI, any callable object can be wrapped into an API service, allowing the simultaneous launch of one main service and multiple satellite services.

Parameters:

  • m (Callable) –

    The function to be wrapped as a service. It can be a function or a functor. When launching satellite services, it needs to be an object implementing __call__ (a functor).

  • pre (Callable, default: None ) –

    Preprocessing function executed in the service process. It can be a function or a functor, default is None.

  • post (Callable, default: None ) –

    Postprocessing function executed in the service process. It can be a function or a functor, default is None.

  • stream (bool, default: False ) –

    Whether to request and output in streaming mode, default is non-streaming.

  • return_trace (bool, default: False ) –

    Whether to record the results in trace, default is False.

  • port (int, default: None ) –

    Specifies the port after the service is deployed. The default is None, which will generate a random port.

  • launcher (LazyLLMLaunchersBase, default: None ) –

    Used to select the compute node for service execution, default is launchers.remote .

Examples:

>>> import lazyllm
>>> from lazyllm import launchers
>>> def demo(input): return input * 2
... 
>>> s = lazyllm.ServerModule(demo, launcher=launchers.empty(sync=False))
>>> s.start()
INFO:     Uvicorn running on http://0.0.0.0:35485
>>> print(s(1))
2
>>> class MyServe(object):
...     def __call__(self, input):
...         return 2 * input
...     
...     @lazyllm.FastapiApp.post
...     def server1(self, input):
...         return f'reply for {input}'
...
...     @lazyllm.FastapiApp.get
...     def server2(self):
...        return f'get method'
...
>>> m = lazyllm.ServerModule(MyServe(), launcher=launchers.empty(sync=False))
>>> m.start()
>>> print(m(1))
INFO:     Uvicorn running on http://0.0.0.0:32028
>>> print(m(1))
2  
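
The examples above do not exercise the `pre` and `post` hooks. Below is a minimal, hypothetical sketch of wrapping a callable with both; the `post` signature `(result, original_input)` is assumed from the TrialModule example later on this page, and the hooks are assumed to be chained around the wrapped callable inside the service process.

```python
# Hypothetical sketch, not an official example: pre/post hooks on a ServerModule.
import lazyllm

def demo(x):
    return x * 2

def pre(x):
    # preprocessing, executed inside the service process before demo
    return x + 1

def post(x, ori):
    # postprocessing; assumed to receive (result, original input),
    # matching the TrialModule example later on this page
    return f'post({x})'

s = lazyllm.ServerModule(demo, pre=pre, post=post,
                         launcher=lazyllm.launchers.empty(sync=False))
s.start()
print(s(1))  # if pre feeds demo, this would print something like 'post(4)'
```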

evalset(evalset, load_f=None, collect_f=<function ModuleBase.<lambda>>)

Set the evaluation set for the Module. Modules that have been set with an evaluation set will be evaluated during update or eval, and the evaluation results will be stored in the eval_result variable.

evalset(evalset, collect_f=lambda x: ...)→ None

Parameters:

  • evalset (list) –

    Evaluation set

  • collect_f (Callable) –

    Post-processing method for evaluation results, no post-processing by default.

evalset(evalset, load_f=None, collect_f=lambda x: ...)→ None

Parameters:

  • evalset (str) –

    Path to the evaluation set

  • load_f (Callable) –

    Method for loading the evaluation set, including parsing file formats and converting to a list

  • collect_f (Callable) –

    Post-processing method for evaluation results, no post-processing by default.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]

restart()

Restart the module and all its submodules.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.restart()
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"

start()

Deploy the module and all its submodules.

Examples:

>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.start()
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
Source code in lazyllm/module/module.py
class ServerModule(UrlModule):
    """Using FastAPI, any callable object can be wrapped into an API service, allowing the simultaneous launch of one main service and multiple satellite services.

Args:
    m (Callable): The function to be wrapped as a service. It can be a function or a functor. When launching satellite services, it needs to be an object implementing ``__call__`` (a functor).
    pre (Callable): Preprocessing function executed in the service process. It can be a function or a functor, default is ``None``.
    post (Callable): Postprocessing function executed in the service process. It can be a function or a functor, default is ``None``.
    stream (bool): Whether to request and output in streaming mode, default is non-streaming.
    return_trace (bool): Whether to record the results in trace, default is ``False``.
    port (int): Specifies the port after the service is deployed. The default is ``None``, which will generate a random port.
    launcher (LazyLLMLaunchersBase): Used to select the compute node for service execution, default is ``launchers.remote`` .

**Examples:**

```python
>>> def demo(input): return input * 2
... 
>>> s = lazyllm.ServerModule(demo, launcher=launchers.empty(sync=False))
>>> s.start()
INFO:     Uvicorn running on http://0.0.0.0:35485
>>> print(s(1))
2
```

```python
>>> class MyServe(object):
...     def __call__(self, input):
...         return 2 * input
...     
...     @lazyllm.FastapiApp.post
...     def server1(self, input):
...         return f'reply for {input}'
...
...     @lazyllm.FastapiApp.get
...     def server2(self):
...        return f'get method'
...
>>> m = lazyllm.ServerModule(MyServe(), launcher=launchers.empty(sync=False))
>>> m.start()
>>> print(m(1))
INFO:     Uvicorn running on http://0.0.0.0:32028
>>> print(m(1))
2  
```

<span style="font-size: 20px;">**`evalset(evalset, load_f=None, collect_f=<function ModuleBase.<lambda>>)`**</span>

Set the evaluation set for the Module. Modules that have been set with an evaluation set will be evaluated during ``update`` or ``eval``, and the evaluation results will be stored in the eval_result variable. 


<span style="font-size: 18px;">&ensp;**`evalset(evalset, collect_f=lambda x: ...)→ None `**</span>


Args:
    evalset (list) :Evaluation set
    collect_f (Callable) :Post-processing method for evaluation results, no post-processing by default.



<span style="font-size: 18px;">&ensp;**`evalset(evalset, load_f=None, collect_f=lambda x: ...)→ None`**</span>


Args:
    evalset (str) :Path to the evaluation set
    load_f (Callable) :Method for loading the evaluation set, including parsing file formats and converting to a list
    collect_f (Callable) :Post-processing method for evaluation results, no post-processing by default.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.evalset([1, 2, 3])
>>> m.update()
INFO: (lazyllm.launcher) PID: dummy finetune!, and init-args is {}
>>> m.eval_result
["reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1}", "reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1}"]
```

<span style="font-size: 20px;">**`restart() `**</span>

Restart the module and all its submodules.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.restart()
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
```

<span style="font-size: 20px;">**`start() `**</span> 

Deploy the module and all its submodules.

**Examples:**

```python
>>> import lazyllm
>>> m = lazyllm.module.TrainableModule().deploy_method(deploy.dummy)
>>> m.start()
>>> m(1)
"reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1}"
```                                                                    
"""
    def __init__(self, m, pre=None, post=None, stream=False, return_trace=False, port=None, launcher=None):
        assert stream is False or return_trace is False, 'Module with stream output has no trace'
        assert (post is None) or (stream is False), 'Stream cannot be true when post-action exists'
        super().__init__(url=None, stream=stream, return_trace=return_trace)
        self._set_template(
            copy.deepcopy(lazyllm.deploy.RelayServer.message_format),
            lazyllm.deploy.RelayServer.keys_name_handle,
            copy.deepcopy(lazyllm.deploy.RelayServer.default_headers),
        )
        self._impl = _ServerModuleImpl(m, pre, post, launcher, port, father=self)

    _url_id = property(lambda self: self._impl._module_id)

    def wait(self):
        self._impl._launcher.wait()

    def __repr__(self):
        return lazyllm.make_repr('Module', 'Server', subs=[repr(self._impl._m)], name=self._module_name,
                                 stream=self._stream, return_trace=self._return_trace)

lazyllm.module.TrialModule

Bases: object

Parameter grid search module will traverse all its submodules, collect all searchable parameters, and iterate over these parameters for fine-tuning, deployment, and evaluation.

Parameters:

  • m (Callable) –

    The submodule whose parameters will be grid-searched. Fine-tuning, deployment, and evaluation will be based on this module.

Examples:

>>> import lazyllm
>>> from lazyllm import finetune, deploy
>>> m = lazyllm.TrainableModule('b1', 't').finetune_method(finetune.dummy, **dict(a=lazyllm.Option(['f1', 'f2'])))
>>> m.deploy_method(deploy.dummy).mode('finetune').prompt(None)
>>> s = lazyllm.ServerModule(m, post=lambda x, ori: f'post2({x})')
>>> s.evalset([1, 2, 3])
>>> t = lazyllm.TrialModule(s)
>>> t.update()
>>>
dummy finetune!, and init-args is {a: f1}
dummy finetune!, and init-args is {a: f2}
[["post2(reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1})"], ["post2(reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1})"]]
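
Since TrialModule collects every searchable parameter from its submodules, defining more than one `Option` yields a full parameter grid. A minimal hypothetical extension of the example above (the second parameter `b` is invented for illustration):

```python
# Hypothetical sketch: two Options produce a 2x2 grid of finetune/eval runs.
import lazyllm
from lazyllm import finetune, deploy

m = lazyllm.TrainableModule('b1', 't').finetune_method(
    finetune.dummy,
    a=lazyllm.Option(['f1', 'f2']),
    b=lazyllm.Option([1, 2]))          # hypothetical second searchable parameter
m.deploy_method(deploy.dummy).mode('finetune').prompt(None)
s = lazyllm.ServerModule(m)
s.evalset([1, 2, 3])
t = lazyllm.TrialModule(s)
t.update()                             # expected to run all four (a, b) combinations
```
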
Source code in lazyllm/module/trialmodule.py
class TrialModule(object):
    """Parameter grid search module will traverse all its submodules, collect all searchable parameters, and iterate over these parameters for fine-tuning, deployment, and evaluation.

Args:
    m (Callable): The submodule whose parameters will be grid-searched. Fine-tuning, deployment, and evaluation will be based on this module.


Examples:
    >>> import lazyllm
    >>> from lazyllm import finetune, deploy
    >>> m = lazyllm.TrainableModule('b1', 't').finetune_method(finetune.dummy, **dict(a=lazyllm.Option(['f1', 'f2'])))
    >>> m.deploy_method(deploy.dummy).mode('finetune').prompt(None)
    >>> s = lazyllm.ServerModule(m, post=lambda x, ori: f'post2({x})')
    >>> s.evalset([1, 2, 3])
    >>> t = lazyllm.TrialModule(s)
    >>> t.update()
    >>>
    dummy finetune!, and init-args is {a: f1}
    dummy finetune!, and init-args is {a: f2}
    [["post2(reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1})"], ["post2(reply for 1, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 2, and parameters is {'do_sample': False, 'temperature': 0.1})", "post2(reply for 3, and parameters is {'do_sample': False, 'temperature': 0.1})"]]
    """
    def __init__(self, m):
        self.m = m

    @staticmethod
    def work(m, q):
        # update option at module.update()
        m = copy.deepcopy(m)
        m.update()
        q.put(m.eval_result)

    def update(self):
        options = get_options(self.m)
        q = multiprocessing.Queue()
        ps = []
        for _ in OptionIter(options, get_options):
            p = ForkProcess(target=TrialModule.work, args=(self.m, q), sync=True)
            ps.append(p)
            p.start()
            time.sleep(1)
        [p.join() for p in ps]
        result = [q.get() for p in ps]
        LOG.info(f'{result}')

lazyllm.module.OnlineChatModule

Used to manage and create access modules for the large-model platforms currently available on the market. It currently supports openai, sensenova, glm, kimi, qwen, and doubao (doubao is listed but not yet supported, since that platform does not currently serve individual users). For how to obtain each platform's API key, please visit Getting Started

Parameters:

  • model (str) –

    Specify the model to access, default is gpt-3.5-turbo(openai) / SenseChat-5(sensenova) / glm-4(glm) / moonshot-v1-8k(kimi) / qwen-plus(qwen) .

  • source (str) –

    Specify the type of module to create. Options include openai / sensenova / glm / kimi / qwen / doubao (not yet supported) .

  • base_url (str) –

    Specify the base link of the platform to be accessed. The default is the official link.

  • system_prompt (str) –

    Specify the requested system prompt. The default is the official system prompt.

  • stream (bool) –

    Whether to request and output in streaming mode, default is streaming.

  • return_trace (bool) –

    Whether to record the results in trace, default is False.

Examples:

>>> import lazyllm
>>> m = lazyllm.OnlineChatModule(source="sensenova", stream=True)
>>> query = "Hello!"
>>> resp = m(query)
>>> for r in resp:
...     print(r)
...
{'content': '你好'}
{'content': '!'}
{'content': '有什么'}
{'content': '我可以'}
{'content': '帮助'}
{'content': '你的'}
{'content': '吗'}
{'content': '?'}
{'content': ''}
>>> m = lazyllm.OnlineChatModule(source="sensenova", model="nova-ptc-s-v2", stream=False)
>>> train_file = "toy_chat_fine_tuning.jsonl"
>>> m.set_train_tasks(train_file=train_file, upload_url="https://file.sensenova.cn/v1/files")
>>> m._get_train_tasks()
Num examples:
First example:
{'role': 'system', 'content': 'Marv is a factual chatbot that is also sarcastic.'}
{'role': 'user', 'content': "What's the capital of France?"}
{'role': 'assistant', 'content': "Paris, as if everyone doesn't know that already."}
No errors found
train file id: 7193d9a3-8b6e-4332-99cc-724dec75d9dd
toy_chat_fine_tuning.jsonl upload success! file id is d632e591-f668-43a1-b5bf-49418e9c0fec
fine tuning job ft-85f7bc96034441f2b64f9a5fff5d5b9c created, status: SUBMITTED
fine tuning job ft-85f7bc96034441f2b64f9a5fff5d5b9c status: RUNNING
...
fine tuning job ft-85f7bc96034441f2b64f9a5fff5d5b9c status: SUCCEEDED
fine tuned model: nova-ptc-s-v2:ft-fee492082cbe4a6d880d396f34f1bc50 finished  
>>> m._get_deploy_tasks()
deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 created, status: SUBMITTED
...
deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 status: PENDING
...
deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 status: RUNNING
deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 finished
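
For plain, non-streaming chat without fine-tuning, a minimal hypothetical sketch (it assumes the corresponding API key, e.g. for openai, is already configured):

```python
# Hypothetical sketch: one-shot, non-streaming request through OnlineChatModule.
import lazyllm

m = lazyllm.OnlineChatModule(source="openai", stream=False)  # defaults to gpt-3.5-turbo
print(m("Hello!"))  # expected: the full reply returned at once rather than streamed
```
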
Source code in lazyllm/module/onlineChatModule/onlineChatModule.py
class OnlineChatModule(metaclass=_ChatModuleMeta):
    """Used to manage and create access modules for large model platforms currently available on the market. Currently, it supports openai, sensenova, glm, kimi, qwen and doubao (since the platform is not currently being developed for individual users, access is not currently supported). For how to obtain the platform's API key, please visit [Getting Started](/#platform)

Args:
    model (str): Specify the model to access, default is ``gpt-3.5-turbo(openai)`` / ``SenseChat-5(sensenova)`` / ``glm-4(glm)`` / ``moonshot-v1-8k(kimi)`` / ``qwen-plus(qwen)`` .
    source (str): Specify the type of module to create. Options include  ``openai`` /  ``sensenova`` /  ``glm`` /  ``kimi`` /  ``qwen`` / ``doubao (not yet supported)`` .
    base_url (str): Specify the base link of the platform to be accessed. The default is the official link.
    system_prompt (str): Specify the requested system prompt. The default is the official system prompt.
    stream (bool): Whether to request and output in streaming mode, default is streaming.
    return_trace (bool): Whether to record the results in trace, default is False.      


Examples:
    >>> import lazyllm
    >>> m = lazyllm.OnlineChatModule(source="sensenova", stream=True)
    >>> query = "Hello!"
    >>> resp = m(query)
    >>> for r in resp:
    ...     print(r)
    ...
    {'content': '你好'}
    {'content': '!'}
    {'content': '有什么'}
    {'content': '我可以'}
    {'content': '帮助'}
    {'content': '你的'}
    {'content': '吗'}
    {'content': '?'}
    {'content': ''}
    >>> m = lazyllm.OnlineChatModule(source="sensenova", model="nova-ptc-s-v2", stream=False)
    >>> train_file = "toy_chat_fine_tuning.jsonl"
    >>> m.set_train_tasks(train_file=train_file, upload_url="https://file.sensenova.cn/v1/files")
    >>> m._get_train_tasks()
    Num examples:
    First example:
    {'role': 'system', 'content': 'Marv is a factual chatbot that is also sarcastic.'}
    {'role': 'user', 'content': "What's the capital of France?"}
    {'role': 'assistant', 'content': "Paris, as if everyone doesn't know that already."}
    No errors found
    train file id: 7193d9a3-8b6e-4332-99cc-724dec75d9dd
    toy_chat_fine_tuning.jsonl upload success! file id is d632e591-f668-43a1-b5bf-49418e9c0fec
    fine tuning job ft-85f7bc96034441f2b64f9a5fff5d5b9c created, status: SUBMITTED
    fine tuning job ft-85f7bc96034441f2b64f9a5fff5d5b9c status: RUNNING
    ...
    fine tuning job ft-85f7bc96034441f2b64f9a5fff5d5b9c status: SUCCEEDED
    fine tuned model: nova-ptc-s-v2:ft-fee492082cbe4a6d880d396f34f1bc50 finished  
    >>> m._get_deploy_tasks()
    deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 created, status: SUBMITTED
    ...
    deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 status: PENDING
    ...
    deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 status: RUNNING
    deployment c5aaf3bf-ef9b-4797-8c15-12ff04ed5372 finished
    """
    MODELS = {'openai': OpenAIModule,
              'sensenova': SenseNovaModule,
              'glm': GLMModule,
              'kimi': KimiModule,
              'qwen': QwenModule,
              'doubao': DoubaoModule}

    @staticmethod
    def _encapsulate_parameters(base_url: str,
                                model: str,
                                stream: bool,
                                return_trace: bool,
                                **kwargs) -> Dict[str, Any]:
        params = {"stream": stream, "return_trace": return_trace}
        if base_url is not None:
            params['base_url'] = base_url
        if model is not None:
            params['model'] = model
        params.update(kwargs)

        return params

    def __new__(self,
                model: str = None,
                source: str = None,
                base_url: str = None,
                stream: bool = True,
                return_trace: bool = False,
                **kwargs):
        if model in OnlineChatModule.MODELS.keys() and source is None: source, model = model, source

        params = OnlineChatModule._encapsulate_parameters(base_url, model, stream, return_trace, **kwargs)

        if source is None:
            for source in OnlineChatModule.MODELS.keys():
                if lazyllm.config[f'{source}_api_key']: break
            else:
                raise KeyError(f"No api_key is configured for any of the models {OnlineChatModule.MODELS.keys()}.")

        assert source in OnlineChatModule.MODELS.keys(), f"Unsupported source: {source}"
        return OnlineChatModule.MODELS[source](**params)

lazyllm.module.OnlineEmbeddingModule

Used to manage and create the online embedding service modules currently available on the market; it currently supports openai, sensenova, glm, and qwen.

Parameters:

  • source (str) –

    Specify the type of module to create. Options are openai / sensenova / glm / qwen.

  • embed_url (str) –

    Specify the base link of the platform to be accessed. The default is the official link.

  • embed_model_name (str) –

    Specify the model to access, default is text-embedding-ada-002(openai) / nova-embedding-stable(sensenova) / embedding-2(glm) / text-embedding-v1(qwen)

Examples:

>>> import lazyllm
>>> m = lazyllm.OnlineEmbeddingModule(source="sensenova")
>>> emb = m("hello world")
>>> print(f"emb: {emb}")
emb: [0.0010528564, 0.0063285828, 0.0049476624, -0.012008667, ..., -0.009124756, 0.0032043457, -0.051696777]
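
To target a specific model rather than the platform default, `embed_model_name` can be passed explicitly; a minimal hypothetical sketch (assuming the qwen API key is configured):

```python
# Hypothetical sketch: overriding the default embedding model name.
import lazyllm

m = lazyllm.OnlineEmbeddingModule(source="qwen", embed_model_name="text-embedding-v1")
emb = m("hello world")
print(len(emb))  # a list of floats; its length depends on the chosen model
```
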
Source code in lazyllm/module/onlineEmbedding/onlineEmbeddingModule.py
class OnlineEmbeddingModule(metaclass=__EmbedModuleMeta):
    """Used to manage and create online Embedding service modules currently on the market, currently supporting openai, sensenova, glm, qwen.

Args:
    source (str): Specify the type of module to create. Options are  ``openai`` /  ``sensenova`` /  ``glm`` /  ``qwen``.
    embed_url (str): Specify the base link of the platform to be accessed. The default is the official link.
    embed_model_name (str): Specify the model to access, default is ``text-embedding-ada-002(openai)`` / ``nova-embedding-stable(sensenova)`` / ``embedding-2(glm)`` / ``text-embedding-v1(qwen)`` 


Examples:
    >>> import lazyllm
    >>> m = lazyllm.OnlineEmbeddingModule(source="sensenova")
    >>> emb = m("hello world")
    >>> print(f"emb: {emb}")
    emb: [0.0010528564, 0.0063285828, 0.0049476624, -0.012008667, ..., -0.009124756, 0.0032043457, -0.051696777]
    """
    MODELS = {'openai': OpenAIEmbedding,
              'sensenova': SenseNovaEmbedding,
              'glm': GLMEmbedding,
              'qwen': QwenEmbedding}

    @staticmethod
    def _encapsulate_parameters(embed_url: str,
                                embed_model_name: str) -> Dict[str, Any]:
        params = {}
        if embed_url is not None:
            params["embed_url"] = embed_url
        if embed_model_name is not None:
            params["embed_model_name"] = embed_model_name
        return params

    def __new__(self,
                source: str = None,
                embed_url: str = None,
                embed_model_name: str = None):
        params = OnlineEmbeddingModule._encapsulate_parameters(embed_url, embed_model_name)

        if source is None:
            for source in OnlineEmbeddingModule.MODELS.keys():
                if lazyllm.config[f'{source}_api_key']: break
            else:
                raise KeyError(f"No api_key is configured for any of the models {OnlineEmbeddingModule.MODELS.keys()}.")

        assert source in OnlineEmbeddingModule.MODELS.keys(), f"Unsupported source: {source}"
        return OnlineEmbeddingModule.MODELS[source](**params)

lazyllm.module.OnlineChatModuleBase

Bases: ModuleBase

OnlineChatModuleBase is a public component that manages the LLM interfaces of open platforms and provides key capabilities such as training, deployment, and inference. OnlineChatModuleBase itself does not support direct instantiation; subclasses must inherit from it and implement the fine-tuning-related interfaces (uploading files, creating fine-tuning tasks, querying fine-tuning tasks) and the deployment-related interfaces (creating deployment services, querying deployment tasks). If you need to support the capabilities of a new open platform's LLM, please extend your custom class from OnlineChatModuleBase:

  1. Consider post-processing the returned results based on the parameters returned by the new platform's model. If the model's return format is consistent with OpenAI, no processing is necessary.

  2. If the new platform supports model fine-tuning, you must also inherit from the FileHandlerBase class. This class primarily validates file formats and converts .jsonl formatted data into a format supported by the model for subsequent training.

  3. If the new platform supports model fine-tuning, you must implement interfaces for file upload, creating fine-tuning services, and querying fine-tuning services. Even if the new platform does not require deployment of the fine-tuned model, please implement dummy interfaces for creating and querying deployment services.

  4. If the new platform supports model fine-tuning, provide a list of models that support fine-tuning to facilitate judgment during the fine-tuning service process.

  5. Configure the api_key supported by the new platform as a global variable by using lazyllm.config.add(variable_name, type, default_value, environment_variable_name) .
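
Step 5 is not demonstrated in the examples below; here is a minimal hypothetical sketch of registering the key, following the `lazyllm.config.add(variable_name, type, default_value, environment_variable_name)` signature quoted above (the platform name `new_platform` is a placeholder):

```python
# Hypothetical sketch: expose lazyllm.config['new_platform_api_key'],
# read from the NEW_PLATFORM_API_KEY environment variable by default
# (both names are placeholders for illustration).
import lazyllm

lazyllm.config.add('new_platform_api_key', str, '', 'NEW_PLATFORM_API_KEY')
```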

Examples:

>>> import lazyllm
>>> from lazyllm.module import OnlineChatModuleBase
>>> from lazyllm.module.onlineChatModule.fileHandler import FileHandlerBase
>>> class NewPlatformChatModule(OnlineChatModuleBase):
...     def __init__(self,
...                   base_url: str = "<new platform base url>",
...                   model: str = "<new platform model name>",
...                   system_prompt: str = "<new platform system prompt>",
...                   stream: bool = True,
...                   return_trace: bool = False):
...         super().__init__(model_type="new_class_name",
...                          api_key=lazyllm.config['new_platform_api_key'],
...                          base_url=base_url,
...                          system_prompt=system_prompt,
...                          stream=stream,
...                          return_trace=return_trace)
...
>>> class NewPlatformChatModule1(OnlineChatModuleBase, FileHandlerBase):
...     TRAINABLE_MODELS_LIST = ['model_t1', 'model_t2', 'model_t3']
...     def __init__(self,
...                   base_url: str = "<new platform base url>",
...                   model: str = "<new platform model name>",
...                   system_prompt: str = "<new platform system prompt>",
...                   stream: bool = True,
...                   return_trace: bool = False):
...         OnlineChatModuleBase.__init__(self,
...                                       model_type="new_class_name",
...                                       api_key=lazyllm.config['new_platform_api_key'],
...                                       base_url=base_url,
...                                       system_prompt=system_prompt,
...                                       stream=stream,
...                                       trainable_models=NewPlatformChatModule1.TRAINABLE_MODELS_LIST,
...                                       return_trace=return_trace)
...         FileHandlerBase.__init__(self)
...     
...     def _convert_file_format(self, filepath:str) -> str:
...         pass
...         return data_str
...
...     def _upload_train_file(self, train_file):
...         pass
...         return train_file_id
...
...     def _create_finetuning_job(self, train_model, train_file_id, **kw):
...         pass
...         return fine_tuning_job_id, status
...
...     def _query_finetuning_job(self, fine_tuning_job_id):
...         pass
...         return fine_tuned_model, status
...
...     def _create_deployment(self):
...         pass
...         return self._model_name, "RUNNING"
... 
...     def _query_deployment(self, deployment_id):
...         pass
...         return "RUNNING"
...
Source code in lazyllm/module/onlineChatModule/onlineChatModuleBase.py
class OnlineChatModuleBase(ModuleBase):
    """OnlineChatModuleBase is a public component that manages the LLM interface for open platforms, and has key capabilities such as training, deployment, and inference. OnlineChatModuleBase itself does not support direct instantiation; it requires subclasses to inherit from this class and implement interfaces related to fine-tuning, such as uploading files, creating fine-tuning tasks, querying fine-tuning tasks, and deployment-related interfaces, such as creating deployment services and querying deployment tasks.
If you need to support the capabilities of a new open platform's LLM, please extend your custom class from OnlineChatModuleBase:

1. Consider post-processing the returned results based on the parameters returned by the new platform's model. If the model's return format is consistent with OpenAI, no processing is necessary.

2. If the new platform supports model fine-tuning, you must also inherit from the FileHandlerBase class. This class primarily validates file formats and converts .jsonl formatted data into a format supported by the model for subsequent training. 

3. If the new platform supports model fine-tuning, you must implement interfaces for file upload, creating fine-tuning services, and querying fine-tuning services. Even if the new platform does not require deployment of the fine-tuned model, please implement dummy interfaces for creating and querying deployment services.

4. If the new platform supports model fine-tuning, provide a list of models that support fine-tuning to facilitate judgment during the fine-tuning service process.

5. Configure the api_key supported by the new platform as a global variable by using ``lazyllm.config.add(variable_name, type, default_value, environment_variable_name)`` .


Examples:
    >>> import lazyllm
    >>> from lazyllm.module import OnlineChatModuleBase
    >>> from lazyllm.module.onlineChatModule.fileHandler import FileHandlerBase
    >>> class NewPlatformChatModule(OnlineChatModuleBase):
    ...     def __init__(self,
    ...                   base_url: str = "<new platform base url>",
    ...                   model: str = "<new platform model name>",
    ...                   system_prompt: str = "<new platform system prompt>",
    ...                   stream: bool = True,
    ...                   return_trace: bool = False):
    ...         super().__init__(model_type="new_class_name",
    ...                          api_key=lazyllm.config['new_platform_api_key'],
    ...                          base_url=base_url,
    ...                          system_prompt=system_prompt,
    ...                          stream=stream,
    ...                          return_trace=return_trace)
    ...
    >>> class NewPlatformChatModule1(OnlineChatModuleBase, FileHandlerBase):
    ...     TRAINABLE_MODELS_LIST = ['model_t1', 'model_t2', 'model_t3']
    ...     def __init__(self,
    ...                   base_url: str = "<new platform base url>",
    ...                   model: str = "<new platform model name>",
    ...                   system_prompt: str = "<new platform system prompt>",
    ...                   stream: bool = True,
    ...                   return_trace: bool = False):
    ...         OnlineChatModuleBase.__init__(self,
    ...                                       model_type="new_class_name",
    ...                                       api_key=lazyllm.config['new_platform_api_key'],
    ...                                       base_url=base_url,
    ...                                       system_prompt=system_prompt,
    ...                                       stream=stream,
    ...                                       trainable_models=NewPlatformChatModule1.TRAINABLE_MODELS_LIST,
    ...                                       return_trace=return_trace)
    ...         FileHandlerBase.__init__(self)
    ...     
    ...     def _convert_file_format(self, filepath:str) -> str:
    ...         pass
    ...         return data_str
    ...
    ...     def _upload_train_file(self, train_file):
    ...         pass
    ...         return train_file_id
    ...
    ...     def _create_finetuning_job(self, train_model, train_file_id, **kw):
    ...         pass
    ...         return fine_tuning_job_id, status
    ...
    ...     def _query_finetuning_job(self, fine_tuning_job_id):
    ...         pass
    ...         return fine_tuned_model, status
    ...
    ...     def _create_deployment(self):
    ...         pass
    ...         return self._model_name, "RUNNING"
    ... 
    ...     def _query_deployment(self, deployment_id):
    ...         pass
    ...         return "RUNNING"
    ...
    """

    def __init__(self,
                 model_series: str,
                 api_key: str,
                 base_url: str,
                 model_name: str,
                 stream: bool,
                 trainable_models: List[str],
                 return_trace: bool = False,
                 **kwargs):
        super().__init__(return_trace=return_trace)
        self._model_series = model_series
        if not api_key:
            raise ValueError("api_key is required")
        self._api_key = api_key
        self._base_url = base_url
        self._model_name = model_name
        self._stream = stream
        self.trainable_models = trainable_models
        self._set_headers()
        self._set_chat_url()
        self.prompt()
        self._is_trained = False
        self.formatter()
        self._field_extractor()
        self._model_optional_params = {}

    @property
    def series(self):
        return self._model_series

    @property
    def type(self):
        return "LLM"

    def prompt(self, prompt=None):
        if prompt is None:
            self._prompt = ChatPrompter()
        elif isinstance(prompt, PrompterBase):
            self._prompt = prompt
        elif isinstance(prompt, (str, dict)):
            self._prompt = ChatPrompter(prompt)
        else:
            raise TypeError(f"{prompt} type is not supported.")
        self._prompt._set_model_configs(system=self._get_system_prompt())
        return self

    def share(self, prompt: PrompterBase = None, format: FormatterBase = None):
        new = copy.copy(self)
        new._set_mid()
        if prompt is not None: new.prompt(prompt)
        if format is not None: new.formatter(format)
        return new

    def _get_system_prompt(self):
        raise NotImplementedError("_get_system_prompt is not implemented.")

    def _set_headers(self):
        self._headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + self._api_key
        }

    def _set_chat_url(self):
        self._url = os.path.join(self._base_url, 'chat/completions')

    def _get_models_list(self):
        url = os.path.join(self._base_url, 'models')
        headers = {'Authorization': 'Bearer ' + self._api_key}
        with requests.get(url, headers=headers) as r:
            if r.status_code != 200:
                raise requests.RequestException('\n'.join([c.decode('utf-8') for c in r.iter_content(None)]))

            res_json = r.json()
            return res_json

    def _parse_output_by_key(self, key: str, data: Dict[str, Any]):
        if "choices" in data and isinstance(data["choices"], list):
            item = data['choices'][0]
            data = item.get("delta", {}) if "delta" in item else item.get("message", {})
            return data if not key or key == "." else data.get(key, "")
        else:
            raise ValueError(f"The response {data} does not contain a 'choices' field.")

    def formatter(self, format: FormatterBase = None):
        if isinstance(format, FormatterBase):
            self._formatter = format
        elif format is None:
            self._formatter = EmptyFormatter()
        else:
            raise TypeError("format must be a FormatterBase")

        return self

    def _field_extractor(self, key: Union[str, List[str]] = None):
        if key is None:
            self._extractor_fields = ["{content}" + globals['tool_delimiter'] + "{tool_calls|index}"]
        elif isinstance(key, str):
            self._extractor_fields = [key]
        elif isinstance(key, list):
            self._extractor_fields = key
        else:
            raise TypeError(f"Unsupported type: {type(key)}")

        return self

    def _convert_msg_format(self, msg: Dict[str, Any]):
        return msg

    def _str_to_json(self, msg: str):
        if isinstance(msg, bytes):
            pattern = re.compile(r"^data:\s*")
            msg = re.sub(pattern, "", msg.decode('utf-8'))
        try:
            chunk = json.loads(msg)
            message = self._convert_msg_format(chunk)
            if self._stream:
                for item in message.get("choices", []):
                    delta = item.get("delta", {})
                    content = delta.get("content", '')
                    if content and "tool_calls" not in delta: FileSystemQueue().enqueue(content)
            lazyllm.LOG.debug(f"message: {message}")
            return message
        except Exception:
            return ""

    def _get_benchmark_data(self, data: Dict[str, Any]):
        if "choices" in data and isinstance(data["choices"], list):
            item = data['choices'][0]
            return item.get("delta", {}) if "delta" in item else item.get("message", {})
        else:
            raise ValueError(f"The response {data} does not contain a 'choices' field.")

    def _extract_and_format(self, data, template):  # noqa: C901
        # finding placeholders in template and removing rules
        placeholders = re.findall(r"{(.*?)(?:\|(.*?))?}", template)
        delimiters = re.findall(r"<\|.*?\|>", template)
        # extract and format the fields corresponding to the placeholders
        extracted_data = {}
        pkeys = []
        for placeholder, remove_fields in placeholders:
            placeholder_key = placeholder + "|" + remove_fields if remove_fields else placeholder
            pkeys.append(placeholder_key)
            if 'tool_calls' in placeholder:
                # handling remove_fields
                remove_fields = remove_fields.split(',') if remove_fields else []

                # extract the tool_calls field
                keys = placeholder.split('.')
                value = data
                try:
                    for key in (int(key) if key.isdigit() else key for key in keys):
                        value = value[key]

                    if isinstance(value, list):
                        for item in value:
                            [item.pop(field) for field in remove_fields if field in item]
                    # value = json.dumps(value).replace('\n', '').replace(' ', '')
                    value = value if isinstance(value, str) else json.dumps(value, ensure_ascii=False)
                    extracted_data[placeholder_key] = value
                except (KeyError, IndexError, TypeError):
                    extracted_data[placeholder_key] = ""
            else:
                # extracting additional fields
                keys = placeholder.split('.')
                value = data
                try:
                    for key in (int(key) if key.isdigit() else key for key in keys):
                        value = value[key]
                    # convert the extracted value into a JSON string
                    value = value if isinstance(value, str) else json.dumps(value, ensure_ascii=False)
                    extracted_data[placeholder_key] = value
                except (KeyError, IndexError, TypeError):
                    extracted_data[placeholder_key] = ""

        # populate the template with the extracted data
        assert len(extracted_data) == len(delimiters) + 1, \
               "The delimiters and the number of extracted fields are inconsistent."
        result = extracted_data.get(pkeys[0])
        result += ''.join(delimiters[idx] + extracted_data[key]
                          for idx, key in enumerate(pkeys[1:]) if extracted_data.get(key))
        lazyllm.LOG.debug(f"result: {result}")
        return result

    def _extract_specified_key_fields(self, response: Dict[str, Any]):
        if len(self._extractor_fields) > 0:
            res = {}
            for key in self._extractor_fields:
                res[key] = (self._parse_output_by_key(key, response) if "{" not in key else self._extract_and_format(
                    self._get_benchmark_data(response), key) if key else "")
            return list(res.values())[0] if len(res) == 1 else json.dumps(res, ensure_ascii=False)
        else:
            return json.dumps(self._parse_output_by_key(".", response), ensure_ascii=False)

    def _merge_stream_result(self, src: List[str | int | list | dict]):
        types = set(type(ele) for ele in src if ele is not None)
        assert len(src) > 0 and len(types) <= 1, f"The elements in the list: {src} are of inconsistent types"
        if len(src) == 1:
            return src[0]
        if all(isinstance(ele, str) or ele is None for ele in src):
            if all(ele == src[-1] or ele is None for ele in src) or (self._model_optional_params
               and not self._model_optional_params.get("incremental_output", True)):
                return src[-1]
            else:
                return "".join(ele for ele in src if ele is not None)
        elif all(isinstance(ele, list) for ele in src):
            assert all(len(src[-1]) == len(ele) for ele in src), f"The lists of elements: {src} have different lengths."
            ret = [self._merge_stream_result([ele[idx] for ele in src]) for idx in range(len(src[-1]))]
            return ret[0] if isinstance(ret[0], list) else ret
        elif all(isinstance(ele, dict) for ele in src):
            if "index" in src[-1]:  # If there are multiple index values that need to be appended.
                data_sorted = sorted(src, key=lambda x: x['index'])
                grouped_data = [list(g) for k, g in groupby(data_sorted, key=lambda x: x['index'])]
                if len(grouped_data) > 1:
                    return [self._merge_stream_result(src) for src in grouped_data]
            return {k: "tool_calls" if k == "finish_reason" and "tool_calls" in [d[k] for d in src if k in d]
                    else self._merge_stream_result([d[k] for d in src if k in d]) for k in set().union(*src)}
        elif all(isinstance(ele, int) for ele in src):
            return src[-1] if all(ele == src[-1] for ele in src) else src[-1]
        else:
            raise TypeError(f"The elements in list {src} are of inconsistent types.")

    def forward(self, __input: Union[Dict, str] = None, *, llm_chat_history: List[List[str]] = None, tools: List[Dict[str, Any]] = None, **kw):  # noqa C901
        """LLM inference interface"""
        params = {"input": __input, "history": llm_chat_history}
        if tools:
            params["tools"] = tools
        params["return_dict"] = True
        data = self._prompt.generate_prompt(**params)

        data["model"] = self._model_name
        data["stream"] = self._stream
        if len(kw) > 0:
            data.update(kw)

        if len(self._model_optional_params) > 0:
            data.update(self._model_optional_params)

        with requests.post(self._url, json=data, headers=self._headers, stream=self._stream) as r:
            if r.status_code != 200:  # request error
                raise requests.RequestException('\n'.join([c.decode('utf-8') for c in r.iter_content(None)])) \
                    if self._stream else requests.RequestException(r.text)

            msg_json = list(filter(lambda x: x, [self._str_to_json(line) for line in r.iter_lines()
                                                 if len(line)] if self._stream else [self._str_to_json(r.text)]))
            extractor = self._extract_specified_key_fields(self._merge_stream_result(msg_json))

            return self._formatter.format(extractor) if extractor else ""

    def _set_template(self, template_message=None, keys_name_handle=None, template_headers=None):
        self.template_message = template_message
        self.keys_name_handle = keys_name_handle
        self.template_headers = template_headers

    def _upload_train_file(self, train_file) -> str:
        raise NotImplementedError(f"{self._model_series} not implemented _upload_train_file method in subclass")

    def _create_finetuning_job(self, train_model, train_file_id, **kw) -> Tuple[str, str]:
        raise NotImplementedError(f"{self._model_series} not implemented _create_finetuning_job method in subclass")

    def _query_finetuning_job(self, fine_tuning_job_id) -> Tuple[str, str]:
        raise NotImplementedError(f"{self._model_series} not implemented _query_finetuning_job method in subclass")

    def set_train_tasks(self, train_file, **kw):
        self._train_file = train_file
        self._train_parameters = kw

    def _get_train_tasks(self):
        if not self._model_name or not self._train_file:
            raise ValueError("train_model and train_file is required")
        if self._model_name not in self.trainable_models:
            lazyllm.LOG.log_once(f"The current model {self._model_name} is not in the trainable \
                                  model list {self.trainable_models}. The deadline for this list is June 1, 2024. \
                                  This model may not be trainable. If your model is a new model, \
                                  you can ignore this warning.")

        def _create_for_finetuning_job():
            """
            create for finetuning job to finish
            """
            file_id = self._upload_train_file(train_file=self._train_file)
            lazyllm.LOG.info(f"{os.path.basename(self._train_file)} upload success! file id is {file_id}")
            (fine_tuning_job_id, status) = self._create_finetuning_job(self._model_name,
                                                                       file_id,
                                                                       **self._train_parameters)
            lazyllm.LOG.info(f"fine tuning job {fine_tuning_job_id} created, status: {status}")

            if status.lower() == "failed":
                raise ValueError(f"Fine tuning job {fine_tuning_job_id} failed")
            import random
            while status.lower() != "succeeded":
                try:
                    # wait 60-120 seconds before querying again
                    time.sleep(random.randint(60, 120))
                    (fine_tuned_model, status) = self._query_finetuning_job(fine_tuning_job_id)
                    lazyllm.LOG.info(f"fine tuning job {fine_tuning_job_id} status: {status}")
                    if status.lower() == "failed":
                        raise ValueError(f"Finetuning job {fine_tuning_job_id} failed")
                except ValueError:
                    raise ValueError(f"Finetuning job {fine_tuning_job_id} failed")

            lazyllm.LOG.info(f"fine tuned model: {fine_tuned_model} finished")
            self._model_name = fine_tuned_model
            self._is_trained = True

        return Pipeline(_create_for_finetuning_job)

    def _create_deployment(self) -> Tuple[str, str]:
        raise NotImplementedError(f"{self._model_series} not implemented _create_deployment method in subclass")

    def _query_deployment(self, deployment_id) -> str:
        raise NotImplementedError(f"{self._model_series} not implemented _query_deployment method in subclass")

    def _get_deploy_tasks(self):
        if not self._is_trained: return None

        def _start_for_deployment():
            (deployment_id, status) = self._create_deployment()
            lazyllm.LOG.info(f"deployment {deployment_id} created, status: {status}")

            if status.lower() == "failed":
                raise ValueError(f"Deployment task {deployment_id} failed")
            status = self._query_deployment(deployment_id)
            while status.lower() != "running":
                # wait 10 seconds before querying again
                time.sleep(10)
                status = self._query_deployment(deployment_id)
                lazyllm.LOG.info(f"deployment {deployment_id} status: {status}")
                if status.lower() == "failed":
                    raise ValueError(f"Deployment task {deployment_id} failed")
            lazyllm.LOG.info(f"deployment {deployment_id} finished")
        return Pipeline(_start_for_deployment)

    def __repr__(self):
        return lazyllm.make_repr('Module', 'OnlineChat', name=self._module_name, url=self._base_url,
                                 stream=self._stream, return_trace=self._return_trace)
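
The _upload_train_file, _create_finetuning_job, _query_finetuning_job, _create_deployment, and _query_deployment hooks above are platform-specific and raise NotImplementedError in the base class. Below is a minimal sketch of how a subclass might wire the fine-tuning hooks to a REST API; the class name, endpoint paths, response fields, and the self._api_key attribute are illustrative assumptions, not part of LazyLLM, and the import path mirrors the embedding example further down.

import requests
from lazyllm.module import OnlineChatModuleBase  # import path assumed to mirror the embedding example

class MyPlatformChatModule(OnlineChatModuleBase):  # hypothetical subclass for illustration only
    def _upload_train_file(self, train_file):
        # upload the training file and return the platform's file id (illustrative endpoint and fields)
        headers = {"Authorization": f"Bearer {self._api_key}"}  # assumes the subclass stores an api key
        with open(train_file, 'rb') as f:
            r = requests.post(f"{self._base_url}/files", files={"file": f}, headers=headers)
        r.raise_for_status()
        return r.json()["id"]

    def _create_finetuning_job(self, train_model, train_file_id, **kw):
        # create the job and return (job_id, status), as _get_train_tasks expects
        headers = {"Authorization": f"Bearer {self._api_key}"}
        payload = {"model": train_model, "training_file": train_file_id, **kw}
        r = requests.post(f"{self._base_url}/fine_tuning/jobs", json=payload, headers=headers)
        r.raise_for_status()
        data = r.json()
        return data["id"], data["status"]

    def _query_finetuning_job(self, fine_tuning_job_id):
        # return (fine_tuned_model, status); a status of "succeeded" ends the polling loop
        headers = {"Authorization": f"Bearer {self._api_key}"}
        r = requests.get(f"{self._base_url}/fine_tuning/jobs/{fine_tuning_job_id}", headers=headers)
        r.raise_for_status()
        data = r.json()
        return data.get("fine_tuned_model"), data["status"]

The _create_deployment and _query_deployment hooks follow the same pattern: the former returns a (deployment_id, status) pair, and the latter returns a status string, with "running" marking a ready deployment.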

lazyllm.module.OnlineEmbeddingModuleBase

Bases: ModuleBase

OnlineEmbeddingModuleBase is the base class for managing embedding model interfaces on open platforms, used to request embedding vectors for text. It is not recommended to instantiate this class directly; instead, platform-specific classes should inherit from it. If you need to support the embedding models of a new open platform, please extend your custom class from OnlineEmbeddingModuleBase:

  1. If the request and response data formats of the new platform's embedding model are the same as OpenAI's, no additional processing is needed; simply pass the URL and model.

  2. If the request or response data formats of the new platform's embedding model differ from OpenAI's, you need to override the _encapsulated_data or _parse_response methods.

  3. Configure the api_key supported by the new platform as a global variable by using lazyllm.config.add(variable_name, type, default_value, environment_variable_name); a short sketch follows this list.
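
For step 3, registering the API key as a config entry could look like the following sketch; the variable name matches the examples below, while the default value and environment-variable name are illustrative, and the call follows the signature quoted above.

>>> import lazyllm
>>> lazyllm.config.add('new_platform_api_key', str, '', 'NEW_PLATFORM_API_KEY')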

Examples:

>>> import lazyllm
>>> from lazyllm.module import OnlineEmbeddingModuleBase
>>> class NewPlatformEmbeddingModule(OnlineEmbeddingModuleBase):
...     def __init__(self,
...                 embed_url: str = '<new platform embedding url>',
...                 embed_model_name: str = '<new platform embedding model name>'):
...         super().__init__('NewPlatform', embed_url, lazyllm.config['new_platform_api_key'], embed_model_name)
...
>>> class NewPlatformEmbeddingModule1(OnlineEmbeddingModuleBase):
...     def __init__(self,
...                 embed_url: str = '<new platform embedding url>',
...                 embed_model_name: str = '<new platform embedding model name>'):
...         super().__init__('NewPlatform', embed_url, lazyllm.config['new_platform_api_key'], embed_model_name)
...
...     def _encapsulated_data(self, text: str, **kwargs):
...         json_data = {'input': text, 'model': self._embed_model_name}  # build the platform-specific payload here
...         return json_data
...
...     def _parse_response(self, response: dict):
...         embedding = response['data'][0]['embedding']  # extract the embedding from the platform-specific response
...         return embedding
Source code in lazyllm/module/onlineEmbedding/onlineEmbeddingModuleBase.py
class OnlineEmbeddingModuleBase(ModuleBase):
    """
OnlineEmbeddingModuleBase is the base class for managing embedding model interfaces on open platforms, used to request embedding vectors for text. It is not recommended to instantiate this class directly; instead, platform-specific classes should inherit from it.
If you need to support the embedding models of a new open platform, please extend your custom class from OnlineEmbeddingModuleBase:

1. If the request and response data formats of the new platform's embedding model are the same as OpenAI's, no additional processing is needed; simply pass the URL and model.

2. If the request or response data formats of the new platform's embedding model differ from OpenAI's, you need to override the _encapsulated_data or _parse_response methods.

3. Configure the api_key supported by the new platform as a global variable by using ``lazyllm.config.add(variable_name, type, default_value, environment_variable_name)`` .


Examples:
    >>> import lazyllm
    >>> from lazyllm.module import OnlineEmbeddingModuleBase
    >>> class NewPlatformEmbeddingModule(OnlineEmbeddingModuleBase):
    ...     def __init__(self,
    ...                 embed_url: str = '<new platform embedding url>',
    ...                 embed_model_name: str = '<new platform embedding model name>'):
    ...         super().__init__('NewPlatform', embed_url, lazyllm.config['new_platform_api_key'], embed_model_name)
    ...
    >>> class NewPlatformEmbeddingModule1(OnlineEmbeddingModuleBase):
    ...     def __init__(self,
    ...                 embed_url: str = '<new platform embedding url>',
    ...                 embed_model_name: str = '<new platform embedding model name>'):
    ...         super().__init__('NewPlatform', embed_url, lazyllm.config['new_platform_api_key'], embed_model_name)
    ...
    ...     def _encapsulated_data(self, text: str, **kwargs):
    ...         json_data = {'input': text, 'model': self._embed_model_name}  # build the platform-specific payload here
    ...         return json_data
    ...
    ...     def _parse_response(self, response: dict):
    ...         embedding = response['data'][0]['embedding']  # extract the embedding from the platform-specific response
    ...         return embedding
    """

    def __init__(self,
                 model_series: str,
                 embed_url: str,
                 api_key: str,
                 embed_model_name: str,
                 return_trace: bool = False):
        super().__init__(return_trace=return_trace)
        self._model_series = model_series
        self._embed_url = embed_url
        self._api_key = api_key
        self._embed_model_name = embed_model_name
        self._set_headers()

    @property
    def series(self):
        return self._model_series

    @property
    def type(self):
        return "EMBED"

    def _set_headers(self) -> None:
        self._headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self._api_key}"
        }

    def forward(self, text: str, **kwargs) -> List[float]:
        data = self._encapsulated_data(text, **kwargs)
        with requests.post(self._embed_url, json=data, headers=self._headers) as r:
            if r.status_code == 200:
                return self._parse_response(r.json())
            else:
                raise requests.RequestException('\n'.join([c.decode('utf-8') for c in r.iter_content(None)]))

    def _encapsulated_data(self, text: str, **kwargs) -> Dict[str, str]:
        json_data = {
            "input": text,
            "model": self._embed_model_name
        }
        if len(kwargs) > 0:
            json_data.update(kwargs)

        return json_data

    def _parse_response(self, response: Dict[str, Any]) -> List[float]:
        return response['data'][0]['embedding']
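
Once a platform-specific subclass such as the NewPlatformEmbeddingModule example above is defined (its URL, key, and model name are placeholders), the module is used as a plain functor: calling it sends the encapsulated request to the platform and returns the parsed embedding. A minimal usage sketch under those assumptions:

>>> embed = NewPlatformEmbeddingModule()
>>> vector = embed("hello world")  # POSTs the encapsulated data and returns a List[float] embedding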