LingBot-Depth进阶使用:结合API实现批量图片深度估计自动化
LingBot-Depth进阶使用结合API实现批量图片深度估计自动化1. 引言为什么需要批量深度估计在日常的计算机视觉项目中我们经常需要处理大量图片的深度估计任务。无论是构建3D场景数据集、开发机器人导航系统还是进行AR/VR内容创作手动一张张上传图片到Web界面显然效率太低。这就是为什么我们需要掌握LingBot-Depth的API调用方法实现批量图片处理的自动化流程。LingBot-Depth提供的FastAPI接口让我们可以绕过图形界面直接通过编程方式发送请求并获取深度图结果。本文将手把手教你如何搭建一个完整的自动化处理流水线从环境准备到代码实现再到错误处理和性能优化。2. 环境准备与API基础2.1 确认服务状态在开始之前请确保你已经按照基础教程部署了LingBot-Depth镜像并且服务正常运行。可以通过以下方式验证# 检查FastAPI服务是否运行 curl http://localhost:8000/docs如果看到Swagger API文档页面说明服务已就绪。2.2 了解核心API端点LingBot-Depth提供了两个主要API端点/predict/monocular- 单目深度估计/predict/completion- 深度补全我们主要关注单目深度估计端点其基本请求格式如下{ image: base64编码的RGB图像, output_type: heatmap|raw|both, # 输出类型 resize: null # 可选缩放尺寸 }3. 构建批量处理脚本3.1 基础脚本框架下面是一个完整的Python脚本框架用于批量处理指定目录下的所有图片import os import base64 import requests from PIL import Image import numpy as np import time class DepthBatchProcessor: def __init__(self, api_urlhttp://localhost:8000/predict/monocular): self.api_url api_url self.headers {Content-Type: application/json} def image_to_base64(self, image_path): with open(image_path, rb) as image_file: return base64.b64encode(image_file.read()).decode(utf-8) def process_single_image(self, image_path, output_dir): try: # 准备请求数据 image_b64 self.image_to_base64(image_path) payload { image: image_b64, output_type: both, resize: None } # 发送API请求 start_time time.time() response requests.post( self.api_url, jsonpayload, headersself.headers ) processing_time time.time() - start_time if response.status_code 200: result response.json() # 保存结果 self.save_results(image_path, output_dir, result) return True, processing_time else: print(f处理失败: {image_path}, 状态码: {response.status_code}) return False, 0 except Exception as e: print(f处理异常: {image_path}, 错误: {str(e)}) return False, 0 def save_results(self, image_path, output_dir, result): # 创建输出目录 os.makedirs(output_dir, exist_okTrue) # 提取文件名 base_name os.path.splitext(os.path.basename(image_path))[0] # 保存伪彩色深度图 if heatmap in result: heatmap_data base64.b64decode(result[heatmap]) with open(f{output_dir}/{base_name}_depth.png, wb) as f: f.write(heatmap_data) # 保存原始深度数据 if raw_depth in result: depth_array np.frombuffer( base64.b64decode(result[raw_depth]), dtypenp.float32 ).reshape(result[height], result[width]) np.save(f{output_dir}/{base_name}_depth.npy, depth_array) def process_batch(self, input_dir, output_dir, max_workers4): # 获取所有图片文件 image_files [ f for f in os.listdir(input_dir) if f.lower().endswith((.png, .jpg, .jpeg)) ] total_images len(image_files) success_count 0 total_time 0 print(f开始处理 {total_images} 张图片...) for idx, image_file in enumerate(image_files, 1): image_path os.path.join(input_dir, image_file) print(f正在处理 [{idx}/{total_images}]: {image_file}) success, time_taken self.process_single_image(image_path, output_dir) if success: success_count 1 total_time time_taken print(f完成处理 [{idx}/{total_images}], 耗时: {time_taken:.2f}s) else: print(f跳过 [{idx}/{total_images}] 由于处理失败) print(\n批量处理完成!) print(f成功处理: {success_count}/{total_images}) if success_count 0: print(f平均处理时间: {total_time/success_count:.2f}s/张)3.2 脚本使用方法要使用这个批量处理脚本只需按照以下步骤将上述代码保存为depth_batch_processor.py准备输入图片目录如./input_images创建输出目录如./output_depth运行脚本python depth_batch_processor.py --input ./input_images --output ./output_depth4. 高级功能与优化技巧4.1 并行处理加速对于大批量图片我们可以使用多线程/多进程来加速处理。下面是修改后的并行处理版本from concurrent.futures import ThreadPoolExecutor class DepthBatchProcessor: # ... 保留之前的其他方法 ... def process_batch_parallel(self, input_dir, output_dir, max_workers4): image_files [ f for f in os.listdir(input_dir) if f.lower().endswith((.png, .jpg, .jpeg)) ] total_images len(image_files) success_count 0 total_time 0 print(f开始并行处理 {total_images} 张图片 (workers{max_workers})...) with ThreadPoolExecutor(max_workersmax_workers) as executor: futures [] for idx, image_file in enumerate(image_files, 1): image_path os.path.join(input_dir, image_file) futures.append(executor.submit( self.process_single_image, image_path, output_dir )) for idx, future in enumerate(futures, 1): success, time_taken future.result() if success: success_count 1 total_time time_taken print(f完成 [{idx}/{total_images}], 耗时: {time_taken:.2f}s) else: print(f跳过 [{idx}/{total_images}] 由于处理失败) print(\n批量处理完成!) print(f成功处理: {success_count}/{total_images}) if success_count 0: print(f平均处理时间: {total_time/success_count:.2f}s/张)4.2 错误处理与重试机制网络请求可能会遇到临时性问题添加重试机制可以提高稳定性def process_single_image(self, image_path, output_dir, max_retries3): retry_count 0 while retry_count max_retries: try: # 准备请求数据 image_b64 self.image_to_base64(image_path) payload { image: image_b64, output_type: both, resize: None } # 发送API请求 start_time time.time() response requests.post( self.api_url, jsonpayload, headersself.headers, timeout30 # 添加超时 ) processing_time time.time() - start_time if response.status_code 200: result response.json() self.save_results(image_path, output_dir, result) return True, processing_time else: print(f尝试 {retry_count1}/{max_retries} 失败: {image_path}) retry_count 1 time.sleep(1) # 等待1秒再重试 except Exception as e: print(f尝试 {retry_count1}/{max_retries} 异常: {str(e)}) retry_count 1 time.sleep(1) print(f达到最大重试次数 {max_retries}, 跳过: {image_path}) return False, 04.3 内存优化技巧处理大量高分辨率图片时内存管理很重要图片尺寸调整在发送到API前可以先将大图缩小def resize_image(self, image_path, max_size1024): img Image.open(image_path) if max(img.size) max_size: ratio max_size / max(img.size) new_size (int(img.size[0] * ratio), int(img.size[1] * ratio)) img img.resize(new_size, Image.LANCZOS) return img分批处理对于超大图片集可以分批加载处理def process_batch_in_chunks(self, input_dir, output_dir, chunk_size100): all_files [f for f in os.listdir(input_dir) if f.lower().endswith((.png, .jpg, .jpeg))] for i in range(0, len(all_files), chunk_size): chunk all_files[i:ichunk_size] print(f处理批次 {i//chunk_size 1}/{(len(all_files)-1)//chunk_size 1}) self.process_batch_parallel( input_dir, output_dir, image_fileschunk )5. 实际应用案例5.1 3D场景重建流水线将批量深度估计集成到3D重建流程中class SceneReconstructor: def __init__(self, depth_processor): self.depth_processor depth_processor def process_scene_sequence(self, image_dir, output_dir): # 第一步批量生成深度图 self.depth_processor.process_batch(image_dir, output_dir) # 第二步加载深度图和彩色图 depth_maps self.load_depth_maps(output_dir) color_images self.load_color_images(image_dir) # 第三步点云生成与配准 point_clouds self.generate_point_clouds(color_images, depth_maps) merged_cloud self.align_point_clouds(point_clouds) # 保存最终3D模型 self.save_3d_model(merged_cloud, os.path.join(output_dir, scene.ply)) # 其他辅助方法...5.2 机器人导航数据集生成为机器人导航系统自动生成训练数据def generate_navigation_dataset(rgb_dir, output_dir, camera_params): processor DepthBatchProcessor() # 处理所有RGB图像 processor.process_batch(rgb_dir, output_dir) # 为每张图片生成3D语义信息 for depth_file in glob.glob(os.path.join(output_dir, *_depth.npy)): base_name os.path.basename(depth_file).replace(_depth.npy, ) rgb_path os.path.join(rgb_dir, f{base_name}.jpg) depth_map np.load(depth_file) rgb_image cv2.imread(rgb_path) # 生成点云 points depth_to_pointcloud(depth_map, camera_params) # 保存为机器人导航系统需要的格式 save_navigation_data( points, rgb_image, os.path.join(output_dir, f{base_name}_navdata.bin) )6. 总结与最佳实践通过本文的指导你应该已经掌握了使用LingBot-Depth API进行批量图片深度估计的完整流程。以下是几个关键的最佳实践建议预处理很重要确保输入图片格式正确必要时进行尺寸调整合理设置并行度根据服务器性能调整max_workers避免过载实施错误处理网络请求可能失败要有重试和日志记录机制结果验证定期抽样检查生成的深度图质量资源管理大批量处理时注意内存和磁盘空间使用自动化批量处理可以极大提高工作效率特别是在需要处理成百上千张图片的项目中。LingBot-Depth的API接口设计简洁高效结合Python的强大生态能够轻松集成到各种计算机视觉流水线中。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。