Skip to content

数据处理

数据处理算子

基类算子

lazyllm.tools.data.LazyLLMDataBase

数据处理算子基类。为注册到 data_register 的算子提供统一行为,包括并发执行、结果保存/恢复、进度记录和错误收集。

主要方法和行为:

  • forward(self, input, **kwargs): 处理单条数据(子类/函数实现)。
  • forward_batch_input(self, inputs, **kwargs): 处理批量数据并返回最终结果(子类/函数实现)。
  • call(self, inputs): 统一入口,会根据子类是否实现 forward 或 forward_batch_input 选择执行逻辑;支持并发执行、断点续传和保存结果。
  • set_output(self, path): 设置导出路径,调用后 call 返回导出文件路径而不是内存结果。

构造函数参数:

  • _concurrency_mode (str): 并发模式,'process'|'thread'|'single'。
  • _save_data (bool): 是否保存中间结果到磁盘以便 Resume。
  • _max_workers (int|None): 最大并发工作进程/线程数,None 表示使用默认。
  • _ignore_errors (bool): 是否忽略任务异常。
  • **kwargs (dict): 其它传递给算子的参数。

配置项(通过 lazyllm.config):

  • data_process_path (str): 存储处理结果的根路径。
  • data_process_resume (bool): 是否开启 Resume 功能,从进度文件继续处理。

Examples:

from lazyllm.tools.data import LazyLLMDataBase

# simple usage: subclass and implement forward
class EchoOp(LazyLLMDataBase):
    def forward(self, data):
        return {'text': data.get('text', '')}

op = EchoOp(_save_data=True)
res = op([{'text': 'hello'}])  # returns list or exported path depending on set_output
Source code in lazyllm/tools/data/base_data.py
class LazyLLMDataBase(metaclass=LazyLLMRegisterMetaClass):
    """数据处理算子基类。为注册到 data_register 的算子提供统一行为,包括并发执行、结果保存/恢复、进度记录和错误收集。

主要方法和行为:

- forward(self, input, **kwargs): 处理单条数据(子类/函数实现)。
- forward_batch_input(self, inputs, **kwargs): 处理批量数据并返回最终结果(子类/函数实现)。
- __call__(self, inputs): 统一入口,会根据子类是否实现 forward 或 forward_batch_input 选择执行逻辑;支持并发执行、断点续传和保存结果。
- set_output(self, path): 设置导出路径,调用后 __call__ 返回导出文件路径而不是内存结果。

构造函数参数:

- _concurrency_mode (str): 并发模式,'process'|'thread'|'single'。
- _save_data (bool): 是否保存中间结果到磁盘以便 Resume。
- _max_workers (int|None): 最大并发工作进程/线程数,None 表示使用默认。
- _ignore_errors (bool): 是否忽略任务异常。
- **kwargs (dict): 其它传递给算子的参数。

配置项(通过 lazyllm.config):

- data_process_path (str): 存储处理结果的根路径。
- data_process_resume (bool): 是否开启 Resume 功能,从进度文件继续处理。


Examples:
    ```python
    from lazyllm.tools.data import LazyLLMDataBase

    # simple usage: subclass and implement forward
    class EchoOp(LazyLLMDataBase):
        def forward(self, data):
            return {'text': data.get('text', '')}

    op = EchoOp(_save_data=True)
    res = op([{'text': 'hello'}])  # returns list or exported path depending on set_output
    ```
    """
    def __init__(self, _concurrency_mode=None, _save_data=True, _max_workers=None,
                 _ignore_errors=True, **kwargs):
        self._concurrency_mode = _concurrency_mode or getattr(self, '_concurrency_mode', 'process')
        if _max_workers:
            self._max_workers = _max_workers
        elif self._concurrency_mode == 'process':
            self._max_workers = os.cpu_count()
        else:
            self._max_workers = min(max(32, (os.cpu_count() or 1) * 5), 128)
        self._ignore_errors = _ignore_errors
        self._store = DataStateStore(self.__class__.__name__, _save_data)
        self._lazyllm_kwargs = kwargs
        self._export_path = None

    def set_output(self, output_path):
        """设置输出路径,用于把最终结果导出为 jsonl 文件并返回文件路径。

Args:
    output_path (str): 文件夹路径或具体 .jsonl 文件路径。若为文件夹,则在该文件夹下创建以类名命名的 jsonl 文件。

行为:

- 如果传入的是文件夹路径,则在该文件夹下创建以类名命名的 jsonl 文件。
- 如果传入的是以 .jsonl 结尾的路径,则直接写入该文件(必要时会创建目录)。
- 返回写入的绝对路径字符串。


Examples:
    ```python
    from lazyllm.tools.data import Demo2

    # export to a directory (will create DemoClass.jsonl)
    op = Demo2.rich_content(input_key='text').set_output('./out_dir')
    path = op([{'text': 'sample'}])
    print(path)  # ./out_dir/RichContent.jsonl or similar

    # export to a specific file
    op = Demo2.rich_content(input_key='text').set_output('./out_dir/results.jsonl')
    path = op([{'text': 'sample'}])
    print(path)  # ./out_dir/results.jsonl
    ```
    """
        self._export_path = output_path
        return self

    def _overwrote(self, f):
        return getattr(self.__class__, f) is not getattr(__class__, f) or \
            getattr(self.__class__, '__reg_overwrite__', None) == f

    def forward(self, input_data, **kwargs):
        """子类需要实现的方法,处理单条数据。返回值支持:

- dict: 表示处理后的单条结果。
- list: 表示将一条输入展开为多条输出。
- None: 表示保留原始输入(不修改)。
- 抛出异常或返回错误对象会被记录到错误文件并跳过(依赖配置和调用者)。

Args:
    input (dict): 单条输入数据字典。
    **kwargs (dict): 其它用户传入的参数。


Examples:
    ```python
    from lazyllm.tools.data import LazyLLMDataBase

    class MyOp(LazyLLMDataBase):
        def forward(self, data):
            # return dict or list or None
            return {'text': data.get('text', '').upper()}

    op = MyOp()
    print(op([{'text': 'a'}]))
    ```
    """
        raise NotImplementedError()

    def forward_batch_input(self, inputs, **kwargs):
        """子类可实现的批量处理方法,用于在非逐条并发场景下直接接收整个输入列表并返回最终结果列表(可用于自定义批量逻辑或外部服务一次性处理)。

Args:
    inputs (list[dict]): 输入数据列表。
    **kwargs (dict): 其它用户传入的参数。


Examples:
    ```python
    from lazyllm.tools.data import LazyLLMDataBase

    class BatchOp(LazyLLMDataBase):
        def forward_batch_input(self, inputs):
            # implement batch processing and return a list
            return [{'text': i.get('text', '').lower()} for i in inputs]

    op = BatchOp()
    print(op([{'text': 'A'}, {'text': 'B'}]))
    ```
    """
        raise NotImplementedError()

    def _run_one(self, data):
        try:
            kwargs = getattr(self, '_lazyllm_kwargs', {})
            return self.forward(data, **kwargs)
        except Exception as e:
            err_msg = str(e)
            if isinstance(data, dict):
                return {**data, 'infer_error': err_msg}
            return {'input': data, 'infer_error': err_msg}

    def _process_forward_common(self, data):
        self._store.load_progress()
        results = []
        pbar = tqdm(total=len(data), desc=f'Processing {self.__class__.__name__}', unit='item')

        if self._store.is_done:
            pbar.update(len(data))
            pending_indices = []
        else:
            if len(self._store.processed_indices) > 0:
                pbar.update(len(self._store.processed_indices))

            pending_indices = [idx for idx in range(len(data)) if idx not in self._store.processed_indices]

        if not pending_indices:
            pbar.close()
            return self._store.load_results()

        if self._concurrency_mode == 'single':
            for idx in pending_indices:
                res = self._run_one(data[idx])
                self._handle_result(res, data[idx], results, [idx])
                pbar.update(1)
        else:
            self._process_parallel(data, pending_indices, results, pbar)

        pbar.close()
        # Flush remaining
        if self._store.save_data:
            self._store.save_results([], force=True)  # Flush
            return self._store.load_results()
        return results

    def _process_parallel(self, data, pending_indices, results, pbar):

        executor_cls = ProcessPoolExecutor if self._concurrency_mode == 'process' else ThreadPoolExecutor
        idx_iter = iter(pending_indices)
        futures = {}

        with executor_cls(max_workers=self._max_workers) as executor:
            # 1. Submit initial batch
            for _ in range(self._max_workers):
                try:
                    idx = next(idx_iter)
                    fut = executor.submit(self._run_one, data[idx])
                    futures[fut] = idx
                except StopIteration:
                    break

            # 2. Loop
            while futures:
                done, _ = wait(futures.keys(), return_when=FIRST_COMPLETED)

                for fut in done:
                    idx = futures.pop(fut)
                    try:
                        res = fut.result()
                        self._handle_result(res, data[idx], results, [idx])
                    except Exception as e:
                        if not self._ignore_errors:
                            raise e
                        LOG.error(f'Task failed: {e}')

                    pbar.update(1)

                    # Submit next
                    try:
                        next_idx = next(idx_iter)
                        new_fut = executor.submit(self._run_one, data[next_idx])
                        futures[new_fut] = next_idx
                    except StopIteration:
                        pass

    def _handle_result(self, res, original_data, results, indices):
        if isinstance(res, dict) and 'infer_error' in res:
            if self._store.save_data:
                self._store.save_errors(res)
                self._store.save_results([], indices)
            return

        # Logic to interpret return value
        final_res = []
        if res is None:
            final_res.append(original_data)  # Keep original
        elif isinstance(res, list):
            if res:  # Not empty
                final_res.extend(res)
            # Empty list means delete (do nothing)
        elif isinstance(res, dict):
            final_res.append(res)
        else:
            # Treat unexpected return types as errors
            err_msg = f'Invalid return type {type(res)} from {self.__class__.__name__}, expect dict or list or None'
            LOG.error(err_msg)
            if isinstance(original_data, dict):
                error_res = original_data.copy()
                error_res['infer_error'] = err_msg
            else:
                error_res = {'input': original_data, 'infer_error': err_msg}

            if self._store.save_data:
                self._store.save_errors(error_res)
                self._store.save_results([], indices)
            return

        if self._store.save_data:
            self._store.save_results(final_res, indices)
        else:
            results.extend(final_res)

    def _export_file(self, result):
        if not self._export_path or result is None:
            return result

        path = self._export_path
        if not path.endswith('.jsonl'):
            os.makedirs(path, exist_ok=True)
            path = os.path.join(path, f'{self.__class__.__name__}.jsonl')
        else:
            dir_name = os.path.dirname(path)
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)

        abs_path = os.path.abspath(path)
        with open(abs_path, 'w', encoding='utf-8') as f:
            for item in result:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
        return abs_path

    def __call__(self, inputs):
        if not isinstance(inputs, list):
            inputs = [inputs]

        kwargs = getattr(self, '_lazyllm_kwargs', {})
        res = []

        if self._overwrote('forward_batch_input'):
            self._store.load_progress()
            if self._store.save_data and self._store.resume and self._store.is_done:
                LOG.warning(f'skip {self.__class__.__name__} and load data from {self._store.save_path}')
                res = self._store.load_results()
            else:
                res = self.forward_batch_input(inputs, **kwargs)

                if self._store.save_data and res is not None:
                    self._store.save_results(res if isinstance(res, list) else [res], indices='Done', force=True)

        elif self._overwrote('forward'):
            res = self._process_forward_common(inputs)
        else:
            raise RuntimeError('Must implement forward or forward_batch_input')

        return self._export_file(res)

forward(input_data, **kwargs)

子类需要实现的方法,处理单条数据。返回值支持:

  • dict: 表示处理后的单条结果。
  • list: 表示将一条输入展开为多条输出。
  • None: 表示保留原始输入(不修改)。
  • 抛出异常或返回错误对象会被记录到错误文件并跳过(依赖配置和调用者)。

Parameters:

  • input (dict) –

    单条输入数据字典。

  • **kwargs (dict, default: {} ) –

    其它用户传入的参数。

Examples:

from lazyllm.tools.data import LazyLLMDataBase

class MyOp(LazyLLMDataBase):
    def forward(self, data):
        # return dict or list or None
        return {'text': data.get('text', '').upper()}

op = MyOp()
print(op([{'text': 'a'}]))
Source code in lazyllm/tools/data/base_data.py
    def forward(self, input_data, **kwargs):
        """子类需要实现的方法,处理单条数据。返回值支持:

- dict: 表示处理后的单条结果。
- list: 表示将一条输入展开为多条输出。
- None: 表示保留原始输入(不修改)。
- 抛出异常或返回错误对象会被记录到错误文件并跳过(依赖配置和调用者)。

Args:
    input (dict): 单条输入数据字典。
    **kwargs (dict): 其它用户传入的参数。


Examples:
    ```python
    from lazyllm.tools.data import LazyLLMDataBase

    class MyOp(LazyLLMDataBase):
        def forward(self, data):
            # return dict or list or None
            return {'text': data.get('text', '').upper()}

    op = MyOp()
    print(op([{'text': 'a'}]))
    ```
    """
        raise NotImplementedError()

forward_batch_input(inputs, **kwargs)

子类可实现的批量处理方法,用于在非逐条并发场景下直接接收整个输入列表并返回最终结果列表(可用于自定义批量逻辑或外部服务一次性处理)。

Parameters:

  • inputs (list[dict]) –

    输入数据列表。

  • **kwargs (dict, default: {} ) –

    其它用户传入的参数。

Examples:

from lazyllm.tools.data import LazyLLMDataBase

class BatchOp(LazyLLMDataBase):
    def forward_batch_input(self, inputs):
        # implement batch processing and return a list
        return [{'text': i.get('text', '').lower()} for i in inputs]

op = BatchOp()
print(op([{'text': 'A'}, {'text': 'B'}]))
Source code in lazyllm/tools/data/base_data.py
    def forward_batch_input(self, inputs, **kwargs):
        """子类可实现的批量处理方法,用于在非逐条并发场景下直接接收整个输入列表并返回最终结果列表(可用于自定义批量逻辑或外部服务一次性处理)。

Args:
    inputs (list[dict]): 输入数据列表。
    **kwargs (dict): 其它用户传入的参数。


Examples:
    ```python
    from lazyllm.tools.data import LazyLLMDataBase

    class BatchOp(LazyLLMDataBase):
        def forward_batch_input(self, inputs):
            # implement batch processing and return a list
            return [{'text': i.get('text', '').lower()} for i in inputs]

    op = BatchOp()
    print(op([{'text': 'A'}, {'text': 'B'}]))
    ```
    """
        raise NotImplementedError()

set_output(output_path)

设置输出路径,用于把最终结果导出为 jsonl 文件并返回文件路径。

Parameters:

  • output_path (str) –

    文件夹路径或具体 .jsonl 文件路径。若为文件夹,则在该文件夹下创建以类名命名的 jsonl 文件。

行为:

  • 如果传入的是文件夹路径,则在该文件夹下创建以类名命名的 jsonl 文件。
  • 如果传入的是以 .jsonl 结尾的路径,则直接写入该文件(必要时会创建目录)。
  • 返回写入的绝对路径字符串。

Examples:

from lazyllm.tools.data import Demo2

# export to a directory (will create DemoClass.jsonl)
op = Demo2.rich_content(input_key='text').set_output('./out_dir')
path = op([{'text': 'sample'}])
print(path)  # ./out_dir/RichContent.jsonl or similar

# export to a specific file
op = Demo2.rich_content(input_key='text').set_output('./out_dir/results.jsonl')
path = op([{'text': 'sample'}])
print(path)  # ./out_dir/results.jsonl
Source code in lazyllm/tools/data/base_data.py
    def set_output(self, output_path):
        """设置输出路径,用于把最终结果导出为 jsonl 文件并返回文件路径。

Args:
    output_path (str): 文件夹路径或具体 .jsonl 文件路径。若为文件夹,则在该文件夹下创建以类名命名的 jsonl 文件。

行为:

- 如果传入的是文件夹路径,则在该文件夹下创建以类名命名的 jsonl 文件。
- 如果传入的是以 .jsonl 结尾的路径,则直接写入该文件(必要时会创建目录)。
- 返回写入的绝对路径字符串。


Examples:
    ```python
    from lazyllm.tools.data import Demo2

    # export to a directory (will create DemoClass.jsonl)
    op = Demo2.rich_content(input_key='text').set_output('./out_dir')
    path = op([{'text': 'sample'}])
    print(path)  # ./out_dir/RichContent.jsonl or similar

    # export to a specific file
    op = Demo2.rich_content(input_key='text').set_output('./out_dir/results.jsonl')
    path = op([{'text': 'sample'}])
    print(path)  # ./out_dir/results.jsonl
    ```
    """
        self._export_path = output_path
        return self

演示算子

lazyllm.tools.data.operators.demo_ops

AddSuffix

Bases: Demo2

通过类方式实现的算子,为指定字段添加后缀。支持并发配置(通过构造参数)。

Parameters:

  • suffix (str) –

    要添加的后缀

  • input_key (str, default: 'content' ) –

    文本字段名

  • _max_workers (int | None) –

    可选,最大并发数

  • _concurrency_mode (str, default: 'process' ) –

    可选,并发模式

  • _save_data (bool) –

    可选,是否保存结果

Examples:

from lazyllm.tools.data.operators.demo_ops import AddSuffix

op = AddSuffix(suffix='!!!', input_key='text', _max_workers=2)
print(op([{'text': 'wow'}]))  # [{'text': 'wow!!!'}]
Source code in lazyllm/tools/data/operators/demo_ops.py
class AddSuffix(Demo2):
    """通过类方式实现的算子,为指定字段添加后缀。支持并发配置(通过构造参数)。

Args:
    suffix (str): 要添加的后缀
    input_key (str): 文本字段名
    _max_workers (int|None): 可选,最大并发数
    _concurrency_mode (str): 可选,并发模式
    _save_data (bool): 可选,是否保存结果


Examples:
    ```python
    from lazyllm.tools.data.operators.demo_ops import AddSuffix

    op = AddSuffix(suffix='!!!', input_key='text', _max_workers=2)
    print(op([{'text': 'wow'}]))  # [{'text': 'wow!!!'}]
    ```
    """
    def __init__(self, suffix, input_key='content', _concurrency_mode='process', **kwargs):
        super().__init__(_concurrency_mode=_concurrency_mode, **kwargs)
        self.suffix = suffix
        self.input_key = input_key

    def forward(self, data, **kwargs):
        assert isinstance(data, dict)
        data[self.input_key] = f'{data.get(self.input_key, "")}{self.suffix}'
        return data

build_pre_suffix(data, input_key='content', prefix='', suffix='')

对输入列表中每项在指定字段前后添加前缀和后缀。此算子以批处理函数注册(forward_batch_input)。

Parameters:

  • data (list[dict]) –

    输入列表

  • input_key (str, default: 'content' ) –

    文本字段名

  • prefix (str, default: '' ) –

    要添加的前缀

  • suffix (str, default: '' ) –

    要添加的后缀

Examples:

from lazyllm.tools.data.operators.demo_ops import build_pre_suffix

op = build_pre_suffix(input_key='text', prefix='Hello, ', suffix='!')
print(op([{'text': 'world'}]))
# [{'text': 'Hello, world!'}]
Source code in lazyllm/tools/data/operators/demo_ops.py
@data_register('data.demo1', rewrite_func='forward_batch_input')
def build_pre_suffix(data, input_key='content', prefix='', suffix=''):
    """对输入列表中每项在指定字段前后添加前缀和后缀。此算子以批处理函数注册(forward_batch_input)。

Args:
    data (list[dict]): 输入列表
    input_key (str): 文本字段名
    prefix (str): 要添加的前缀
    suffix (str): 要添加的后缀


Examples:
    ```python
    from lazyllm.tools.data.operators.demo_ops import build_pre_suffix

    op = build_pre_suffix(input_key='text', prefix='Hello, ', suffix='!')
    print(op([{'text': 'world'}]))
    # [{'text': 'Hello, world!'}]
    ```
    """
    assert isinstance(data, list)
    for item in data:
        item[input_key] = f'{prefix}{item.get(input_key, "")}{suffix}'
    return data

error_prone_op(data, input_key='content')

一个用于测试的算子:在特定输入(content == 'fail')时抛出异常,否则返回处理后的字典结果。用于验证错误收集与跳过逻辑。

Parameters:

  • data (dict) –

    单条数据字典

  • input_key (str, default: 'content' ) –

    文本字段名

Examples:

from lazyllm.tools.data.operators.demo_ops import error_prone_op

op = error_prone_op(input_key='text', _save_data=True, _concurrency_mode='single')
res = op([{'text': 'ok'}, {'text': 'fail'}, {'text': 'ok2'}])
# valid results skip the failed item; error details written to error file
Source code in lazyllm/tools/data/operators/demo_ops.py
@data_register('data.demo2', rewrite_func='forward')
def error_prone_op(data, input_key='content'):
    """一个用于测试的算子:在特定输入(content == 'fail')时抛出异常,否则返回处理后的字典结果。用于验证错误收集与跳过逻辑。

Args:
    data (dict): 单条数据字典
    input_key (str): 文本字段名


Examples:
    ```python
    from lazyllm.tools.data.operators.demo_ops import error_prone_op

    op = error_prone_op(input_key='text', _save_data=True, _concurrency_mode='single')
    res = op([{'text': 'ok'}, {'text': 'fail'}, {'text': 'ok2'}])
    # valid results skip the failed item; error details written to error file
    ```
    """
    assert isinstance(data, dict)
    content = data.get(input_key, '')
    if content == 'fail':
        raise ValueError('Intentional error for testing.')
    data[input_key] = f'Processed: {content}'
    return data

process_uppercase(data, input_key='content')

将输入文本字段转换为大写。适用于单条处理函数注册(forward)。

Parameters:

  • data (dict) –

    单条数据字典

  • input_key (str, default: 'content' ) –

    文本字段名,默认 'content'

Examples:

from lazyllm.tools.data.operators.demo_ops import process_uppercase

op = process_uppercase(input_key='text')
print(op({'text': 'hello'}))  # {'text': 'HELLO'}
Source code in lazyllm/tools/data/operators/demo_ops.py
@data_register('data.demo1', rewrite_func='forward', _concurrency_mode='process')
def process_uppercase(data, input_key='content'):
    """将输入文本字段转换为大写。适用于单条处理函数注册(forward)。

Args:
    data (dict): 单条数据字典
    input_key (str): 文本字段名,默认 'content'


Examples:
    ```python
    from lazyllm.tools.data.operators.demo_ops import process_uppercase

    op = process_uppercase(input_key='text')
    print(op({'text': 'hello'}))  # {'text': 'HELLO'}
    ```
    """
    assert isinstance(data, dict)
    data[input_key] = data.get(input_key, '').upper()
    return data

rich_content(data, input_key='content')

将单条输入拆分为多条输出,生成富内容表示(原始 + 若干派生)。适用于返回 list 的 forward。

Parameters:

  • data (dict) –

    单条数据字典

  • input_key (str, default: 'content' ) –

    文本字段名

Examples:

from lazyllm.tools.data.operators.demo_ops import rich_content

op = rich_content(input_key='text')
print(op({'text': 'This is a test.'}))
# [
#   {'text': 'This is a test.'},
#   {'text': 'This is a test. - part 1'},
#   {'text': 'This is a test. - part 2'}
# ]
Source code in lazyllm/tools/data/operators/demo_ops.py
@data_register('data.demo2', rewrite_func='forward', _concurrency_mode='process')
def rich_content(data, input_key='content'):
    """将单条输入拆分为多条输出,生成富内容表示(原始 + 若干派生)。适用于返回 list 的 forward。

Args:
    data (dict): 单条数据字典
    input_key (str): 文本字段名


Examples:
    ```python
    from lazyllm.tools.data.operators.demo_ops import rich_content

    op = rich_content(input_key='text')
    print(op({'text': 'This is a test.'}))
    # [
    #   {'text': 'This is a test.'},
    #   {'text': 'This is a test. - part 1'},
    #   {'text': 'This is a test. - part 2'}
    # ]
    ```
    """
    assert isinstance(data, dict)
    content = data.get(input_key, '')
    new_res = [data]
    for i in range(2):
        new_data = data.copy()
        new_data[input_key] = f'{content} - part {i+1}'
        new_res.append(new_data)
    return new_res

数据处理 Pipeline

演示Pipeline

lazyllm.tools.data.pipelines.demo_pipelines

build_demo_pipeline(input_key='text')

构建演示用数据处理流水线(Pipeline),包含若干示例算子并展示如何在 pipeline 上组合使用这些算子。

Parameters:

  • input_key (str, default: 'text' ) –

    要处理的文本字段名,默认 'text'

Returns:

一个可调用的 pipeline 对象,调用时会按顺序执行其中注册的算子。

Examples:

from lazyllm.tools.data.pipelines.demo_pipelines import build_demo_pipeline

ppl = build_demo_pipeline(input_key='text')
data = [{'text': 'lazyLLM'}]
res = ppl(data)
print(res)  # demonstrates how operators are combined and applied
Source code in lazyllm/tools/data/pipelines/demo_pipelines.py
def build_demo_pipeline(input_key='text'):
    """构建演示用数据处理流水线(Pipeline),包含若干示例算子并展示如何在 pipeline 上组合使用这些算子。

Args:
    input_key (str): 要处理的文本字段名,默认 'text'

**Returns:**

    一个可调用的 pipeline 对象,调用时会按顺序执行其中注册的算子。


Examples:
    ```python
    from lazyllm.tools.data.pipelines.demo_pipelines import build_demo_pipeline

    ppl = build_demo_pipeline(input_key='text')
    data = [{'text': 'lazyLLM'}]
    res = ppl(data)
    print(res)  # demonstrates how operators are combined and applied
    ```
    """
    with pipeline() as ppl:
        ppl.build_pre_suffix = demo1.build_pre_suffix(input_key=input_key, prefix='Hello, ', suffix='!')
        ppl.process_uppercase = demo1.process_uppercase(input_key=input_key)
        ppl.add_suffix = demo2.AddSuffix(input_key=input_key, suffix='!!!', _max_workers=4)
        ppl.rich_content = demo2.rich_content(input_key=input_key, _concurrency_mode='single')
    return ppl