CustomRegion 工具介绍#

章节目标#

  • 了解什么场景下应该选用CustomRegion工具

  • 了解CustomRegion工具中配置器和算子的连接关系

  • 了解CustomRegion工具中的配置器和算子功能及使用方法

支持的使用场景#

CustomRegion 是 VisionFlow 平台中用于自定义区域检测的工具,允许用户通过 Python 脚本实现自己的训练和推理逻辑。 该工具特别适用于需要定制化区域检测算法的场景,例如需要集成自定义算法或第三方深度学习框架(如 PyTorch、TensorFlow)的场景 或需要快速验证新型区域检测算法。

Note

CustomRegion工具的核心优势在于其灵活性,用户可以使用任何 Python 深度学习框架(如 PyTorch、TensorFlow)或传统计算机视觉算法实现自己的检测逻辑,并无缝集成到 VisionFlow 流程中。

CustomRegion工具中配置器和算子的连接关系#

  • 训练器(CustomRegionTrainer):负责训练自定义区域检测模型,输出为模型二进制数据

  • 推理器(CustomRegionInfer):负责使用训练好的模型进行推理,接收单个图像并输出检测到的区域

  • 数据流:训练数据(原图、视图、原图信息、标注) → CustomRegionTrainer(训练) → 模型 → CustomRegionInfer(推理) → 检测结果

  • 详细参数、属性定义及各配置器、算子的输入输出请参考 工具及详细流程图

CustomRegion 工具中的配置器#

标签类别参数配置器(label_classes.conf)#

Warning

  • 背景类为默认类别,不需要显式设置在LabelClasses中

  • 不能出现重复的缺陷类别名

训练参数配置器(trainer.conf)#

训练二进制参数配置器(trainer_binary_arg.conf)#

  • 描述:用于配置训练过程中使用的二进制包参数(visionflow::param::BinaryPacks),这些参数会被传递给训练器作为额外的参数信息。如果不手动设置,若开启自动更新参数功能,将会生成空的BinaryPacks参数并设置给训练器。

自定义区域训练器 (trainer)#

  • 描述:CustomRegion训练器,基于用户提供的Python脚本实现自定义区域检测模型的训练,接收图像、标注数据和配置参数,输出训练好的模型二进制数据(visionflow::param::BinaryPacks)。

  • 基本原理
    1. 初始化Python执行环境,加载CustomConfigurator类

    2. 验证CustomConfigurator类的接口规范(__init__和execute方法)

    3. 调用CustomConfigurator.execute方法执行训练过程

    4. 输出训练完成的模型(binary_packs格式)

Warning

  • 必须实现CustomConfigurator类及其必需的方法

  • 训练过程需要合理使用device_allocator进行设备分配(详见下文 设备分配器使用指南

  • 训练接口

训练过程通过 CustomConfigurator 类实现,该类负责:

  • 从 VisionFlow 服务上下文中读取训练数据;

  • 执行模型训练逻辑;

  • 将训练后的模型序列化并返回;

用户必须严格按照以下约定实现训练类,否则系统将无法加载或执行训练逻辑:

  1. 类名 必须 定义为 CustomConfigurator (区分大小写)

  2. 必须实现如下构造函数签名:

    • def __init__(self, device_allocator, svc)

  3. 必须实现如下训练入口函数签名:

    • def execute(self, svc)

  4. execute() 方法 必须返回 一个 vflow.param.BinaryPacks 对象,用于向系统传递训练完成后的模型数据

  5. 不允许修改方法名、参数数量或参数顺序,否则系统将无法正确调用

接口规范示例代码

class CustomConfigurator:
    def __init__(self, device_allocator, svc):
        """
        初始化方法,必须接受两个参数:
        - device_allocator: 设备分配函数
        - svc: 服务上下文,用于访问训练数据
        """
        pass

    def execute(self, svc):
        """
        训练主函数,必须返回一个 vflow.param.BinaryPacks 对象,
        该对象包含训练好的模型数据

        参数:
        - svc: 服务上下文,提供进度报告等辅助功能
        """
        return model  # vflow.param.BinaryPacks 类型

推理参数配置器(infer.conf)#

CustomRegion工具中的算子#

标注器算子(label_oper)#

  • 描述:用于产生某张图像上的自定义区域标注truth和图像中代表训练/推理区域的tagged_polygon(visionflow::props::TaggedPolygonList)。标注器算子在当前CustomRegion工具中是一个占位算子,实际无法自动运行,需要交给标注器开发者自己触发运行。

  • 如何产生标注:通过直接给label_oper算子设置输出对应的缺陷标注和划分区域来完成标注过程。

Warning

  • 缺陷标注需要根据所在视图的状态(已标注在训练集中(train)、已标注在测试集中(test)、已标注未知集合(unknown))来进行相应的状态更新,否则会影响view_tagger的正常功能。

视图标签算子(view_tagger)#

  • 描述:在visionflow中,视图有四种状态,已标注在训练集中(train)、已标注在测试集中(test)、已标注未知集合(unknown)。当分割工具的前序工具的推理结果发生改变时,view_tagger可以复用之前已有的标注用于生成新的视图的状态。

  • 基本原理:在标注器算子中,生成的标注均带有标签,以表示某个标注是在kTrain、kTest、kUnknown状态,当前序工具推理结果发生改变并生成新的view时,会计算新的view和当前已有标注的IOU值,若某个view能找到CIOU大于指定阈值的标注,则将该标注的状态赋予该view。

推理算子(infer)#

  • 描述:CustomRegion推理器,基于自定义Python脚本实现的推理逻辑,接收图像、视图和模型,输出预测的多边形区域列表。

  • 基本原理
    1. 初始化Python执行环境,加载CustomOperator类

    2. 验证CustomOperator类的接口规范(__init__和execute方法)

    3. 调用CustomOperator.execute方法执行推理过程,传入图像、视图和图像信息

    4. 输出推理结果(visionflow::props::PolygonRegionList格式)

Warning

  • 必须实现CustomOperator类及其必需的方法

  • 推理过程需要正确的参数传递

  • 推理接口

模型推理通过 CustomOperator 类实现,该类负责:

  • 加载训练阶段生成的模型;

  • 对输入图像及其视图(views)执行推理;

  • 构造并返回推理结果;

用户必须严格按照以下约定实现推理类,否则系统将无法加载或执行推理逻辑:

  1. 类名 必须 定义为 CustomOperator (区分大小写)

  2. 必须实现如下构造函数签名:

    • def __init__(self, device_allocator, model)

  3. 必须实现如下推理入口函数签名:

    • def execute(self, image, views, img_info)

  4. execute() 方法 必须返回 一个 visionflow.props.PolygonRegionList 对象

  5. 不允许修改方法名、参数数量或参数顺序

接口规范示例代码

class CustomOperator:
    """
    默认示例实现:执行一个"空推理"。
    execute() 返回一个空的 PolygonRegionList
    注意:类名必须是 CustomOperator
    """

    def __init__(self, device_allocator, model):

        # 使用 VFLOW_USER_VARS["..."] 访问自定义参数
        # self.threshold = VFLOW_USER_VARS["置信度阈值"]
        # self.args_1 = VFLOW_USER_VARS["..."]

        self._model_buf = None
        if model.contains("torch_model"):
            self._model_buf = model.get("torch_model")

    def execute(self, image, views, img_info):
        """
        执行推理。

        当前示例不会真的做推理,只返回一个空的 PolygonRegionList
        """

        # 这里可以根据 image / views / img_info 构造检测结果
        # 直接返回PolygonRegionList作为空结果
        result = vflow.props.PolygonRegionList()
        return result

区域匹配算子(comparator)#

  • 描述:计算推理结果和对应标注的相似性,得到区域匹配结果。该算子通过RegionsMatcher实现,接收视图列表、真实标注和预测结果,输出区域匹配结果。

  • 基本原理
    • 接收三个输入属性:视图列表(ViewList)、真实标注(IRegionList)、预测结果(IRegionList)

    • 对真实标注和预测结果中的区域进行匹配计算(IOU计算匹配最大值)、确定区域匹配状态

    • 输出RegionMatchResultList格式的匹配结果

统计算子(statistician)#

  • 描述:基于区域匹配结果计算各种统计指标,输出模型性能统计数据和混淆矩阵。

  • 基本原理:统计推理结果与真实标注之间的匹配情况,生成准确率、召回率、混淆矩阵等评估指标,用于分析模型效果。

设备分配器使用指南#

CustomRegion 工具提供了灵活设备分配机制,允许用户在多GPU环境中指定计算设备或指定使用CPU进行计算。

在CustomRegion的训练和推理脚本中,您可以通过以下简单接口申请GPU设备资源(在CustomConfigurator和CustomOperator的构造函数中通过device_allocator参数提供):

# 在CustomConfigurator或CustomOperator类中
gpu_ids = device_allocator(require_num=1, require_memory_size=500*1024*1024)  # 申请1个至少有500MB空闲内存的GPU

参数说明:

  • require_num: 需要的 GPU 数量(默认为0,表示申请所有内存可用的GPU)

  • require_memory_size: 每个GPU需要的最小空闲内存(字节)(默认为0,表示不限制内存大小)

Note

1GB = 1024 * 1024 * 1024 字节

返回值:

  • 分配的GPU设备ID列表(Python列表,元素为整数)

  • 如果没有满足条件的设备,返回空列表

  • 当仅使用 CPU 时,返回[-1]

说明

  • 设备分配器采用RAII机制(资源获取即初始化),当您的脚本执行完毕或异常退出时,系统会自动释放所有已分配的资源,您无需编写额外的释放代码。

  • 分配资源时会优先使用指定的GPU设备,内存不足时会自动选择空闲内存更多的设备,当GPU不可用时,可自动降级到CPU模式(详见:cpp:class:visionflow::runtime::StrategyOptions

最佳实践建议#

  1. 使用常量提高可读性:定义内存常量使代码更易读

GB = 1024 * 1024 * 1024
MB = 1024 * 1024

# 申请2GB内存
gpu_ids = device_allocator(1, 2 * GB)

# 申请500MB内存
gpu_ids = device_allocator(1, 500 * MB)
  1. 最小化申请:只申请实际需要的GPU数量和内存

# 推荐
gpu_ids = device_allocator(1, 1 * GB)  # 只申请1个GPU,1GB内存

# 不推荐
gpu_ids = device_allocator(8, 16 * GB)  # 申请过多资源可能导致分配失败
  1. 利用默认参数:当不确定具体需求时,使用默认参数让系统自动选择

# 让系统自动选择最适合的设备
gpu_ids = device_allocator()

Note

您无需了解底层实现细节,只需通过简单接口获取设备ID,然后在您的算法中使用这些ID即可。系统会处理所有复杂的设备管理和资源回收工作,让您可以专注于核心算法开发。

自定义用户变量 (VFLOW_USER_VARS)#

在训练和推理脚本执行前,系统会自动将 VFLOW_USER_VARS 注入到脚本的全局作用域。该变量是一个 Python dict,其内容由用户在训练参数配置器(trainer.conf)或推理参数配置器(infer.conf)中配置的自定义变量决定。

支持的变量类型

配置类型

说明

Python类型

string

字符串类型

str

number

数值类型

float

bool

布尔类型(true/false/1/0)

bool

使用示例

# 下标访问(推荐,key 不存在时抛出 KeyError,便于排查配置缺失问题)
model_path = VFLOW_USER_VARS["ModelPath"]

# 读取数值类型变量,注意 number 类型在 Python 中为 float,需按需转换
epochs = int(VFLOW_USER_VARS["Epoch"])
lr = float(VFLOW_USER_VARS["LearningRate"])

# 布尔类型直接读取
use_aug = VFLOW_USER_VARS["UseAugmentation"]

# 也可以使用 .get() 并提供默认值(key 不存在时不报错,返回默认值)
lr = float(VFLOW_USER_VARS.get("LearningRate", 0.001))

完整示例:基于 PyTorch 的正方形检测#

本示例展示了一个完整的 CustomRegion 端到端流程:

  • 数据集:

    • 包含正方形和非正方形图像样本;

    • 包含正方形的图像样本, truth 节点非空,代表”有正方形存在”这种标注;

    • 不包含正方形的图像样本, truth 节点为空,代表”无正方形存在”这种标注;

  • 训练阶段:

    • 使用 PyTorch 极简搭建一个CNN模型判断图像中是否存在正方形;

    • 通过 truth 节点是否为空来标注样本是否有正方形;

  • 推理阶段:

    • 使用训练好的模型对输入图像进行推理,判断是否存在正方形:若存在则返回的 PolygonRegionList 非空,否则为空;

训练参数示例:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import visionflow as vflow

def img_to_tensor(img: vflow.img.Image, device: torch.device) -> torch.Tensor:
    img_np = img.to_numpy()
    img_tensor = (
        torch.from_numpy(img_np)
        .permute(2, 0, 1)   # HWC -> CHW
        .unsqueeze(0)       # add batch dim
        .contiguous()
        .float() / 255.0
    )
    return img_tensor.to(device)

class SquareDetector(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 4, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(4, 8, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(8, 2)  # [no_square, has_square]
        )

    def forward(self, x):
        return self.net(x)

class CustomConfigurator:
    def __init__(self, device_allocator, svc):

        # --- read USER_VARS (from C++ API set_user_vars) ---
        self.epochs = int(VFLOW_USER_VARS["Epoch"])
        self.lr = float(VFLOW_USER_VARS["LearningRate"])
        svc.on_progress(100, 0, f'[INIT] training args: epochs={self.epochs}, lr={self.lr}')

        # ---- set device ----
        gpu_ids = device_allocator(1, 512 * 1024 * 1024)
        self.device = torch.device(
            f'cuda:{gpu_ids[0]}' if gpu_ids and gpu_ids[0] >= 0 else 'cpu'
        )

        # --- initialize CNN model ---
        self.model = SquareDetector().to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.SGD(self.model.parameters(), lr=self.lr, momentum=0.9)
        svc.on_progress(100, 0, f'[INIT] SquareDetector model initialized on device: {self.device}')

    # ======================
    # training process
    # ======================
    def execute(self, svc):
        image_set, views_set, _, label_set = svc.property_sets()

        total_train_views = 0
        for id, views in views_set:
            svc.on_progress(100, 0, f'loop in views_set, id: {id}')
            total_train_views += views.tagged_views(vflow.kTrain).size()
        svc.on_progress(100, 0, f'total_train_views: {total_train_views}')

        total_steps = self.epochs * max(1, total_train_views)
        current_step = 0
        svc.on_progress(total_steps, 0, f'start training: {total_train_views} samples/epoch * {self.epochs} epochs')

        for epoch in range(self.epochs):
            epoch_loss = 0.0

            for id, views in views_set:
                train_views = views.tagged_views(vflow.kTrain)
                if train_views.empty():
                    continue

                img_prop = image_set.at(id)
                has_square = 1 if label_set.at(id).size() > 0 else 0
                label_tensor = torch.tensor([has_square], dtype=torch.long, device=self.device)

                for view_id, view in train_views:
                    # raw image to sub image according to view
                    sub_img = vflow.img.transform(img_prop.image(), view)
                    sub_img = sub_img.image()

                    # img 转为 Tensor 并归一化
                    img_tensor = img_to_tensor(sub_img, self.device)

                    # 训练步骤
                    self.optimizer.zero_grad()
                    output = self.model(img_tensor)
                    loss = self.criterion(output, label_tensor)
                    loss.backward()
                    self.optimizer.step()

                    epoch_loss += loss.item()
                    current_step += 1

                    svc.on_progress(
                        total_steps,
                        current_step,
                        f"Epoch {epoch+1}/{self.epochs} | Step {current_step} | Loss: {loss.item():.4f}"
                    )

            avg_loss = epoch_loss / total_train_views
            svc.on_progress(
                total_steps,
                (epoch + 1) * total_train_views,
                f"Epoch {epoch+1} finished | AvgLoss: {avg_loss:.4f}"
            )

        # ======================
        # save model to BinaryPacks
        # ======================
        import io
        buf = io.BytesIO()
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'input_size': [1, sub_img.height(), sub_img.width()],
            'class_names': ['no_square', 'has_square']
        }, buf)

        model_packs = vflow.param.BinaryPacks()
        model_packs.insert(
            'square_detector_model',
            vflow.Buffer.FromBytes(buf.getvalue())
        )
        svc.on_progress(
            total_steps,
            total_steps,
            'Model saved successfully!'
        )
        return model_packs

推理参数示例:

import torch
import torch.nn as nn
import numpy as np
import visionflow as vflow
import io

def img_to_tensor(img: vflow.img.Image, device: torch.device) -> torch.Tensor:
    img_np = img.to_numpy()
    img_tensor = (
        torch.from_numpy(img_np)
        .permute(2, 0, 1)   # HWC -> CHW
        .unsqueeze(0)       # add batch dim
        .contiguous()
        .float() / 255.0
    )
    return img_tensor.to(device)

class SquareDetector(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 4, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(4, 8, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(8, 2)  # [no_square, has_square]
        )

    def forward(self, x):
        return self.net(x)

class CustomOperator:

    def __init__(self, device_allocator, model):
        # ---- set device ----
        gpu_ids = device_allocator(1, 512 * 1024 * 1024)
        self.device = torch.device(
            f'cuda:{gpu_ids[0]}' if gpu_ids and gpu_ids[0] >= 0 else 'cpu'
        )

        # ---- load model ----
        buf = model.get('square_detector_model').to_bytes()
        state = torch.load(io.BytesIO(buf), map_location=self.device)

        self.model = SquareDetector().to(self.device)
        self.model.load_state_dict(state['model_state_dict'])
        self.model.eval()

    def execute(self, image, views, img_info):

        # ---- CNN check whether image has square ----
        img = image.image()
        img_tensor = img_to_tensor(img, self.device)

        with torch.no_grad():
            logits = self.model(img_tensor)
            pred = torch.argmax(logits, dim=1).item()
        print(f"pred: {pred}")
        result = vflow.props.PolygonRegionList()
        if pred == 0:
            return result

        # ---- add a polygon to result if square detected ----
        w, h = img.width(), img.height()

        ploy = vflow.geometry.Polygon2f()
        ploy.outer = vflow.geometry.Ring2f(
            [
                vflow.geometry.Point2f(0, 0),
                vflow.geometry.Point2f(w, 0),
                vflow.geometry.Point2f(w, h),
                vflow.geometry.Point2f(0, h),
            ]
        )
        poly_reg = vflow.PolygonRegion()
        poly_reg.set_polygon(ploy).set_name("square")

        result.add(poly_reg)
        return result