import os
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import time
def check_tile_continuity_practical(base_path, target_z_levels=None):
“””
检查瓦片文件的连续性(基于实际存在的X文件夹,检查Y的连续性包括边界)
Args:
base_path: 瓦片文件根目录
target_z_levels: 要检查的z层级列表,默认为[10, 12, 14, 16]
Returns:
dict: 包含检查结果的字典
“””
if target_z_levels is None:
target_z_levels = [10, 12, 14, 16]
results = {}
for z in target_z_levels:
z_path = Path(base_path) / str(z)
print(f”正在检查层级 z={z}…”)
if not z_path.exists():
print(f” 警告: 层级 {z} 的目录不存在: {z_path}”)
results[z] = {‘status’: ‘missing’, ‘message’: f’目录不存在: {z_path}’}
continue
# 获取所有x子目录
x_dirs = [d for d in z_path.iterdir() if d.is_dir() and d.name.isdigit()]
if not x_dirs:
print(f” 警告: 层级 {z} 下没有找到x子目录”)
results[z] = {‘status’: ’empty’, ‘message’: ‘没有x子目录’}
continue
x_dirs.sort(key=lambda x: int(x.name))
x_values = [int(x_dir.name) for x_dir in x_dirs]
print(f” 实际X范围: {min(x_values)}-{max(x_values)}, 共{len(x_values)}个目录”)
# 检测该层级的Y理论范围(基于所有X目录的Y文件)
y_global_range = detect_y_global_range(x_dirs)
if y_global_range:
print(f” 检测到Y全局范围: {y_global_range[0]}-{y_global_range[1]}”)
else:
print(f” 警告: 无法检测到Y全局范围”)
y_global_range = (0, 0)
y_continuity_issues = []
total_tiles = 0
missing_tiles = 0
# 检查每个x目录下的y文件连续性
for x_dir in x_dirs:
x_value = int(x_dir.name)
# 获取所有y.png文件
y_files = list(x_dir.glob(“*.png”))
y_values = []
for y_file in y_files:
if y_file.stem.isdigit():
y_values.append(int(y_file.stem))
if not y_values:
# 空目录 – 如果检测到全局范围,则整个范围都缺失
if y_global_range and y_global_range[0] != y_global_range[1]:
all_missing_y = list(range(y_global_range[0], y_global_range[1] + 1))
missing_tiles += len(all_missing_y)
y_continuity_issues.append({
‘x’: x_value,
‘missing_y’: all_missing_y,
‘y_range’: y_global_range,
‘expected_count’: y_global_range[1] – y_global_range[0] + 1,
‘actual_count’: 0,
‘note’: ‘空目录’
})
else:
y_continuity_issues.append({
‘x’: x_value,
‘missing_y’: [],
‘y_range’: (0, 0),
‘expected_count’: 0,
‘actual_count’: 0,
‘note’: ‘空目录’
})
continue
y_values.sort()
# 使用全局范围检查Y的连续性(包括边界)
if y_global_range and y_global_range[0] != y_global_range[1]:
y_min, y_max = y_global_range
y_missing = check_number_continuity_with_bounds(y_values, y_min, y_max)
expected_count = y_max – y_min + 1
else:
# 如果没有全局范围,使用该目录的实际范围
y_min, y_max = min(y_values), max(y_values)
y_missing = check_number_continuity(y_values)
expected_count = len(y_values) + len(y_missing)
total_tiles += len(y_values)
if y_missing:
missing_tiles += len(y_missing)
y_continuity_issues.append({
‘x’: x_value,
‘missing_y’: y_missing,
‘y_range’: (y_min, y_max),
‘expected_count’: expected_count,
‘actual_count’: len(y_values),
‘note’: ‘部分缺失’
})
# 汇总结果
results[z] = {
‘status’: ‘checked’,
‘actual_x_range’: (min(x_values), max(x_values)) if x_values else (0, 0),
‘y_global_range’: y_global_range,
‘x_count’: len(x_values),
‘x_missing’: [], # 不检查X的连续性,只基于实际存在的X目录
‘total_tiles’: total_tiles,
‘missing_tiles’: missing_tiles,
‘y_continuity_issues’: y_continuity_issues,
‘completeness_rate’: (total_tiles / (total_tiles + missing_tiles)) * 100 if (total_tiles + missing_tiles) > 0 else 0,
‘method’: ‘practical’
}
return results
def detect_y_global_range(x_dirs, sample_limit=30):
“””
检测该层级所有X目录的Y全局范围(不扩展边界)
Args:
x_dirs: X目录列表
sample_limit: 最大抽样目录数量
Returns:
tuple: (y_min, y_max) 或 None(如果无法检测)
“””
all_y_values = []
# 等间距抽样以提高覆盖率
sample_count = min(sample_limit, len(x_dirs))
if len(x_dirs) <= sample_count:
sampled_dirs = x_dirs # 目录不多时全量检查
else:
step = len(x_dirs) // sample_count
sampled_dirs = x_dirs[::step] + [x_dirs[-1]] # 确保包含最后一个目录
print(f" 抽样检查 {len(sampled_dirs)} 个X目录的Y范围")
for i, x_dir in enumerate(sampled_dirs):
y_files = list(x_dir.glob("*.png"))
y_values = [int(f.stem) for f in y_files if f.stem.isdigit()]
if y_values:
dir_min = min(y_values)
dir_max = max(y_values)
all_y_values.extend(y_values)
if i < 3: # 显示前3个目录的范围作为参考
print(f" X={x_dir.name}: Y范围 {dir_min}-{dir_max}")
if not all_y_values:
return None
# 直接使用检测到的最小最大值,不扩展边界
y_min = min(all_y_values)
y_max = max(all_y_values)
print(f" 最终Y全局范围: {y_min}-{y_max}")
return (y_min, y_max)
def check_number_continuity_with_bounds(numbers, expected_min, expected_max):
"""
检查数字序列的连续性,包括边界检测
Args:
numbers: 现有的数字列表
expected_min: 预期的最小值
expected_max: 预期的最大值
Returns:
list: 缺失的数字列表(包括边界缺失)
"""
if not numbers:
return list(range(expected_min, expected_max + 1))
existing_set = set(numbers)
expected_set = set(range(expected_min, expected_max + 1))
missing = sorted(list(expected_set - existing_set))
return missing
def check_number_continuity(numbers):
"""
检查数字序列的连续性(基于实际范围)
Args:
numbers: 排序后的数字列表
Returns:
list: 缺失的数字列表
"""
if not numbers:
return []
min_val = min(numbers)
max_val = max(numbers)
expected_set = set(range(min_val, max_val + 1))
actual_set = set(numbers)
missing = sorted(list(expected_set - actual_set))
return missing
def print_results(results):
"""打印检查结果"""
print("\n" + "="*80)
print("瓦片文件连续性检查报告(基于实际X目录,包含Y边界检测)")
print("="*80)
total_actual_tiles = 0
total_missing_tiles = 0
for z, data in results.items():
print(f"\n层级 z={z}:")
if data['status'] != 'checked':
print(f" ⚠️ {data['message']}")
continue
print(f" 实际X范围: {data['actual_x_range'][0]}-{data['actual_x_range'][1]}")
if data['y_global_range']:
print(f" Y全局范围: {data['y_global_range'][0]}-{data['y_global_range'][1]}")
print(f" X目录数量: {data['x_count']}")
print(f" 总瓦片数: {data['total_tiles']:,}")
print(f" 缺失瓦片数: {data['missing_tiles']:,}")
print(f" 完整度: {data['completeness_rate']:.2f}%")
total_actual_tiles += data['total_tiles']
total_missing_tiles += data['missing_tiles']
if data['y_continuity_issues']:
empty_dirs = sum(1 for issue in data['y_continuity_issues'] if issue.get('note') == '空目录')
partial_dirs = sum(1 for issue in data['y_continuity_issues'] if issue.get('note') == '部分缺失')
print(f" ⚠️ 发现 {len(data['y_continuity_issues'])} 个问题目录:")
if empty_dirs > 0:
print(f” 空目录: {empty_dirs}个”)
if partial_dirs > 0:
print(f” Y不连续目录: {partial_dirs}个”)
# 显示部分缺失目录的详细信息
partial_issues = [issue for issue in data[‘y_continuity_issues’] if issue.get(‘note’) == ‘部分缺失’]
if partial_issues:
print(f”\n Y不连续目录详情:”)
for issue in partial_issues[:5]: # 显示前5个
boundary_missing = []
if issue[‘missing_y’]:
y_min, y_max = issue[‘y_range’]
boundary_missing = [y for y in issue[‘missing_y’] if y == y_min or y == y_max]
print(f” X={issue[‘x’]}: Y范围{issue[‘y_range’]}, 缺失{len(issue[‘missing_y’])}个文件”)
if boundary_missing:
print(f” 边界缺失: {boundary_missing}”)
if len(issue[‘missing_y’]) <= 10:
print(f" 所有缺失Y: {issue['missing_y']}")
else:
print(f" 缺失Y示例: {issue['missing_y'][:5]} ... {issue['missing_y'][-5:]}")
if len(partial_issues) > 5:
print(f” … 还有 {len(partial_issues) – 5} 个目录”)
else:
print(f” ✅ 所有Y轴文件连续完整”)
# 汇总统计
print(“\n” + “=”*80)
print(“汇总统计”)
print(“=”*80)
total_files = total_actual_tiles + total_missing_tiles
overall_completeness = (total_actual_tiles / total_files) * 100 if total_files > 0 else 0
print(f”总瓦片数: {total_files:,}”)
print(f”实际瓦片数: {total_actual_tiles:,}”)
print(f”缺失瓦片数: {total_missing_tiles:,}”)
print(f”总体完整度: {overall_completeness:.2f}%”)
def save_results_to_txt(results, output_file=”tile_check_results.txt”):
“””将详细检查结果保存到txt文件”””
with open(output_file, ‘w’, encoding=’utf-8′) as f:
f.write(“=” * 80 + “\n”)
f.write(“瓦片文件连续性检查报告(基于实际X目录,包含Y边界检测)\n”)
f.write(“=” * 80 + “\n”)
f.write(f”生成时间: {time.strftime(‘%Y-%m-%d %H:%M:%S’)}\n”)
f.write(“说明: 基于实际存在的X目录检查Y文件的连续性,包含边界检测\n\n”)
total_actual_tiles = 0
total_missing_tiles = 0
checked_levels = 0
for z, data in results.items():
f.write(f”\n层级 z={z}:\n”)
f.write(“-” * 50 + “\n”)
if data[‘status’] != ‘checked’:
f.write(f”状态: {data[‘message’]}\n”)
continue
checked_levels += 1
f.write(f”实际X范围: {data[‘actual_x_range’][0]}-{data[‘actual_x_range’][1]}\n”)
if data[‘y_global_range’]:
f.write(f”Y全局范围: {data[‘y_global_range’][0]}-{data[‘y_global_range’][1]}\n”)
f.write(f”X目录数量: {data[‘x_count’]}\n”)
f.write(f”总瓦片数: {data[‘total_tiles’]:,}\n”)
f.write(f”缺失瓦片数: {data[‘missing_tiles’]:,}\n”)
f.write(f”完整度: {data[‘completeness_rate’]:.2f}%\n”)
total_actual_tiles += data[‘total_tiles’]
total_missing_tiles += data[‘missing_tiles’]
# Y轴连续性问题
if data[‘y_continuity_issues’]:
empty_dirs = [issue for issue in data[‘y_continuity_issues’] if issue.get(‘note’) == ‘空目录’]
partial_dirs = [issue for issue in data[‘y_continuity_issues’] if issue.get(‘note’) == ‘部分缺失’]
f.write(f”\nY轴连续性问题:\n”)
f.write(f” 空目录: {len(empty_dirs)}个\n”)
f.write(f” Y不连续目录: {len(partial_dirs)}个\n”)
# 详细列出空目录
if empty_dirs:
f.write(f”\n 空目录列表:\n”)
for issue in empty_dirs:
f.write(f” X={issue[‘x’]}\n”)
# 详细列出部分缺失目录
if partial_dirs:
f.write(f”\n Y不连续目录详情:\n”)
for issue in partial_dirs:
# 检查边界缺失
boundary_missing = []
if issue[‘missing_y’]:
y_min, y_max = issue[‘y_range’]
boundary_missing = [y for y in issue[‘missing_y’] if y == y_min or y == y_max]
f.write(f” X={issue[‘x’]}: Y范围{issue[‘y_range’]}, 应有{issue[‘expected_count’]}个, 实有{issue[‘actual_count’]}个, 缺失{len(issue[‘missing_y’])}个\n”)
if boundary_missing:
f.write(f” 边界缺失: {boundary_missing}\n”)
if len(issue[‘missing_y’]) <= 20:
f.write(f" 所有缺失Y: {issue['missing_y']}\n")
else:
f.write(f" 缺失Y: {issue['missing_y'][:10]} ... {issue['missing_y'][-10:]} (共{len(issue['missing_y'])}个)\n")
else:
f.write("Y轴文件: 连续完整 ✓\n")
f.write("\n")
# 汇总统计
f.write("\n" + "=" * 80 + "\n")
f.write("汇总统计\n")
f.write("=" * 80 + "\n")
f.write(f"检查层级数量: {checked_levels}\n")
total_files = total_actual_tiles + total_missing_tiles
f.write(f"总瓦片数: {total_files:,}\n")
f.write(f"实际瓦片数: {total_actual_tiles:,}\n")
f.write(f"缺失瓦片数: {total_missing_tiles:,}\n")
if total_files > 0:
overall_completeness = (total_actual_tiles / total_files) * 100
f.write(f”总体完整度: {overall_completeness:.2f}%\n”)
print(f”详细报告已保存到: {output_file}”)
def save_missing_list(results, output_file=”missing_tiles.txt”):
“””保存缺失瓦片列表到文件”””
with open(output_file, ‘w’, encoding=’utf-8′) as f:
f.write(“# 缺失瓦片列表(基于实际X目录,包含Y边界检测)\n”)
f.write(“# 格式: z/x/y\n”)
f.write(“# 生成时间: {}\n”.format(time.strftime(“%Y-%m-%d %H:%M:%S”)))
f.write(“# 说明: 列出实际存在的X目录中缺失的Y文件,包括边界缺失\n”)
f.write(“=”*50 + “\n”)
total_missing = 0
for z, data in results.items():
if data[‘status’] == ‘checked’ and data[‘y_continuity_issues’]:
f.write(f”\n# 层级 z={z}\n”)
# 记录所有缺失的Y文件
for issue in data[‘y_continuity_issues’]:
if issue.get(‘note’) in [‘部分缺失’, ‘空目录’]:
for missing_y in issue[‘missing_y’]:
f.write(f”{z}/{issue[‘x’]}/{missing_y}\n”)
total_missing += 1
f.write(f”\n# 总共缺失 {total_missing} 个瓦片文件\n”)
print(f”缺失列表已保存到: {output_file}”)
def main():
parser = argparse.ArgumentParser(description=’检查瓦片文件连续性(基于实际X目录,包含Y边界检测)’)
parser.add_argument(‘path’, help=’瓦片文件根目录路径’)
parser.add_argument(‘-z’, ‘–zoom-levels’, nargs=’+’, type=int,
default=[10, 12, 14, 16], help=’要检查的z层级,默认为10 12 14 16′)
parser.add_argument(‘-o’, ‘–output’, default=’tile_check_results.txt’,
help=’输出结果文件名’)
args = parser.parse_args()
if not os.path.exists(args.path):
print(f”错误: 路径不存在: {args.path}”)
return
print(f”检查路径: {args.path}”)
print(f”检查层级: {args.zoom_levels}”)
print(“检查方法: 基于实际X目录检查Y连续性(包含边界检测)”)
print(“开始检查…” + “\n”)
start_time = time.time()
results = check_tile_continuity_practical(args.path, args.zoom_levels)
end_time = time.time()
print_results(results)
# 保存详细报告
save_results_to_txt(results, args.output)
# 保存缺失列表
missing_list_file = args.output.replace(‘.txt’, ‘_missing_list.txt’)
save_missing_list(results, missing_list_file)
print(f”\n检查完成! 耗时: {end_time – start_time:.2f}秒”)
if __name__ == “__main__”:
base_path = r”E:\Map\Layer_2025-09-27_231600\Layer”
# 使用改进的方法检查
print(“=== 基于实际X目录检查Y连续性(包含边界检测)===”)
results = check_tile_continuity_practical(base_path)
print_results(results)
save_results_to_txt(results, “improved_check_results.txt”)
save_missing_list(results, “improved_missing_list.txt”)
