Compare commits
15 Commits: 930f1952d5 ... main

| SHA1 |
|---|
| d8dabd1951 |
| d29bc650c3 |
| afd48c2d86 |
| bacf8cd69d |
| 26763fa75c |
| 6e3d01bc83 |
| 3258b7b6de |
| 3d75ed722a |
| 116551af18 |
| f8975b26b4 |
| ebda75fa5e |
| 0a45856b14 |
| d2c75a2d14 |
| f95a2bd2db |
| 49fe21fb2f |
tools/batch_rasterize_10_0.py (new file, 125 lines)
@@ -0,0 +1,125 @@
import pya
import os
import glob

def batch_rasterize_layer_10_0(input_dir, output_dir, width_px=256):
    # --- 1. Environment setup ---
    if not os.path.exists(input_dir):
        print(f"Error: Input directory not found: {input_dir}")
        return

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Created output directory: {output_dir}")

    # Collect all GDS files (case-insensitive extension)
    gds_files = glob.glob(os.path.join(input_dir, "*.gds")) + \
                glob.glob(os.path.join(input_dir, "*.GDS"))

    # De-duplicate and sort
    gds_files = sorted(list(set(gds_files)))

    total_files = len(gds_files)
    print(f"Found {total_files} GDS files in {input_dir}")
    print("-" * 50)

    # Target layer
    TARGET_LAYER = 10
    TARGET_DATATYPE = 0

    # --- 2. Batch processing loop ---
    for i, gds_path in enumerate(gds_files):
        try:
            gds_filename = os.path.basename(gds_path)
            gds_basename = os.path.splitext(gds_filename)[0]

            # Output file path: out_dir/filename.png
            output_path = os.path.join(output_dir, f"{gds_basename}.png")

            print(f"[{i+1}/{total_files}] Processing: {gds_filename} ...", end="", flush=True)

            # --- Load the layout ---
            layout = pya.Layout()
            layout.read(gds_path)
            top_cell = layout.top_cell()

            if top_cell is None:
                print(" -> Error: No Top Cell")
                continue

            # --- Get the bounding box in microns (key fix) ---
            global_dbbox = top_cell.dbbox()

            # Skip files with an invalid bounding box
            if global_dbbox.width() <= 0 or global_dbbox.height() <= 0:
                print(" -> Error: Empty Layout")
                continue

            # --- Compute the output resolution ---
            aspect_ratio = global_dbbox.height() / global_dbbox.width()
            height_px = int(width_px * aspect_ratio)
            height_px = max(1, height_px)

            # --- Initialize the view ---
            view = pya.LayoutView()
            view.show_layout(layout, False)
            view.max_hier_levels = 1000  # make sure all hierarchy levels are drawn

            # Configure the background (black)
            view.set_config("background-color", "#000000")
            view.set_config("grid-visible", "false")

            # --- Configure layer 10/0 ---

            # 1. Remove the default layer entries
            iter = view.begin_layers()
            while not iter.at_end():
                view.delete_layer(iter)

            # 2. Look up the target layer index
            # find_layer returns the layer index, or None if the layer is absent
            # Note: even if the file does not contain this layer, we still emit an
            # all-black image so the dataset stays complete
            layer_idx = layout.find_layer(TARGET_LAYER, TARGET_DATATYPE)

            # Check whether the layer exists in this layout
            if layer_idx is not None:
                # Optionally one could also check whether the layer has content under
                # the top cell (for efficiency); keep going here so that even an empty
                # layer still produces a black image

                props = pya.LayerPropertiesNode()
                props.source_layer_index = layer_idx

                # --- Reuse the display parameters confirmed to work ---
                props.dither_pattern = 0  # solid fill
                props.width = 0  # default frame width
                props.fill_color = 0xFFFFFF
                props.frame_color = 0xFFFFFF
                props.visible = True

                view.insert_layer(view.end_layers(), props)
            else:
                # If the layer is not found, leave the view without layers; the result
                # is a plain black image, which is usually the desired behaviour for an
                # ML dataset (empty label)
                pass

            # --- Lock the viewport (micron coordinates) ---
            view.zoom_box(global_dbbox)

            # --- Save the image ---
            view.save_image(output_path, width_px, height_px)
            print(" Done.")

        except Exception as e:
            print(f" -> Exception: {e}")

    print("-" * 50)
    print("Batch processing finished.")

# --- Main entry point ---
if __name__ == "__main__":
    # Input/output folders
    input_folder = "/home/jiao77/Documents/data/ICCAD2019/layout"  # GDS folder
    output_folder = "/home/jiao77/Documents/data/ICCAD2019/img"  # output image folder
    resolution_width = 256  # image width in pixels

    batch_rasterize_layer_10_0(input_folder, output_folder, resolution_width)
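Not part of the commit: a minimal post-run sanity check, assuming Pillow is available and the default paths above, that verifies the rendered width and flags clips whose output is entirely black (i.e. layer 10/0 was missing or empty).

# Editorial sketch (not in the commit): quick check of the rendered PNGs.
import glob
import os

from PIL import Image


def check_rendered_images(img_dir, expected_width=256):
    pngs = sorted(glob.glob(os.path.join(img_dir, "*.png")))
    all_black = []
    for path in pngs:
        with Image.open(path) as im:
            gray = im.convert("L")
            if gray.width != expected_width:
                print(f"Unexpected width {gray.width}: {os.path.basename(path)}")
            # getextrema() returns (min, max); (0, 0) means the image is entirely black
            if gray.getextrema() == (0, 0):
                all_black.append(os.path.basename(path))
    print(f"Checked {len(pngs)} images, {len(all_black)} are entirely black")
    return all_black


if __name__ == "__main__":
    check_rendered_images("/home/jiao77/Documents/data/ICCAD2019/img")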
@@ -1,253 +0,0 @@
#!/usr/bin/env python3
"""
One-click diffusion data generation script:
1. Train a diffusion model on the original data
2. Generate similar images with the trained model
3. Update the configuration file
"""

import os
import sys
import yaml
import argparse
import subprocess
from pathlib import Path
import logging


def setup_logging():
    """Set up logging."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)


def train_diffusion_model(data_dir, model_dir, logger, **train_kwargs):
    """Train the diffusion model."""
    logger.info("Starting diffusion model training...")

    # Build the training command
    cmd = [
        sys.executable, "tools/diffusion/ic_layout_diffusion.py", "train",
        "--data_dir", data_dir,
        "--output_dir", model_dir,
        "--image_size", str(train_kwargs.get("image_size", 256)),
        "--batch_size", str(train_kwargs.get("batch_size", 8)),
        "--epochs", str(train_kwargs.get("epochs", 100)),
        "--lr", str(train_kwargs.get("lr", 1e-4)),
        "--timesteps", str(train_kwargs.get("timesteps", 1000)),
        "--num_samples", str(train_kwargs.get("num_samples", 50)),
        "--save_interval", str(train_kwargs.get("save_interval", 10))
    ]

    if train_kwargs.get("augment", False):
        cmd.append("--augment")

    # Run the training command
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error(f"Diffusion model training failed: {result.stderr}")
        return False

    logger.info("Diffusion model training finished")
    return True


def generate_samples(model_dir, output_dir, num_samples, logger, **gen_kwargs):
    """Generate samples."""
    logger.info(f"Generating {num_samples} samples...")

    # Locate the final model
    model_path = Path(model_dir) / "diffusion_final.pth"
    if not model_path.exists():
        # If there is no final model, fall back to the latest checkpoint
        checkpoints = list(Path(model_dir).glob("diffusion_epoch_*.pth"))
        if checkpoints:
            model_path = max(checkpoints, key=lambda x: int(x.stem.split('_')[-1]))
        else:
            logger.error(f"No model checkpoint found in {model_dir}")
            return False

    logger.info(f"Using model: {model_path}")

    # Build the generation command
    cmd = [
        sys.executable, "tools/diffusion/ic_layout_diffusion.py", "generate",
        "--checkpoint", str(model_path),
        "--output_dir", output_dir,
        "--num_samples", str(num_samples),
        "--image_size", str(gen_kwargs.get("image_size", 256)),
        "--timesteps", str(gen_kwargs.get("timesteps", 1000))
    ]

    # Run the generation command
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error(f"Sample generation failed: {result.stderr}")
        return False

    logger.info("Sample generation finished")
    return True


def update_config(config_path, output_dir, ratio, logger):
    """Update the configuration file."""
    logger.info(f"Updating config file: {config_path}")

    # Read the config
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # Make sure the required structure exists
    if 'synthetic' not in config:
        config['synthetic'] = {}

    # Update the diffusion settings
    config['synthetic']['enabled'] = True
    config['synthetic']['ratio'] = 0.0  # disable procedural synthesis

    if 'diffusion' not in config['synthetic']:
        config['synthetic']['diffusion'] = {}

    config['synthetic']['diffusion']['enabled'] = True
    config['synthetic']['diffusion']['png_dir'] = output_dir
    config['synthetic']['diffusion']['ratio'] = ratio

    # Save the config
    with open(config_path, 'w', encoding='utf-8') as f:
        yaml.dump(config, f, default_flow_style=False, allow_unicode=True)

    logger.info(f"Config file updated; diffusion data ratio: {ratio}")


def validate_generated_data(output_dir, logger):
    """Validate the generated data."""
    logger.info("Validating generated data...")

    output_path = Path(output_dir)
    if not output_path.exists():
        logger.error(f"Output directory does not exist: {output_dir}")
        return False

    # Count the generated images
    png_files = list(output_path.glob("*.png"))
    if not png_files:
        logger.error("No generated PNG images found")
        return False

    logger.info(f"Validation passed; {len(png_files)} images generated")
    return True


def main():
    parser = argparse.ArgumentParser(description="One-click diffusion data generation pipeline")
    parser.add_argument("--config", type=str, required=True, help="Path to the config file")
    parser.add_argument("--data_dir", type=str, help="Original data directory (overrides the config file)")
    parser.add_argument("--model_dir", type=str, default="models/diffusion", help="Directory for saving the diffusion model")
    parser.add_argument("--output_dir", type=str, default="data/diffusion_generated", help="Directory for the generated data")
    parser.add_argument("--num_samples", type=int, default=200, help="Number of samples to generate")
    parser.add_argument("--ratio", type=float, default=0.3, help="Fraction of diffusion data used during training")
    parser.add_argument("--skip_training", action="store_true", help="Skip training and generate directly")
    parser.add_argument("--model_checkpoint", type=str, help="Model checkpoint path (used with skip_training)")

    # Training parameters
    parser.add_argument("--epochs", type=int, default=100, help="Number of training epochs")
    parser.add_argument("--batch_size", type=int, default=8, help="Batch size")
    parser.add_argument("--lr", type=float, default=1e-4, help="Learning rate")
    parser.add_argument("--image_size", type=int, default=256, help="Image size")
    parser.add_argument("--augment", action="store_true", help="Enable data augmentation")

    args = parser.parse_args()

    # Set up logging
    logger = setup_logging()

    # Read the config file to locate the data directory
    config_path = Path(args.config)
    if not config_path.exists():
        logger.error(f"Config file does not exist: {config_path}")
        return False

    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # Determine the data directory
    if args.data_dir:
        data_dir = args.data_dir
    else:
        # Take the data directory from the config file
        config_dir = config_path.parent
        layout_dir = config.get('paths', {}).get('layout_dir', 'data/layouts')
        data_dir = str(config_dir / layout_dir)

    data_path = Path(data_dir)
    if not data_path.exists():
        logger.error(f"Data directory does not exist: {data_path}")
        return False

    logger.info(f"Using data directory: {data_path}")
    logger.info(f"Model directory: {args.model_dir}")
    logger.info(f"Generated data directory: {args.output_dir}")
    logger.info(f"Number of samples to generate: {args.num_samples}")
    logger.info(f"Training ratio: {args.ratio}")

    # 1. Train the diffusion model (if required)
    if not args.skip_training:
        success = train_diffusion_model(
            data_dir=data_dir,
            model_dir=args.model_dir,
            logger=logger,
            image_size=args.image_size,
            batch_size=args.batch_size,
            epochs=args.epochs,
            lr=args.lr,
            num_samples=args.num_samples,
            augment=args.augment
        )
        if not success:
            logger.error("Diffusion model training failed")
            return False
    else:
        logger.info("Skipping the training step")

    # 2. Generate samples
    success = generate_samples(
        model_dir=args.model_dir,
        output_dir=args.output_dir,
        num_samples=args.num_samples,
        logger=logger,
        image_size=args.image_size
    )
    if not success:
        logger.error("Sample generation failed")
        return False

    # 3. Validate the generated data
    if not validate_generated_data(args.output_dir, logger):
        logger.error("Data validation failed")
        return False

    # 4. Update the config file
    update_config(
        config_path=args.config,
        output_dir=args.output_dir,
        ratio=args.ratio,
        logger=logger
    )

    logger.info("=== Diffusion data generation pipeline finished ===")
    logger.info(f"Generated data location: {args.output_dir}")
    logger.info(f"Config file updated: {args.config}")
    logger.info(f"Diffusion data ratio: {args.ratio}")

    return True


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)
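For reference (not part of the commit), the `update_config` step in the deleted pipeline script above leaves the YAML with a `synthetic` block of roughly the following shape; the values shown are the script defaults, and surrounding keys such as `paths.layout_dir` remain whatever the project config already defines.

# Editorial sketch (not in the commit): the config shape written by update_config().
import yaml

config = {
    "synthetic": {
        "enabled": True,
        "ratio": 0.0,  # procedural synthesis disabled by the script
        "diffusion": {
            "enabled": True,
            "png_dir": "data/diffusion_generated",  # --output_dir default
            "ratio": 0.3,  # --ratio default: fraction of diffusion samples in training
        },
    },
}
print(yaml.dump(config, default_flow_style=False))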
@@ -1,393 +0,0 @@
#!/usr/bin/env python3
"""
Full implementation for training a diffusion model on raw IC layout data
and generating similar images.

Uses DDPM (Denoising Diffusion Probabilistic Models),
optimized for single-channel grayscale IC layout images.
"""

import os
import sys
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from PIL import Image
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import logging

# Try to import tqdm; fall back to a no-op wrapper if it is not installed
try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable, **kwargs):
        return iterable


class ICDiffusionDataset(Dataset):
    """Training dataset for the IC layout diffusion model."""

    def __init__(self, image_dir, image_size=256, augment=True):
        self.image_dir = Path(image_dir)
        self.image_size = image_size

        # Collect all PNG/JPEG images
        self.image_paths = []
        for ext in ['*.png', '*.jpg', '*.jpeg']:
            self.image_paths.extend(list(self.image_dir.glob(ext)))

        # Base transforms
        self.transform = transforms.Compose([
            transforms.Resize((image_size, image_size)),
            transforms.ToTensor(),  # maps to the [0, 1] range
        ])

        # Data augmentation
        self.augment = augment
        if augment:
            self.aug_transform = transforms.Compose([
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomVerticalFlip(p=0.5),
                transforms.RandomRotation(90, fill=0),
            ])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('L')  # make sure it is grayscale

        # Base transforms
        image = self.transform(image)

        # Data augmentation
        if self.augment and np.random.random() > 0.5:
            image = self.aug_transform(image)

        return image


class UNet(nn.Module):
    """Simplified U-Net architecture for the diffusion model."""

    def __init__(self, in_channels=1, out_channels=1, time_dim=256):
        super().__init__()

        # Time embedding
        self.time_mlp = nn.Sequential(
            nn.Linear(1, time_dim),
            nn.SiLU(),
            nn.Linear(time_dim, time_dim)
        )

        # Encoder
        self.encoder = nn.ModuleList([
            nn.Conv2d(in_channels, 64, 3, padding=1),
            nn.Conv2d(64, 128, 3, stride=2, padding=1),
            nn.Conv2d(128, 256, 3, stride=2, padding=1),
            nn.Conv2d(256, 512, 3, stride=2, padding=1),
        ])

        # Bottleneck
        self.middle = nn.Sequential(
            nn.Conv2d(512, 512, 3, padding=1),
            nn.SiLU(),
            nn.Conv2d(512, 512, 3, padding=1)
        )

        # Decoder
        self.decoder = nn.ModuleList([
            nn.ConvTranspose2d(512, 256, 3, stride=2, padding=1, output_padding=1),
            nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
        ])

        # Output layer
        self.output = nn.Conv2d(64, out_channels, 3, padding=1)

        # Time fusion layers
        self.time_fusion = nn.ModuleList([
            nn.Linear(time_dim, 64),
            nn.Linear(time_dim, 128),
            nn.Linear(time_dim, 256),
            nn.Linear(time_dim, 512),
        ])

        # Normalization layers
        self.norms = nn.ModuleList([
            nn.GroupNorm(8, 64),
            nn.GroupNorm(8, 128),
            nn.GroupNorm(8, 256),
            nn.GroupNorm(8, 512),
        ])

    def forward(self, x, t):
        # Time embedding
        t_emb = self.time_mlp(t.float().unsqueeze(-1))  # [B, time_dim]

        # Encoder path
        skips = []
        for i, (conv, norm, fusion) in enumerate(zip(self.encoder, self.norms, self.time_fusion)):
            x = conv(x)
            x = norm(x)
            # Fuse the time information
            t_feat = fusion(t_emb).unsqueeze(-1).unsqueeze(-1)
            x = x + t_feat
            x = F.silu(x)
            skips.append(x)
            if i < len(self.encoder) - 1:
                x = F.silu(x)

        # Bottleneck
        x = self.middle(x)
        x = F.silu(x)

        # Decoder path
        for i, (deconv, skip) in enumerate(zip(self.decoder, reversed(skips[:-1]))):
            x = deconv(x)
            x = x + skip  # skip connection
            x = F.silu(x)

        # Output
        x = self.output(x)
        return x


class NoiseScheduler:
    """Noise scheduler."""

    def __init__(self, num_timesteps=1000, beta_start=1e-4, beta_end=0.02):
        self.num_timesteps = num_timesteps

        # Beta schedule
        self.betas = torch.linspace(beta_start, beta_end, num_timesteps)

        # Precomputed terms
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, axis=0)
        self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - self.alphas_cumprod)

    def add_noise(self, x_0, t):
        """Add noise to a clean image."""
        noise = torch.randn_like(x_0)
        sqrt_alphas_cumprod_t = self.sqrt_alphas_cumprod[t].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
        sqrt_one_minus_alphas_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)

        return sqrt_alphas_cumprod_t * x_0 + sqrt_one_minus_alphas_cumprod_t * noise, noise

    def sample_timestep(self, batch_size):
        """Sample timesteps."""
        return torch.randint(0, self.num_timesteps, (batch_size,))

    def step(self, model, x_t, t):
        """Single denoising step."""
        # Predict the noise
        predicted_noise = model(x_t, t)

        # Compute the coefficients
        alpha_t = self.alphas[t].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
        sqrt_alpha_t = torch.sqrt(alpha_t)
        beta_t = self.betas[t].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
        sqrt_one_minus_alpha_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t].unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)

        # Compute the mean
        model_mean = (1.0 / sqrt_alpha_t) * (x_t - (beta_t / sqrt_one_minus_alpha_cumprod_t) * predicted_noise)

        if t.min() == 0:
            return model_mean
        else:
            noise = torch.randn_like(x_t)
            return model_mean + torch.sqrt(beta_t) * noise


class DiffusionTrainer:
    """Diffusion model trainer."""

    def __init__(self, model, scheduler, device='cuda'):
        self.model = model.to(device)
        self.scheduler = scheduler
        self.device = device
        self.loss_fn = nn.MSELoss()

    def train_step(self, optimizer, dataloader):
        """One training epoch."""
        self.model.train()
        total_loss = 0

        for batch in dataloader:
            batch = batch.to(self.device)

            # Sample timesteps
            t = self.scheduler.sample_timestep(batch.shape[0]).to(self.device)

            # Add noise
            noisy_batch, noise = self.scheduler.add_noise(batch, t)

            # Predict the noise
            predicted_noise = self.model(noisy_batch, t)

            # Compute the loss
            loss = self.loss_fn(predicted_noise, noise)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        return total_loss / len(dataloader)

    def generate(self, num_samples, image_size=256, save_dir=None):
        """Generate images."""
        self.model.eval()

        with torch.no_grad():
            # Start from pure noise
            x = torch.randn(num_samples, 1, image_size, image_size).to(self.device)

            # Denoise step by step
            for t in reversed(range(self.scheduler.num_timesteps)):
                t_batch = torch.full((num_samples,), t, device=self.device)
                x = self.scheduler.step(self.model, x, t_batch)

            # Clamp to the [0, 1] range
            x = torch.clamp(x, 0.0, 1.0)

            # Save the images
            if save_dir:
                save_dir = Path(save_dir)
                save_dir.mkdir(parents=True, exist_ok=True)

                for i in range(num_samples):
                    img_tensor = x[i].cpu()
                    img_array = (img_tensor.squeeze().numpy() * 255).astype(np.uint8)
                    img = Image.fromarray(img_array, mode='L')
                    img.save(save_dir / f"generated_{i:06d}.png")

        return x.cpu()


def train_diffusion_model(args):
    """Main function for training the diffusion model."""
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Device check
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Using device: {device}")

    # Build the dataset and dataloader
    dataset = ICDiffusionDataset(args.data_dir, args.image_size, args.augment)
    dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True, num_workers=4)
    logger.info(f"Dataset size: {len(dataset)}")

    # Build the model and scheduler
    model = UNet(in_channels=1, out_channels=1)
    scheduler = NoiseScheduler(num_timesteps=args.timesteps)
    trainer = DiffusionTrainer(model, scheduler, device)

    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
    logger.info(f"Training for {args.epochs} epochs...")
    for epoch in range(args.epochs):
        loss = trainer.train_step(optimizer, dataloader)
        logger.info(f"Epoch {epoch+1}/{args.epochs}, Loss: {loss:.6f}")

        # Save checkpoints periodically
        if (epoch + 1) % args.save_interval == 0:
            checkpoint = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': loss,
            }
            checkpoint_path = Path(args.output_dir) / f"diffusion_epoch_{epoch+1}.pth"
            checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
            torch.save(checkpoint, checkpoint_path)
            logger.info(f"Saved checkpoint: {checkpoint_path}")

    # Generate samples
    logger.info("Generating example images...")
    trainer.generate(
        num_samples=args.num_samples,
        image_size=args.image_size,
        save_dir=os.path.join(args.output_dir, 'samples')
    )

    # Save the final model
    final_checkpoint = {
        'epoch': args.epochs,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }
    final_path = Path(args.output_dir) / "diffusion_final.pth"
    torch.save(final_checkpoint, final_path)
    logger.info(f"Training finished; final model saved to: {final_path}")


def generate_with_trained_model(args):
    """Generate images with a trained model."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load the model
    model = UNet(in_channels=1, out_channels=1)
    checkpoint = torch.load(args.checkpoint, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)

    # Build the scheduler and trainer
    scheduler = NoiseScheduler(num_timesteps=args.timesteps)
    trainer = DiffusionTrainer(model, scheduler, device)

    # Generate images
    trainer.generate(
        num_samples=args.num_samples,
        image_size=args.image_size,
        save_dir=args.output_dir
    )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="IC layout diffusion model training and generation")
    subparsers = parser.add_subparsers(dest='command', help='Command')

    # Train command
    train_parser = subparsers.add_parser('train', help='Train the diffusion model')
    train_parser.add_argument('--data_dir', type=str, required=True, help='Training data directory')
    train_parser.add_argument('--output_dir', type=str, required=True, help='Output directory')
    train_parser.add_argument('--image_size', type=int, default=256, help='Image size')
    train_parser.add_argument('--batch_size', type=int, default=8, help='Batch size')
    train_parser.add_argument('--epochs', type=int, default=100, help='Number of training epochs')
    train_parser.add_argument('--lr', type=float, default=1e-4, help='Learning rate')
    train_parser.add_argument('--timesteps', type=int, default=1000, help='Number of diffusion timesteps')
    train_parser.add_argument('--num_samples', type=int, default=50, help='Number of samples to generate')
    train_parser.add_argument('--save_interval', type=int, default=10, help='Checkpoint save interval')
    train_parser.add_argument('--augment', action='store_true', help='Enable data augmentation')

    # Generate command
    gen_parser = subparsers.add_parser('generate', help='Generate images with a trained model')
    gen_parser.add_argument('--checkpoint', type=str, required=True, help='Model checkpoint path')
    gen_parser.add_argument('--output_dir', type=str, required=True, help='Output directory')
    gen_parser.add_argument('--num_samples', type=int, default=200, help='Number of samples to generate')
    gen_parser.add_argument('--image_size', type=int, default=256, help='Image size')
    gen_parser.add_argument('--timesteps', type=int, default=1000, help='Number of diffusion timesteps')

    args = parser.parse_args()

    if args.command == 'train':
        train_diffusion_model(args)
    elif args.command == 'generate':
        generate_with_trained_model(args)
    else:
        parser.print_help()
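The deleted `NoiseScheduler` above is a standard DDPM schedule: `add_noise` samples the closed-form forward process and `step` applies one ancestral-sampling update using the model's noise prediction. With $\alpha_t = 1 - \beta_t$ and $\bar{\alpha}_t = \prod_{s \le t} \alpha_s$:

$$x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \varepsilon, \qquad \varepsilon \sim \mathcal{N}(0, I)$$

$$x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{\beta_t}{\sqrt{1 - \bar{\alpha}_t}}\, \varepsilon_\theta(x_t, t) \right) + \sqrt{\beta_t}\, z, \qquad z \sim \mathcal{N}(0, I) \text{ for } t > 0,\ z = 0 \text{ at } t = 0$$

The code's `model_mean` is the bracketed term and its added `torch.sqrt(beta_t) * noise` corresponds to choosing $\sigma_t^2 = \beta_t$.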
@@ -1,46 +0,0 @@
#!/usr/bin/env python3
"""
Prepare raster patch dataset and optional condition maps for diffusion training.

Planned inputs:
- --src_dirs: one or more directories containing PNG layout images
- --out_dir: output root for images/ and conditions/
- --size: patch size (e.g., 256)
- --stride: sliding stride for patch extraction
- --min_fg_ratio: minimum foreground ratio to keep a patch (0-1)
- --make_conditions: flags to generate edge/skeleton/distance maps

Current status: CLI skeleton and TODOs only.
"""
from __future__ import annotations

import argparse
from pathlib import Path


def main() -> None:
    parser = argparse.ArgumentParser(description="Prepare patch dataset for diffusion training (skeleton)")
    parser.add_argument("--src_dirs", type=str, nargs="+", help="Source PNG dirs for layouts")
    parser.add_argument("--out_dir", type=str, required=True, help="Output root directory")
    parser.add_argument("--size", type=int, default=256, help="Patch size")
    parser.add_argument("--stride", type=int, default=256, help="Patch stride")
    parser.add_argument("--min_fg_ratio", type=float, default=0.02, help="Min foreground ratio to keep a patch")
    parser.add_argument("--make_edge", action="store_true", help="Generate edge map conditions (e.g., Sobel/Canny)")
    parser.add_argument("--make_skeleton", action="store_true", help="Generate morphological skeleton condition")
    parser.add_argument("--make_dist", action="store_true", help="Generate distance transform condition")
    args = parser.parse_args()

    out_root = Path(args.out_dir)
    out_root.mkdir(parents=True, exist_ok=True)
    (out_root / "images").mkdir(exist_ok=True)
    (out_root / "conditions").mkdir(exist_ok=True)

    # TODO: implement extraction loop over src_dirs, crop patches, filter by min_fg_ratio,
    # and save into images/; generate optional condition maps into conditions/ mirroring filenames.
    # Keep file naming consistent: images/xxx.png, conditions/xxx_edge.png, etc.

    print("[TODO] Implement patch extraction and condition map generation.")


if __name__ == "__main__":
    main()
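A possible shape for the extraction TODO above (not part of the commit): sliding-window crops with a foreground filter, assuming NumPy and Pillow and treating non-black pixels as foreground; condition-map generation is left out.

# Editorial sketch (not in the commit): one way to fill in the extraction TODO.
from pathlib import Path

import numpy as np
from PIL import Image


def extract_patches(src_dirs, out_root, size=256, stride=256, min_fg_ratio=0.02):
    images_dir = Path(out_root) / "images"
    images_dir.mkdir(parents=True, exist_ok=True)
    kept = 0
    for src in src_dirs:
        for png in sorted(Path(src).glob("*.png")):
            arr = np.array(Image.open(png).convert("L"))
            h, w = arr.shape
            for y in range(0, h - size + 1, stride):
                for x in range(0, w - size + 1, stride):
                    patch = arr[y:y + size, x:x + size]
                    # Keep the patch only if enough foreground (non-zero) pixels are present
                    if (patch > 0).mean() < min_fg_ratio:
                        continue
                    Image.fromarray(patch, mode="L").save(
                        images_dir / f"{png.stem}_{y}_{x}.png")
                    kept += 1
    print(f"Kept {kept} patches")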
@@ -1,38 +0,0 @@
#!/usr/bin/env python3
"""
Sample layout patches using a trained diffusion model (skeleton).

Outputs raster PNGs into a target directory compatible with the current training pipeline (no H pairing).

Current status: CLI skeleton and TODOs only.
"""
from __future__ import annotations

import argparse
from pathlib import Path


def main() -> None:
    parser = argparse.ArgumentParser(description="Sample layout patches from diffusion model (skeleton)")
    parser.add_argument("--ckpt", type=str, required=True, help="Path to trained diffusion checkpoint or HF repo id")
    parser.add_argument("--out_dir", type=str, required=True, help="Directory to write sampled PNGs")
    parser.add_argument("--num", type=int, default=200)
    parser.add_argument("--image_size", type=int, default=256)
    parser.add_argument("--guidance", type=float, default=5.0)
    parser.add_argument("--steps", type=int, default=50)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--cond_dir", type=str, default=None, help="Optional condition maps directory")
    parser.add_argument("--cond_types", type=str, nargs="*", default=None, help="e.g., edge skeleton dist")
    args = parser.parse_args()

    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # TODO: load pipeline from ckpt, set scheduler, handle conditions if provided,
    # sample args.num images, save as PNG files into out_dir.

    print("[TODO] Implement diffusion sampling and PNG saving.")


if __name__ == "__main__":
    main()
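One way the sampling TODO above could be filled in (not part of the commit) is with a diffusers-style unconditional pipeline; this sketch assumes the checkpoint is a saved `DDPMPipeline` directory or Hugging Face repo id, and it ignores the guidance and condition options.

# Editorial sketch (not in the commit): unconditional sampling with diffusers' DDPMPipeline.
from pathlib import Path

import torch
from diffusers import DDPMPipeline


def sample_patches(ckpt, out_dir, num=200, steps=50, seed=42):
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    pipe = DDPMPipeline.from_pretrained(ckpt)
    pipe.to("cuda" if torch.cuda.is_available() else "cpu")
    generator = torch.Generator(device=pipe.device).manual_seed(seed)
    saved = 0
    while saved < num:
        batch = min(16, num - saved)
        images = pipe(batch_size=batch, num_inference_steps=steps, generator=generator).images
        for img in images:
            img.convert("L").save(out / f"sample_{saved:06d}.png")
            saved += 1
    print(f"Saved {saved} samples to {out}")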
@@ -1,37 +0,0 @@
#!/usr/bin/env python3
"""
Train a diffusion model for layout patch generation (skeleton).

Planned: fine-tune Stable Diffusion (or Latent Diffusion) with optional ControlNet edge/skeleton conditions.

Dependencies to consider: diffusers, transformers, accelerate, torch, torchvision, opencv-python.

Current status: CLI skeleton and TODOs only.
"""
from __future__ import annotations

import argparse


def main() -> None:
    parser = argparse.ArgumentParser(description="Train diffusion model for layout patches (skeleton)")
    parser.add_argument("--data_dir", type=str, required=True, help="Prepared dataset root (images/ + conditions/)")
    parser.add_argument("--output_dir", type=str, required=True, help="Checkpoint output directory")
    parser.add_argument("--image_size", type=int, default=256)
    parser.add_argument("--batch_size", type=int, default=8)
    parser.add_argument("--lr", type=float, default=1e-4)
    parser.add_argument("--max_steps", type=int, default=100000)
    parser.add_argument("--use_controlnet", action="store_true", help="Train with ControlNet conditioning")
    parser.add_argument("--condition_types", type=str, nargs="*", default=["edge"], help="e.g., edge skeleton dist")
    args = parser.parse_args()

    # TODO: implement dataset/dataloader (images and optional conditions)
    # TODO: load base pipeline (Stable Diffusion or Latent Diffusion) and optionally ControlNet
    # TODO: set up optimizer, LR schedule, EMA, gradient accumulation, and run training loop
    # TODO: save periodic checkpoints to output_dir

    print("[TODO] Implement diffusion training loop and checkpoints.")


if __name__ == "__main__":
    main()
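Also not part of the commit: a minimal unconditional training loop for the TODOs above, sketched with diffusers' `UNet2DModel` and `DDPMScheduler` rather than the full Stable Diffusion / ControlNet setup the docstring plans for; EMA, LR scheduling, and gradient accumulation are omitted.

# Editorial sketch (not in the commit): minimal unconditional DDPM training loop.
# Assumes a DataLoader that yields single-channel float tensors of shape (B, 1, H, W).
import torch
import torch.nn.functional as F
from diffusers import DDPMScheduler, UNet2DModel


def train(dataloader, output_dir, image_size=256, lr=1e-4, max_steps=100_000, device="cuda"):
    model = UNet2DModel(sample_size=image_size, in_channels=1, out_channels=1).to(device)
    noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    step = 0
    while step < max_steps:
        for clean in dataloader:
            clean = clean.to(device)
            noise = torch.randn_like(clean)
            timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps,
                                      (clean.shape[0],), device=device)
            noisy = noise_scheduler.add_noise(clean, noise, timesteps)
            pred = model(noisy, timesteps).sample  # predict the added noise
            loss = F.mse_loss(pred, noise)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            step += 1
            if step % 1000 == 0:
                torch.save(model.state_dict(), f"{output_dir}/unet_step_{step}.pt")
            if step >= max_steps:
                break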
tools/rasterize.py (new file, 102 lines)
@@ -0,0 +1,102 @@
import pya
import os

def rasterize_final(gds_path, output_dir, width_px=256):
    # --- 1. Checks and setup ---
    if not os.path.exists(gds_path):
        print(f"Error: File not found: {gds_path}")
        return

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    gds_basename = os.path.splitext(os.path.basename(gds_path))[0]
    print(f"Processing: {gds_basename}")

    # --- 2. Load the layout ---
    layout = pya.Layout()
    layout.read(gds_path)
    top_cell = layout.top_cell()

    if top_cell is None:
        print("Error: No top cell found.")
        return

    # [Key fix] Use dbbox() to get the bounding box in microns.
    # bbox() returns DBU (database units, integers); the view may interpret them as
    # microns, which breaks the scale.
    global_dbbox = top_cell.dbbox()

    print(f"Global BBox (Microns): {global_dbbox}")
    print(f"Width: {global_dbbox.width()} um, Height: {global_dbbox.height()} um")

    if global_dbbox.width() <= 0:
        print("Error: Layout is empty or zero width.")
        return

    # Compute the output resolution
    aspect_ratio = global_dbbox.height() / global_dbbox.width()
    height_px = int(width_px * aspect_ratio)
    height_px = max(1, height_px)

    # --- 3. Initialize the view ---
    view = pya.LayoutView()
    view.show_layout(layout, False)
    view.max_hier_levels = 1000

    # Black background (for the final output)
    view.set_config("background-color", "#000000")
    view.set_config("grid-visible", "false")

    layer_indices = layout.layer_indices()
    saved_count = 0

    for layer_idx in layer_indices:
        # Content check (bbox_per_layer has its caveats; here we render without
        # extra safeguards). For efficiency, skip layers that are empty under the
        # top cell.
        if top_cell.bbox_per_layer(layer_idx).empty():
            continue

        layer_info = layout.get_info(layer_idx)

        # Output file name
        filename = f"{gds_basename}_{layer_info.layer}_{layer_info.datatype}.png"
        full_output_path = os.path.join(output_dir, filename)

        # --- 4. Configure the layer ---
        iter = view.begin_layers()
        while not iter.at_end():
            view.delete_layer(iter)

        props = pya.LayerPropertiesNode()
        props.source_layer_index = layer_idx

        # Solid fill
        props.dither_pattern = 0

        # White fill + white frame
        props.fill_color = 0xFFFFFF
        props.frame_color = 0xFFFFFF

        # Frame width 0 (default 1-pixel outline) so even very thin shapes are rendered
        props.width = 0
        props.visible = True

        view.insert_layer(view.end_layers(), props)

        # [Key fix] Zoom using micron coordinates
        view.zoom_box(global_dbbox)

        # Save
        view.save_image(full_output_path, width_px, height_px)
        print(f"Saved: {filename}")
        saved_count += 1

    print(f"Done. Generated {saved_count} images.")

if __name__ == "__main__":
    # Replace with your actual paths
    input_gds = "/home/jiao77/Documents/data/ICCAD2019/layout/patid_MX_Benchmark2_clip_hotspot1_11_orig_0.gds"
    output_folder = "out/final_images"
    resolution_width = 256

    rasterize_final(input_gds, output_folder, resolution_width)
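Not part of the commit: a quick way to preview which layer/datatype pairs `rasterize_final` will emit for a given GDS, using the same pya calls as above; like the scripts in this change, it needs to run under KLayout's bundled Python (for example via `klayout -b -r`).

# Editorial sketch (not in the commit): list the layer/datatype pairs that
# rasterize_final() would render for a given GDS file.
import pya


def list_layers(gds_path):
    layout = pya.Layout()
    layout.read(gds_path)
    top_cell = layout.top_cell()
    for layer_idx in layout.layer_indices():
        if top_cell.bbox_per_layer(layer_idx).empty():
            continue  # skipped by the renderer as well
        info = layout.get_info(layer_idx)
        print(f"layer {info.layer}/{info.datatype}")


if __name__ == "__main__":
    list_layers("/home/jiao77/Documents/data/ICCAD2019/layout/patid_MX_Benchmark2_clip_hotspot1_11_orig_0.gds")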