OpenCode技能系统基础:DSL解析器详细方案
我们专注于技能DSL解析器的详细设计方案,包括UML建模和具体实现。
考虑到时间限制,需要高效地设计一个可扩展的解析器,能够处理YAML和Markdown混合的DSL。
步骤:
- 设计技能元数据Schema(使用Pydantic进行验证)
- 设计DSL解析器,能够解析YAML frontmatter和Markdown正文
- 创建技能验证器,验证技能语法和语义
将使用以下技术栈:
- Pydantic:用于数据验证和设置管理
- PyYAML:用于解析YAML
- Markdown:用于解析Markdown(如果需要提取结构)
- 自定义解析逻辑
由于技能DSL是YAML和Markdown的混合,假设结构如下:
name: skill-name
version: 1.0.0
description: A brief description
author: Your Name
tags:
- tag1
- tag2
inputs:
input1:
type: string
description: "输入描述"
required: true
outputs:
output1:
type: string
description: "输出描述"
steps:
- step1:
action: "action1"
parameters:
param1: value1
- step2:
action: "action2"
parameters:
param2: value2
---
这里是Markdown正文,用于详细描述技能的使用方法、示例等。
也可以包含一些代码块,用于表示技能的步骤或配置。
注意:使用---作为YAML和Markdown的分隔符。
设计思路:
- 读取文件内容,按---分割成两部分(如果不存在分隔符,则只有YAML部分,Markdown部分为空)
- 第一部分用YAML解析,得到元数据和步骤
- 第二部分保留为Markdown字符串,用于文档展示
但是,技能DSL需要被机器解析,因此步骤部分必须结构化。因此,我们将步骤定义在YAML部分,而Markdown部分仅作为补充文档。
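上述分割思路可以用一个最小示意实现(假设分隔符是独立成行的`---`;`split_skill_source`为示意用函数名,未覆盖文件以`---`开头等边界情况):

```python
def split_skill_source(content: str) -> tuple[str, str]:
    """按首个独立成行的 '---' 拆分:前半为YAML元数据,后半为Markdown文档。"""
    yaml_part, _sep, md_part = content.partition("\n---\n")
    # 无分隔符时partition返回(原文, '', ''),Markdown部分即为空字符串
    return yaml_part.strip(), md_part.strip()

yaml_text, markdown_text = split_skill_source(
    "name: demo\nversion: 1.0.0\n---\n这里是使用说明。"
)
print(yaml_text)
print(markdown_text)  # 这里是使用说明。
```

拆出的YAML部分随后交给`yaml.safe_load`解析,Markdown部分原样保留即可。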
接下来,将进行UML建模(类图)和详细设计。
OpenCode技能系统基础:DSL解析器详细方案
我将为你提供一个完整、深入的技能DSL解析器设计方案,涵盖UML建模、Schema设计、解析器实现和验证器创建。
一、系统架构设计:UML建模
1. 核心类图设计
2. 解析流程序列图
二、技能元数据Schema设计
1. 核心Schema定义(基于JSON Schema)
# schema/skill-schema-v1.0.0.yaml
$schema: "http://json-schema.org/draft-07/schema#"
$id: "https://opencode.ai/schemas/skill/v1.0.0"
title: "OpenCode Skill Schema"
description: "Schema for defining OpenCode skills"
type: "object"
definitions:
# 元数据定义
metadata:
type: "object"
properties:
name:
type: "string"
pattern: "^[a-z0-9-_]+$"
minLength: 3
maxLength: 50
description: "Unique identifier for the skill"
version:
type: "string"
pattern: "^\\d+\\.\\d+\\.\\d+$"
description: "Semantic version of the skill"
description:
type: "string"
minLength: 10
maxLength: 200
description: "Brief description of what the skill does"
author:
type: "string"
description: "Author or maintainer of the skill"
tags:
type: "array"
items:
type: "string"
pattern: "^[a-z0-9-]+$"
uniqueItems: true
maxItems: 10
description: "Keywords for categorization and search"
created_at:
type: "string"
format: "date-time"
description: "Creation timestamp"
updated_at:
type: "string"
format: "date-time"
description: "Last update timestamp"
requirements:
$ref: "#/definitions/requirements"
required: ["name", "version", "description"]
additionalProperties: false
# 依赖要求定义
requirements:
type: "object"
properties:
opencode_version:
type: "string"
pattern: "^\\d+\\.\\d+\\.\\d+$"
description: "Minimum OpenCode version required"
tools:
type: "array"
items:
type: "object"
properties:
name:
type: "string"
version:
type: "string"
optional:
type: "boolean"
default: false
permissions:
type: "array"
items:
type: "string"
enum: ["file.read", "file.write", "network", "process", "env"]
python_packages:
type: "array"
items:
type: "object"
properties:
name:
type: "string"
version:
type: "string"
optional:
type: "boolean"
default: false
# 输入参数定义
input_schema:
type: "object"
patternProperties:
"^[a-z][a-z0-9_]*$":
type: "object"
properties:
type:
type: "string"
enum: ["string", "integer", "number", "boolean", "array", "object"]
description:
type: "string"
minLength: 5
required:
type: "boolean"
default: true
default:
oneOf:
- {type: "string"}
- {type: "number"}
- {type: "boolean"}
- {type: "array"}
- {type: "object"}
choices:
type: "array"
minItems: 1
validation:
type: "object"
properties:
pattern:
type: "string"
min:
type: "number"
max:
type: "number"
minLength:
type: "integer"
maxLength:
type: "integer"
examples:
type: "array"
items:
type: "object"
properties:
value:
oneOf:
- {type: "string"}
- {type: "number"}
- {type: "boolean"}
description:
type: "string"
required: ["type", "description"]
additionalProperties: false
# 步骤定义
step_schema:
type: "object"
properties:
name:
type: "string"
pattern: "^[a-z][a-z0-9_]*$"
type:
type: "string"
enum: ["command", "script", "template", "condition", "loop", "parallel"]
description:
type: "string"
minLength: 10
condition:
type: "string"
description: "JavaScript-like condition expression"
parameters:
type: "object"
additionalProperties: true
template:
type: "string"
description: "Jinja2 template for content generation"
script:
type: "string"
description: "JavaScript/Python script to execute"
outputs:
type: "object"
patternProperties:
"^[a-z][a-z0-9_]*$":
type: "string"
description: "Output variable mapping"
timeout:
type: "integer"
minimum: 1
maximum: 3600
default: 300
retry:
type: "object"
properties:
attempts:
type: "integer"
minimum: 0
maximum: 5
delay:
type: "integer"
minimum: 1
backoff_factor:
type: "number"
minimum: 1.0
required: ["name", "type"]
additionalProperties: false
# 输出定义
output_schema:
type: "object"
patternProperties:
"^[a-z][a-z0-9_]*$":
type: "object"
properties:
description:
type: "string"
type:
type: "string"
value:
type: "string"
description: "JavaScript expression to compute output"
additionalProperties: false
# 根级Schema
properties:
metadata:
$ref: "#/definitions/metadata"
inputs:
$ref: "#/definitions/input_schema"
steps:
type: "array"
items:
$ref: "#/definitions/step_schema"
minItems: 1
outputs:
$ref: "#/definitions/output_schema"
documentation:
type: "object"
properties:
overview:
type: "string"
examples:
type: "array"
items:
type: "object"
properties:
title:
type: "string"
description:
type: "string"
code:
type: "string"
troubleshooting:
type: "array"
items:
type: "object"
properties:
problem:
type: "string"
solution:
type: "string"
permissions:
type: "object"
properties:
read:
type: "array"
items:
type: "string"
write:
type: "array"
items:
type: "string"
execute:
type: "array"
items:
type: "string"
required: ["metadata", "steps"]
additionalProperties: false
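作为参考,Schema中metadata部分的核心约束(必填字段、名称模式、语义化版本)可以用下面的最小检查示意(纯标准库实现,`check_metadata`为示意用函数名;实际应使用jsonschema等完整的JSON Schema校验器):

```python
import re

SEMVER_RE = re.compile(r"^\d+\.\d+\.\d+$")
NAME_RE = re.compile(r"^[a-z0-9-_]+$")

def check_metadata(meta: dict) -> list[str]:
    """复现metadata Schema的核心约束,返回违规描述列表(空列表表示通过)。"""
    errors = []
    for field in ("name", "version", "description"):
        if field not in meta:
            errors.append(f"missing required field: {field}")
    if "name" in meta and not NAME_RE.match(meta["name"]):
        errors.append("name must match ^[a-z0-9-_]+$")
    if "version" in meta and not SEMVER_RE.match(meta["version"]):
        errors.append("version must be semantic, e.g. 1.0.0")
    return errors

print(check_metadata({"name": "git-release", "version": "1.0.0", "description": "Release automation"}))  # []
print(check_metadata({"name": "Bad Name", "version": "1.0"}))
```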
2. Python数据模型(Pydantic)
# models/skill_models.py
from typing import Dict, List, Optional, Any, Union
from enum import Enum
from datetime import datetime
from pydantic import BaseModel, Field, validator, root_validator
import re
class StepType(str, Enum):
COMMAND = "command"
SCRIPT = "script"
TEMPLATE = "template"
CONDITION = "condition"
LOOP = "loop"
PARALLEL = "parallel"
VALIDATION = "validation"
HTTP_REQUEST = "http_request"
DATABASE_QUERY = "database_query"
class InputType(str, Enum):
STRING = "string"
INTEGER = "integer"
NUMBER = "number"
BOOLEAN = "boolean"
ARRAY = "array"
OBJECT = "object"
FILE = "file"
DIRECTORY = "directory"
class Severity(str, Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
class SkillRequirements(BaseModel):
"""技能依赖要求"""
opencode_version: Optional[str] = Field(
None,
regex=r"^\d+\.\d+\.\d+$",
description="Minimum OpenCode version required"
)
tools: List[Dict[str, Union[str, bool]]] = Field(
default_factory=list,
description="Required external tools"
)
permissions: List[str] = Field(
default_factory=list,
description="Required permissions"
)
python_packages: List[Dict[str, Union[str, bool]]] = Field(
default_factory=list,
description="Required Python packages"
)
@validator('permissions')
def validate_permissions(cls, v):
valid_permissions = {"file.read", "file.write", "network", "process", "env"}
for perm in v:
if perm not in valid_permissions:
raise ValueError(f"Invalid permission: {perm}. Valid permissions: {valid_permissions}")
return v
class SkillMetadata(BaseModel):
"""技能元数据"""
name: str = Field(
...,
min_length=3,
max_length=50,
regex=r"^[a-z0-9-_]+$",
description="Unique identifier for the skill"
)
version: str = Field(
...,
regex=r"^\d+\.\d+\.\d+$",
description="Semantic version of the skill"
)
description: str = Field(
...,
min_length=10,
max_length=200,
description="Brief description of what the skill does"
)
author: Optional[str] = Field(None, description="Author or maintainer")
tags: List[str] = Field(
default_factory=list,
max_items=10,
description="Keywords for categorization"
)
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
requirements: SkillRequirements = Field(default_factory=SkillRequirements)
@validator('tags')
def validate_tags(cls, v):
for tag in v:
if not re.match(r"^[a-z0-9-]+$", tag):
raise ValueError(f"Tag '{tag}' must contain only lowercase letters, numbers, and hyphens")
return v
class Config:
schema_extra = {
"example": {
"name": "git-release",
"version": "1.0.0",
"description": "Automates semantic versioning and release process",
"author": "OpenCode Team",
"tags": ["git", "release", "automation"]
}
}
class SkillInput(BaseModel):
"""技能输入参数定义"""
name: str = Field(
...,
regex=r"^[a-z][a-z0-9_]*$",
description="Input parameter name"
)
type: InputType = Field(..., description="Data type of the input")
description: str = Field(..., min_length=5, description="Description of the input")
required: bool = Field(True, description="Whether the input is required")
default: Optional[Any] = Field(None, description="Default value if not provided")
choices: Optional[List[Any]] = Field(None, description="Allowed values")
validation: Optional[Dict[str, Any]] = Field(
None,
description="Validation rules (pattern, min, max, etc.)"
)
examples: Optional[List[Dict[str, Any]]] = Field(
None,
description="Example values and descriptions"
)
@validator('default')
def validate_default_type(cls, v, values):
if v is not None and 'type' in values:
expected_type = values['type']
type_checks = {
InputType.STRING: isinstance(v, str),
InputType.INTEGER: isinstance(v, int),
InputType.NUMBER: isinstance(v, (int, float)),
InputType.BOOLEAN: isinstance(v, bool),
InputType.ARRAY: isinstance(v, list),
InputType.OBJECT: isinstance(v, dict),
InputType.FILE: isinstance(v, str),
InputType.DIRECTORY: isinstance(v, str),
}
if not type_checks.get(expected_type, True):
raise ValueError(f"Default value type {type(v)} doesn't match input type {expected_type}")
return v
@root_validator
def validate_choices(cls, values):
choices = values.get('choices')
default = values.get('default')
if choices and default and default not in choices:
raise ValueError(f"Default value {default} not in choices {choices}")
return values
class SkillStep(BaseModel):
"""技能执行步骤"""
name: str = Field(
...,
regex=r"^[a-z][a-z0-9_]*$",
description="Step identifier"
)
type: StepType = Field(..., description="Type of step")
description: Optional[str] = Field(None, min_length=10, description="Step description")
condition: Optional[str] = Field(
None,
description="JavaScript-like condition expression for conditional execution"
)
parameters: Dict[str, Any] = Field(
default_factory=dict,
description="Step-specific parameters"
)
template: Optional[str] = Field(None, description="Jinja2 template for content generation")
script: Optional[str] = Field(None, description="JavaScript/Python script to execute")
outputs: Dict[str, str] = Field(
default_factory=dict,
description="Output variable mappings"
)
timeout: int = Field(
300,
ge=1,
le=3600,
description="Maximum execution time in seconds"
)
retry: Optional[Dict[str, Any]] = Field(
None,
description="Retry configuration"
)
dependencies: List[str] = Field(
default_factory=list,
description="Steps that must complete before this step"
)
@validator('condition')
def validate_condition_syntax(cls, v):
if v:
# Basic validation for condition syntax
# In production, use a proper JavaScript parser
forbidden_keywords = ['import', 'exec', 'eval', '__']
for keyword in forbidden_keywords:
if keyword in v:
raise ValueError(f"Forbidden keyword '{keyword}' in condition")
return v
@validator('retry')
def validate_retry_config(cls, v):
if v:
if 'attempts' not in v:
raise ValueError("Retry config must include 'attempts'")
if v['attempts'] < 0 or v['attempts'] > 5:
raise ValueError("Retry attempts must be between 0 and 5")
return v
class SkillOutput(BaseModel):
"""技能输出定义"""
name: str = Field(
...,
regex=r"^[a-z][a-z0-9_]*$",
description="Output variable name"
)
description: str = Field(..., description="Description of the output")
type: str = Field(..., description="Data type of the output")
value: str = Field(
...,
description="JavaScript expression to compute the output value"
)
class SkillDocumentation(BaseModel):
"""技能文档"""
overview: Optional[str] = Field(None, description="High-level overview")
examples: List[Dict[str, str]] = Field(
default_factory=list,
description="Usage examples"
)
troubleshooting: List[Dict[str, str]] = Field(
default_factory=list,
description="Common problems and solutions"
)
markdown_content: Optional[str] = Field(
None,
description="Full Markdown documentation"
)
def generate_markdown(self) -> str:
"""生成完整的Markdown文档"""
sections = []
if self.overview:
sections.append(f"# Overview\n{self.overview}")
if self.examples:
sections.append("## Examples")
for i, example in enumerate(self.examples, 1):
sections.append(f"### Example {i}: {example.get('title', '')}")
if 'description' in example:
sections.append(f"{example['description']}\n")
if 'code' in example:
sections.append(f"```yaml\n{example['code']}\n```")
if self.troubleshooting:
sections.append("## Troubleshooting")
for item in self.troubleshooting:
sections.append(f"### {item.get('problem', 'Problem')}")
sections.append(f"{item.get('solution', 'Solution')}")
if self.markdown_content:
sections.append(self.markdown_content)
return "\n".join(sections)
class SkillPermissions(BaseModel):
"""技能权限要求"""
read: List[str] = Field(default_factory=list, description="Files/directories to read")
write: List[str] = Field(default_factory=list, description="Files/directories to write")
execute: List[str] = Field(default_factory=list, description="Commands to execute")
network: List[str] = Field(default_factory=list, description="Network endpoints to access")
@validator('read', 'write')
def validate_path_patterns(cls, v):
for path in v:
if not re.match(r'^[a-zA-Z0-9_\-./*?]+$', path):
raise ValueError(f"Invalid path pattern: {path}")
return v
class Skill(BaseModel):
"""完整的技能定义"""
metadata: SkillMetadata
inputs: Dict[str, SkillInput] = Field(default_factory=dict)
steps: List[SkillStep] = Field(..., min_items=1)
outputs: Dict[str, SkillOutput] = Field(default_factory=dict)
documentation: SkillDocumentation = Field(default_factory=SkillDocumentation)
permissions: SkillPermissions = Field(default_factory=SkillPermissions)
@validator('steps')
def validate_step_dependencies(cls, v):
"""验证步骤依赖关系"""
step_names = {step.name for step in v}
for step in v:
for dep in step.dependencies:
if dep not in step_names:
raise ValueError(f"Step '{step.name}' depends on unknown step '{dep}'")
# 检查循环依赖
visited = set()
recursion_stack = set()
def has_cycle(step_name, steps_dict):
visited.add(step_name)
recursion_stack.add(step_name)
step = steps_dict[step_name]
for dep in step.dependencies:
if dep not in visited:
if has_cycle(dep, steps_dict):
return True
elif dep in recursion_stack:
return True
recursion_stack.remove(step_name)
return False
steps_dict = {step.name: step for step in v}
for step in v:
if step.name not in visited:
if has_cycle(step.name, steps_dict):
raise ValueError("Circular dependency detected in steps")
return v
@root_validator
def validate_output_references(cls, values):
"""验证输出引用是否有效"""
steps = values.get('steps', [])
outputs = values.get('outputs', {})
# 收集所有可用的输出变量
available_outputs = set()
for step in steps:
available_outputs.update(step.outputs.keys())
# 验证输出定义中的引用
for output_name, output_def in outputs.items():
# 这里可以添加更复杂的引用解析逻辑
# 暂时只检查基本的语法
if not output_def.value:
raise ValueError(f"Output '{output_name}' must have a value expression")
return values
class Config:
schema_extra = {
"example": {
"metadata": {
"name": "example-skill",
"version": "1.0.0",
"description": "An example skill"
},
"steps": [
{
"name": "step1",
"type": "command",
"description": "First step"
}
]
}
}
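Skill.validate_step_dependencies中的循环依赖检测,抽象成独立片段大致如下(纯标准库;与上面的递归DFS同构,`has_cycle`为示意用函数名):

```python
def has_cycle(deps: dict[str, list[str]]) -> bool:
    """对步骤依赖图做深度优先搜索,检测循环依赖。deps: 步骤名 -> 依赖列表。"""
    visited: set[str] = set()
    stack: set[str] = set()  # 当前递归路径上的节点

    def dfs(node: str) -> bool:
        visited.add(node)
        stack.add(node)
        for dep in deps.get(node, []):
            if dep not in visited:
                if dfs(dep):
                    return True
            elif dep in stack:  # 依赖回到了递归路径上的节点:存在环
                return True
        stack.remove(node)
        return False

    return any(dfs(name) for name in deps if name not in visited)

print(has_cycle({"a": ["b"], "b": ["c"], "c": []}))  # False
print(has_cycle({"a": ["b"], "b": ["a"]}))           # True
```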
三、技能DSL解析器实现
1. 主解析器类实现
# parsers/skill_parser.py
import re
import yaml
import json
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union
from dataclasses import dataclass
from enum import Enum
import jsonschema
from jsonschema import validate, ValidationError
import jinja2
from jinja2.sandbox import SandboxedEnvironment
from models.skill_models import Skill, SkillMetadata, SkillStep, SkillInput, SkillOutput
class ParseError(Exception):
"""解析错误异常"""
def __init__(self, message: str, line: Optional[int] = None, column: Optional[int] = None):
self.message = message
self.line = line
self.column = column
super().__init__(f"{message} (line {line}, column {column})" if line else message)
class ValidationSeverity(str, Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class ValidationIssue:
"""验证问题"""
code: str
message: str
severity: ValidationSeverity
path: str
line: Optional[int] = None
column: Optional[int] = None
suggestion: Optional[str] = None
@dataclass
class ValidationResult:
"""验证结果"""
is_valid: bool
issues: List[ValidationIssue]
schema_version: str
def add_issue(self, issue: ValidationIssue):
self.issues.append(issue)
if issue.severity in [ValidationSeverity.ERROR, ValidationSeverity.CRITICAL]:
self.is_valid = False
def merge(self, other: 'ValidationResult'):
self.issues.extend(other.issues)
self.is_valid = self.is_valid and other.is_valid
def to_dict(self) -> Dict:
return {
"is_valid": self.is_valid,
"schema_version": self.schema_version,
"issues": [
{
"code": i.code,
"message": i.message,
"severity": i.severity,
"path": i.path,
"line": i.line,
"column": i.column,
"suggestion": i.suggestion
}
for i in self.issues
],
"summary": {
"total_issues": len(self.issues),
"errors": len([i for i in self.issues if i.severity == ValidationSeverity.ERROR]),
"warnings": len([i for i in self.issues if i.severity == ValidationSeverity.WARNING]),
"infos": len([i for i in self.issues if i.severity == ValidationSeverity.INFO])
}
}
class SkillSchemaValidator:
"""技能Schema验证器"""
def __init__(self, schema_dir: Optional[Path] = None):
self.schema_dir = schema_dir or Path(__file__).parent / "schemas"
self.schemas = self._load_schemas()
self.custom_validators = []
def _load_schemas(self) -> Dict[str, Dict]:
"""加载所有Schema文件"""
schemas = {}
for schema_file in self.schema_dir.glob("skill-schema-*.yaml"):
try:
with open(schema_file, 'r', encoding='utf-8') as f:
schema = yaml.safe_load(f)
version = schema.get('$id', '').split('/')[-1]
schemas[version] = schema
except Exception as e:
print(f"Warning: Failed to load schema {schema_file}: {e}")
return schemas
def validate_yaml(self, yaml_data: Dict, schema_version: str = "v1.0.0") -> ValidationResult:
"""验证YAML数据是否符合Schema"""
result = ValidationResult(
is_valid=True,
issues=[],
schema_version=schema_version
)
if schema_version not in self.schemas:
result.add_issue(ValidationIssue(
code="SCHEMA_NOT_FOUND",
message=f"Schema version {schema_version} not found",
severity=ValidationSeverity.ERROR,
path="$"
))
return result
schema = self.schemas[schema_version]
try:
# 基础Schema验证
validate(instance=yaml_data, schema=schema)
except ValidationError as e:
result.add_issue(ValidationIssue(
code="SCHEMA_VALIDATION_FAILED",
message=e.message,
severity=ValidationSeverity.ERROR,
path=e.json_path or "$",
line=getattr(e, 'line', None),
column=getattr(e, 'column', None)
))
# 自定义验证规则
self._validate_custom_rules(yaml_data, result)
return result
def _validate_custom_rules(self, data: Dict, result: ValidationResult):
"""应用自定义验证规则"""
# 1. 验证技能名称唯一性模式
if 'metadata' in data and 'name' in data['metadata']:
name = data['metadata']['name']
if not re.match(r'^[a-z0-9-_]+$', name):
result.add_issue(ValidationIssue(
code="INVALID_SKILL_NAME",
message=f"Skill name '{name}' must contain only lowercase letters, numbers, hyphens, and underscores",
severity=ValidationSeverity.ERROR,
path="metadata.name"
))
# 2. 验证版本格式
if 'metadata' in data and 'version' in data['metadata']:
version = data['metadata']['version']
if not re.match(r'^\d+\.\d+\.\d+$', version):
result.add_issue(ValidationIssue(
code="INVALID_VERSION_FORMAT",
message=f"Version '{version}' must follow semantic versioning (e.g., 1.0.0)",
severity=ValidationSeverity.ERROR,
path="metadata.version"
))
# 3. 验证步骤名称唯一性
if 'steps' in data:
step_names = []
for i, step in enumerate(data['steps']):
if 'name' in step:
if step['name'] in step_names:
result.add_issue(ValidationIssue(
code="DUPLICATE_STEP_NAME",
message=f"Duplicate step name '{step['name']}'",
severity=ValidationSeverity.ERROR,
path=f"steps[{i}].name"
))
step_names.append(step['name'])
# 4. 验证输入参数名称
if 'inputs' in data:
for input_name in data['inputs'].keys():
if not re.match(r'^[a-z][a-z0-9_]*$', input_name):
result.add_issue(ValidationIssue(
code="INVALID_INPUT_NAME",
message=f"Input name '{input_name}' must start with a letter and contain only lowercase letters, numbers, and underscores",
severity=ValidationSeverity.ERROR,
path=f"inputs.{input_name}"
))
# 5. 验证依赖关系
if 'steps' in data:
all_step_names = {step.get('name', f'step_{i}') for i, step in enumerate(data['steps'])}
for i, step in enumerate(data['steps']):
if 'dependencies' in step:
for dep in step['dependencies']:
if dep not in all_step_names:
result.add_issue(ValidationIssue(
code="UNKNOWN_DEPENDENCY",
message=f"Step '{step.get('name', f'step_{i}')}' depends on unknown step '{dep}'",
severity=ValidationSeverity.ERROR,
path=f"steps[{i}].dependencies"
))
def validate_skill_structure(self, skill: Skill) -> ValidationResult:
"""验证技能结构完整性"""
result = ValidationResult(
is_valid=True,
issues=[],
schema_version="v1.0.0"
)
# 验证步骤执行顺序
self._validate_execution_order(skill, result)
# 验证输入默认值
self._validate_input_defaults(skill, result)
# 验证模板语法
self._validate_templates(skill, result)
# 验证条件表达式
self._validate_conditions(skill, result)
return result
def _validate_execution_order(self, skill: Skill, result: ValidationResult):
"""验证执行顺序"""
# 检查循环依赖已在Skill模型中验证
# 这里可以添加其他执行顺序相关的验证
step_names = [step.name for step in skill.steps]
# 验证输出引用
for output_name, output_def in skill.outputs.items():
# 检查输出表达式是否引用有效的步骤输出
# 这里可以添加更复杂的分析
pass
def _validate_input_defaults(self, skill: Skill, result: ValidationResult):
"""验证输入默认值"""
for input_name, input_def in skill.inputs.items():
if input_def.default is not None:
# 默认值的类型在构建SkillInput模型时已由validate_default_type校验,
# 此处只需复查choices约束
# 验证默认值在choices中(如果定义了choices)
if input_def.choices and input_def.default not in input_def.choices:
result.add_issue(ValidationIssue(
code="DEFAULT_NOT_IN_CHOICES",
message=f"Default value '{input_def.default}' not in choices {input_def.choices}",
severity=ValidationSeverity.ERROR,
path=f"inputs.{input_name}.default"
))
def _validate_templates(self, skill: Skill, result: ValidationResult):
"""验证模板语法"""
env = SandboxedEnvironment()
for i, step in enumerate(skill.steps):
if step.template:
try:
template = env.from_string(step.template)
# 尝试编译模板
template.render()
except jinja2.TemplateError as e:
result.add_issue(ValidationIssue(
code="INVALID_TEMPLATE",
message=f"Invalid template in step '{step.name}': {str(e)}",
severity=ValidationSeverity.ERROR,
path=f"steps[{i}].template",
suggestion="Check Jinja2 template syntax"
))
def _validate_conditions(self, skill: Skill, result: ValidationResult):
"""验证条件表达式语法"""
# 这里可以集成JavaScript表达式解析器
# 暂时只进行基本验证
forbidden_patterns = [
(r'import\s+', "Import statements not allowed in conditions"),
(r'__\w+__', "Dunder methods not allowed in conditions"),
(r'eval\s*\(', "eval() function not allowed in conditions"),
(r'exec\s*\(', "exec() function not allowed in conditions"),
(r'open\s*\(', "File operations not allowed in conditions"),
]
for i, step in enumerate(skill.steps):
if step.condition:
condition = step.condition
for pattern, message in forbidden_patterns:
if re.search(pattern, condition):
result.add_issue(ValidationIssue(
code="FORBIDDEN_EXPRESSION",
message=f"{message} in condition: {condition[:50]}...",
severity=ValidationSeverity.ERROR,
path=f"steps[{i}].condition"
))
# 检查语法基本有效性
try:
# 这里可以添加更复杂的语法检查
# 例如使用pyparsing或类似的库
if not condition.strip():
result.add_issue(ValidationIssue(
code="EMPTY_CONDITION",
message="Condition expression is empty",
severity=ValidationSeverity.WARNING,
path=f"steps[{i}].condition"
))
except Exception as e:
result.add_issue(ValidationIssue(
code="CONDITION_SYNTAX_ERROR",
message=f"Condition syntax error: {str(e)}",
severity=ValidationSeverity.ERROR,
path=f"steps[{i}].condition"
))
def register_custom_validator(self, validator_func):
"""注册自定义验证器"""
self.custom_validators.append(validator_func)
class SkillLoader:
"""技能加载器"""
def __init__(self):
self.yaml_loader = yaml.SafeLoader
self.template_env = SandboxedEnvironment()
def load_yaml(self, content: str) -> Dict:
"""加载YAML内容"""
try:
data = yaml.safe_load(content)
if data is None:
return {}
return data
except yaml.YAMLError as e:
# 尝试提供更详细的错误信息
if hasattr(e, 'problem_mark'):
mark = e.problem_mark
raise ParseError(
f"YAML parse error: {e.problem}",
line=mark.line + 1,
column=mark.column + 1
)
else:
raise ParseError(f"YAML parse error: {str(e)}")
def extract_markdown(self, content: str) -> Tuple[str, str]:
"""提取YAML和Markdown部分"""
# 支持三种分隔符格式
separators = [
r'^---\s*$', # 标准分隔符
r'^\.\.\.\s*$', # 替代分隔符
r'^```yaml\s*$', # 代码块风格
]
lines = content.split('\n')
yaml_lines = []
markdown_lines = []
in_yaml = True
separator_found = False
for i, line in enumerate(lines):
if in_yaml and any(re.match(pattern, line) for pattern in separators):
if yaml_lines: # 只有已经有YAML内容时才认为是分隔符
separator_found = True
in_yaml = False
continue
if in_yaml:
yaml_lines.append(line)
else:
markdown_lines.append(line)
yaml_content = '\n'.join(yaml_lines)
markdown_content = '\n'.join(markdown_lines)
return yaml_content, markdown_content
def resolve_references(self, data: Dict, context: Dict = None) -> Dict:
"""解析引用和变量"""
if context is None:
context = {}
def _resolve(obj, path=""):
if isinstance(obj, dict):
resolved = {}
for key, value in obj.items():
new_path = f"{path}.{key}" if path else key
resolved[key] = _resolve(value, new_path)
return resolved
elif isinstance(obj, list):
return [_resolve(item, f"{path}[{i}]") for i, item in enumerate(obj)]
elif isinstance(obj, str):
# 解析变量引用 {{variable}}
return self._resolve_variables(obj, context)
else:
return obj
return _resolve(data)
def _resolve_variables(self, text: str, context: Dict) -> str:
"""解析变量引用"""
if not isinstance(text, str):
return text
pattern = r'\{\{\s*([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)\s*\}\}'
def replace_match(match):
var_path = match.group(1)
try:
# 简单的点号分割路径解析
value = context
for part in var_path.split('.'):
if isinstance(value, dict) and part in value:
value = value[part]
else:
# 如果找不到变量,保持原样(将在执行时解析)
return match.group(0)
return str(value)
except:
return match.group(0)
return re.sub(pattern, replace_match, text)
class SkillDSLParser:
"""技能DSL主解析器"""
def __init__(self, schema_dir: Optional[Path] = None):
self.validator = SkillSchemaValidator(schema_dir)
self.loader = SkillLoader()
self.skills_cache = {}
def parse(self, content: str, validate: bool = True) -> Skill:
"""解析技能DSL内容"""
# 提取YAML和Markdown部分
yaml_content, markdown_content = self.loader.extract_markdown(content)
if not yaml_content.strip():
raise ParseError("No YAML content found in skill definition")
# 加载YAML
yaml_data = self.loader.load_yaml(yaml_content)
# 验证YAML结构
if validate:
validation_result = self.validator.validate_yaml(yaml_data)
if not validation_result.is_valid:
errors = "\n".join(
f"- {issue.message} (path: {issue.path})"
for issue in validation_result.issues
if issue.severity in [ValidationSeverity.ERROR, ValidationSeverity.CRITICAL]
)
raise ParseError(f"Skill validation failed:\n{errors}")
# 解析变量引用
resolved_data = self.loader.resolve_references(yaml_data)
# 构建技能对象
try:
skill = self._build_skill_object(resolved_data, markdown_content)
except Exception as e:
raise ParseError(f"Failed to build skill object: {str(e)}")
# 验证技能结构
if validate:
structure_result = self.validator.validate_skill_structure(skill)
if not structure_result.is_valid:
errors = "\n".join(
f"- {issue.message} (path: {issue.path})"
for issue in structure_result.issues
if issue.severity in [ValidationSeverity.ERROR, ValidationSeverity.CRITICAL]
)
raise ParseError(f"Skill structure validation failed:\n{errors}")
return skill
def parse_file(self, file_path: Union[str, Path], validate: bool = True) -> Skill:
"""从文件解析技能"""
file_path = Path(file_path)
# 检查缓存
cache_key = (str(file_path), file_path.stat().st_mtime if file_path.exists() else 0)
if cache_key in self.skills_cache:
return self.skills_cache[cache_key]
if not file_path.exists():
raise FileNotFoundError(f"Skill file not found: {file_path}")
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError:
# 尝试其他编码
with open(file_path, 'r', encoding='latin-1') as f:
content = f.read()
skill = self.parse(content, validate)
# 缓存结果
self.skills_cache[cache_key] = skill
return skill
def validate(self, content: str) -> ValidationResult:
"""验证技能DSL内容"""
try:
# 尝试解析但不构建完整对象
yaml_content, _ = self.loader.extract_markdown(content)
yaml_data = self.loader.load_yaml(yaml_content)
# Schema验证
schema_result = self.validator.validate_yaml(yaml_data)
# 如果Schema验证通过,尝试构建对象进行结构验证
if schema_result.is_valid:
try:
skill = self._build_skill_object(yaml_data)
structure_result = self.validator.validate_skill_structure(skill)
schema_result.merge(structure_result)
except Exception as e:
schema_result.add_issue(ValidationIssue(
code="STRUCTURE_BUILD_FAILED",
message=f"Failed to build skill structure: {str(e)}",
severity=ValidationSeverity.ERROR,
path="$"
))
return schema_result
except Exception as e:
return ValidationResult(
is_valid=False,
issues=[ValidationIssue(
code="PARSE_ERROR",
message=f"Parse error: {str(e)}",
severity=ValidationSeverity.ERROR,
path="$"
)],
schema_version="v1.0.0"
)
def serialize(self, skill: Skill, format: str = "yaml") -> str:
"""序列化技能对象为DSL格式"""
if format.lower() == "yaml":
return self._serialize_to_yaml(skill)
elif format.lower() == "json":
return self._serialize_to_json(skill)
else:
raise ValueError(f"Unsupported format: {format}. Use 'yaml' or 'json'.")
def _build_skill_object(self, data: Dict, markdown_content: str = "") -> Skill:
"""从字典数据构建技能对象"""
# 处理元数据
metadata_data = data.get('metadata', {})
metadata = SkillMetadata(**metadata_data)
# 处理输入
inputs = {}
for name, input_data in data.get('inputs', {}).items():
inputs[name] = SkillInput(name=name, **input_data)
# 处理步骤
steps = []
for step_data in data.get('steps', []):
steps.append(SkillStep(**step_data))
# 处理输出
outputs = {}
for name, output_data in data.get('outputs', {}).items():
outputs[name] = SkillOutput(name=name, **output_data)
# 处理文档
doc_data = data.get('documentation', {})
if markdown_content:
doc_data['markdown_content'] = markdown_content
documentation = SkillDocumentation(**doc_data)
# 处理权限
permissions_data = data.get('permissions', {})
from models.skill_models import SkillPermissions
permissions = SkillPermissions(**permissions_data)
# 构建完整技能对象
skill_data = {
'metadata': metadata,
'inputs': inputs,
'steps': steps,
'outputs': outputs,
'documentation': documentation,
'permissions': permissions
}
return Skill(**skill_data)
def _serialize_to_yaml(self, skill: Skill) -> str:
"""序列化为YAML格式"""
import yaml
def skill_to_dict(skill_obj: Skill) -> Tuple[Dict, str]:
"""将技能对象转换为字典"""
data = skill_obj.dict(exclude_none=True)
# 处理特殊字段
if 'documentation' in data and 'markdown_content' in data['documentation']:
markdown_content = data['documentation'].pop('markdown_content')
else:
markdown_content = ""
return data, markdown_content
data, markdown_content = skill_to_dict(skill)
# 生成YAML
yaml_content = yaml.dump(data, default_flow_style=False, sort_keys=False, allow_unicode=True)
# 添加分隔符和Markdown
if markdown_content:
yaml_content = f"{yaml_content}\n---\n{markdown_content}"
return yaml_content
def _serialize_to_json(self, skill: Skill) -> str:
"""序列化为JSON格式"""
import json
data = skill.dict(exclude_none=True)
# 添加Schema信息
data['$schema'] = 'https://opencode.ai/schemas/skill/v1.0.0'
return json.dumps(data, indent=2, ensure_ascii=False, default=str)
# 快捷函数
def parse_skill(content: str, **kwargs) -> Skill:
"""快捷解析函数"""
parser = SkillDSLParser()
return parser.parse(content, **kwargs)
def parse_skill_file(file_path: Union[str, Path], **kwargs) -> Skill:
"""快捷文件解析函数"""
parser = SkillDSLParser()
return parser.parse_file(file_path, **kwargs)
def validate_skill(content: str) -> ValidationResult:
"""快捷验证函数"""
parser = SkillDSLParser()
return parser.validate(content)
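上述解析流程的核心机制有两个:按 `---` 分隔符拆分 YAML frontmatter 与 Markdown 正文,以及 `{{var}}` 形式的变量替换。下面给出一个可独立运行的最小示意(与 SkillLoader 的完整实现无关,函数名为演示用途假设,仅展示机制):

```python
import re

# 匹配 "---\n<YAML>\n---\n<Markdown>" 结构;无分隔符时整体视为YAML
FRONTMATTER = re.compile(r"^---\s*\n(.*?)\n---\s*\n?(.*)$", re.DOTALL)

def split_frontmatter(content: str):
    """将技能文档拆分为 (YAML部分, Markdown部分)"""
    match = FRONTMATTER.match(content)
    if match:
        return match.group(1), match.group(2)
    return content, ""

VAR = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")

def resolve_variables(text: str, context: dict) -> str:
    """替换 {{a.b.c}} 形式的占位符;路径缺失时保留原样"""
    def lookup(match: re.Match) -> str:
        value = context
        for part in match.group(1).split("."):
            if isinstance(value, dict) and part in value:
                value = value[part]
            else:
                return match.group(0)  # 缺失变量保持原占位符
        return str(value)
    return VAR.sub(lookup, text)
```

这也解释了设计决策:步骤必须放在 YAML 部分才能被结构化解析,Markdown 部分只作为文档原样保留。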
2. 扩展验证器实现
# parsers/extended_validators.py
import re
from typing import Dict, List, Any
from pathlib import Path
import ast
import json
class ExtendedSkillValidator:
"""扩展的技能验证器"""
def __init__(self):
self.validators = {
'security': self.validate_security,
'performance': self.validate_performance,
'best_practices': self.validate_best_practices,
'compatibility': self.validate_compatibility
}
def validate_security(self, skill_data: Dict) -> List[Dict]:
"""安全验证"""
issues = []
# 检查危险操作
dangerous_patterns = [
(r'eval\s*\(', 'eval() usage', 'high'),
(r'exec\s*\(', 'exec() usage', 'high'),
(r'__import__\s*\(', 'dynamic import', 'medium'),
(r'subprocess\.Popen', 'subprocess execution', 'medium'),
(r'os\.system', 'system command execution', 'medium'),
]
# 检查步骤中的脚本和模板
for step in skill_data.get('steps', []):
text_to_check = []
if 'script' in step:
text_to_check.append(('script', step['script']))
if 'template' in step:
text_to_check.append(('template', step['template']))
if 'condition' in step:
text_to_check.append(('condition', step['condition']))
for field_type, text in text_to_check:
for pattern, description, severity in dangerous_patterns:
if re.search(pattern, text, re.IGNORECASE):
issues.append({
'type': 'security',
'severity': severity,
'description': f'Potential security issue in {field_type}: {description}',
'location': f"steps[{step.get('name', 'unnamed')}].{field_type}",
'suggestion': 'Consider using safer alternatives or adding proper validation'
})
# 检查文件权限
permissions = skill_data.get('permissions', {})
write_permissions = permissions.get('write', [])
for path in write_permissions:
if path in ['/', '/etc', '/usr', '/bin', '/sbin', '/var']:
issues.append({
'type': 'security',
'severity': 'high',
'description': f'Writing to system directory: {path}',
'location': 'permissions.write',
'suggestion': 'Restrict write permissions to application directories only'
})
return issues
def validate_performance(self, skill_data: Dict) -> List[Dict]:
"""性能验证"""
issues = []
# 检查步骤超时设置
for step in skill_data.get('steps', []):
timeout = step.get('timeout', 300)
if timeout > 3600: # 1小时
issues.append({
'type': 'performance',
'severity': 'medium',
'description': f'Step timeout too long: {timeout} seconds',
'location': f"steps[{step.get('name', 'unnamed')}].timeout",
'suggestion': 'Consider breaking long-running steps into smaller units'
})
# 检查潜在的无限循环模式
if 'script' in step and ('while True' in step['script'] or 'while (true)' in step['script']):
issues.append({
'type': 'performance',
'severity': 'high',
'description': 'Potential infinite loop detected',
'location': f"steps[{step.get('name', 'unnamed')}].script",
'suggestion': 'Add proper loop termination conditions'
})
# 检查资源密集型操作
resource_intensive_patterns = [
(r'for.*in.*range\(\d{6,}\)', 'Large loop iteration'),
(r'\.read\(\)', 'Reading entire file to memory'),
(r'SELECT \*', 'Selecting all columns without limit'),
]
for step in skill_data.get('steps', []):
if 'script' in step:
for pattern, description in resource_intensive_patterns:
if re.search(pattern, step['script']):
issues.append({
'type': 'performance',
'severity': 'medium',
'description': f'Potential performance issue: {description}',
'location': f"steps[{step.get('name', 'unnamed')}].script",
'suggestion': 'Optimize for memory usage and processing time'
})
return issues
def validate_best_practices(self, skill_data: Dict) -> List[Dict]:
"""最佳实践验证"""
issues = []
metadata = skill_data.get('metadata', {})
# 检查文档完整性
if not metadata.get('description') or len(metadata.get('description', '')) < 20:
issues.append({
'type': 'best_practices',
'severity': 'low',
'description': 'Skill description is too brief',
'location': 'metadata.description',
'suggestion': 'Provide a more detailed description of what the skill does'
})
if not metadata.get('tags'):
issues.append({
'type': 'best_practices',
'severity': 'low',
'description': 'No tags specified',
'location': 'metadata.tags',
'suggestion': 'Add relevant tags for better discoverability'
})
# 检查输入验证
inputs = skill_data.get('inputs', {})
for input_name, input_def in inputs.items():
if input_def.get('required', False) and 'validation' not in input_def:
issues.append({
'type': 'best_practices',
'severity': 'medium',
'description': f'Required input "{input_name}" has no validation rules',
'location': f'inputs.{input_name}',
'suggestion': 'Add validation rules to ensure input quality'
})
# 检查错误处理
steps = skill_data.get('steps', [])
has_error_handling = False
for step in steps:
if 'retry' in step:
has_error_handling = True
break
if not has_error_handling and len(steps) > 1:
issues.append({
'type': 'best_practices',
'severity': 'medium',
'description': 'No error handling or retry logic specified',
'location': 'steps',
'suggestion': 'Add retry configuration for network or transient failures'
})
return issues
def validate_compatibility(self, skill_data: Dict) -> List[Dict]:
"""兼容性验证"""
issues = []
requirements = skill_data.get('metadata', {}).get('requirements', {})
# 检查OpenCode版本兼容性
opencode_version = requirements.get('opencode_version')
if opencode_version:
try:
from packaging import version
current_version = version.parse("1.0.0") # 假设当前版本
required_version = version.parse(opencode_version)
if required_version > current_version:
issues.append({
'type': 'compatibility',
'severity': 'high',
'description': f'Requires OpenCode {opencode_version}, but current is 1.0.0',
'location': 'metadata.requirements.opencode_version',
'suggestion': 'Update OpenCode or adjust skill requirements'
})
except Exception:
pass  # packaging不可用或版本号无法解析时,跳过该项检查
# 检查工具依赖
tools = requirements.get('tools', [])
for tool in tools:
if isinstance(tool, dict) and not tool.get('optional', False):
# 检查工具是否常见
common_tools = {'git', 'docker', 'kubectl', 'aws', 'terraform'}
if tool.get('name') not in common_tools:
issues.append({
'type': 'compatibility',
'severity': 'medium',
'description': f'Requires uncommon tool: {tool.get("name")}',
'location': 'metadata.requirements.tools',
'suggestion': 'Consider making this tool optional or providing installation instructions'
})
return issues
def validate_all(self, skill_data: Dict) -> Dict[str, List[Dict]]:
"""执行所有验证"""
results = {}
for validator_name, validator_func in self.validators.items():
try:
results[validator_name] = validator_func(skill_data)
except Exception as e:
results[validator_name] = [{
'type': 'validator_error',
'severity': 'high',
'description': f'Validator {validator_name} failed: {str(e)}',
'location': '$',
'suggestion': 'Check skill data format'
}]
return results
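上面各类验证器的共同点是:对步骤字段做正则模式扫描,命中后产出统一结构的 issue 字典。下面是一个可独立运行的最小示意(模式表取自 validate_security 中 dangerous_patterns 的子集,函数名为演示假设):

```python
import re

# 与 validate_security 中 dangerous_patterns 对应的最小子集(示意)
DANGEROUS = [
    (re.compile(r"eval\s*\("), "eval() usage", "high"),
    (re.compile(r"os\.system"), "system command execution", "medium"),
]

def scan_step_script(step: dict) -> list:
    """扫描单个步骤的 script 字段,返回统一格式的安全问题列表"""
    issues = []
    script = step.get("script", "")
    for pattern, description, severity in DANGEROUS:
        if pattern.search(script):
            issues.append({
                "type": "security",
                "severity": severity,
                "description": description,
                "location": f"steps[{step.get('name', 'unnamed')}].script",
            })
    return issues
```

validate_all 只是把这种逐项扫描按验证器名称分组聚合,并在单个验证器抛出异常时降级为 validator_error 条目。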
四、测试用例与验证
1. 测试数据准备
# tests/test_data/sample_skill.yaml
metadata:
name: git-release
version: 1.0.0
description: Automates semantic versioning and GitHub release creation
author: OpenCode Team
tags: [git, release, automation, ci-cd]
requirements:
opencode_version: "1.0.0"
tools:
- name: git
version: ">=2.25"
optional: false
- name: gh
version: ">=2.0"
optional: true
permissions: [file.read, file.write, network]
python_packages:
- name: requests
version: ">=2.25"
optional: false
inputs:
release_type:
type: string
description: Type of release (major, minor, patch)
required: true
choices: [major, minor, patch]
default: patch
examples:
- value: major
description: Major version bump for breaking changes
- value: minor
description: Minor version bump for new features
- value: patch
description: Patch version bump for bug fixes
dry_run:
type: boolean
description: Perform a dry run without creating release
required: false
default: false
changelog_path:
type: string
description: Path to changelog file
required: false
default: CHANGELOG.md
validation:
pattern: "^[a-zA-Z0-9_./-]+\\.md$"
steps:
- name: validate_environment
type: validation
description: Validate Git environment and permissions
condition: "git.status === 'clean' && git.branch === 'main'"
script: |
// Check Git status
const isClean = await git.isClean();
const currentBranch = await git.currentBranch();
if (!isClean) {
throw new Error('Working directory is not clean');
}
if (currentBranch !== 'main') {
throw new Error('Must be on main branch for releases');
}
outputs:
git_info: "context.git"
- name: analyze_commits
type: command
description: Analyze Git commits since last release
dependencies: [validate_environment]
parameters:
command: git
args:
- log
- --oneline
- --no-merges
- "{{git_info.latest_tag}}..HEAD"
outputs:
commits: "result.stdout"
commit_count: "result.stdout.split('\\n').length"
- name: determine_version
type: script
description: Determine next version based on commit analysis
dependencies: [analyze_commits]
script: |
const commits = context.commits;
let bumpType = 'patch';
// Analyze commit messages
for (const commit of commits.split('\n')) {
if (commit.includes('BREAKING CHANGE')) {
bumpType = 'major';
break;
}
if (commit.startsWith('feat:')) {
bumpType = 'minor';
}
}
// Use user input if provided
if (inputs.release_type) {
bumpType = inputs.release_type;
}
// Calculate new version
const currentVersion = context.git_info.latest_tag.replace(/^v/, '');
const [major, minor, patch] = currentVersion.split('.').map(Number);
let newVersion;
switch (bumpType) {
case 'major':
newVersion = `${major + 1}.0.0`;
break;
case 'minor':
newVersion = `${major}.${minor + 1}.0`;
break;
case 'patch':
newVersion = `${major}.${minor}.${patch + 1}`;
break;
}
return { bumpType, newVersion };
timeout: 30
retry:
attempts: 2
delay: 5
- name: generate_changelog
type: template
description: Generate changelog from commits
dependencies: [analyze_commits, determine_version]
template: |
# Changelog
## Version {{determine_version.new_version}} ({{now | date('%Y-%m-%d')}})
### Features
{% for commit in commits %}
{% if commit.startswith('feat:') %}
- {{commit[6:]}}
{% endif %}
{% endfor %}
### Bug Fixes
{% for commit in commits %}
{% if commit.startswith('fix:') %}
- {{commit[5:]}}
{% endif %}
{% endfor %}
outputs:
changelog_content: "result"
- name: create_release
type: command
description: Create Git tag and GitHub release
dependencies: [generate_changelog, determine_version]
condition: "!inputs.dry_run"
parameters:
command: gh
args:
- release
- create
- "v{{determine_version.new_version}}"
- --notes-file
- "{{generate_changelog.changelog_content_path}}"
timeout: 60
outputs:
version:
description: "The new version that was released"
type: string
value: "determine_version.new_version"
changelog:
description: "Generated changelog content"
type: string
value: "generate_changelog.changelog_content"
release_url:
description: "URL of the created GitHub release"
type: string
value: "create_release.release_url"
documentation:
overview: |
This skill automates the process of creating semantic version releases
based on commit history. It analyzes commits, determines the appropriate
version bump, generates a changelog, and creates a GitHub release.
examples:
- title: "Create a patch release"
description: "Create a patch release for bug fixes"
code: |
opencode skill execute git-release --release-type=patch
- title: "Dry run for a major release"
description: "Test a major release without actually creating it"
code: |
opencode skill execute git-release --release-type=major --dry-run
troubleshooting:
- problem: "Git working directory is not clean"
solution: "Commit or stash your changes before running the release"
- problem: "Not on main branch"
solution: "Switch to the main branch before creating a release"
permissions:
read: [".git", "CHANGELOG.md", "package.json"]
write: ["CHANGELOG.md"]
execute: ["git", "gh"]
network: ["api.github.com"]
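示例技能中 determine_version 步骤内嵌的是 JavaScript,其语义版本递增逻辑可以用一个独立的 Python 辅助函数等价表达,便于对照理解(该函数仅为说明用途的假设,不属于技能DSL本身):

```python
def bump_version(current: str, bump_type: str) -> str:
    """按语义化版本规则计算下一个版本号,等价于 determine_version 中的 switch 分支"""
    # 允许带 "v" 前缀的标签,如 "v1.2.3"
    major, minor, patch = (int(part) for part in current.lstrip("v").split("."))
    if bump_type == "major":
        return f"{major + 1}.0.0"
    if bump_type == "minor":
        return f"{major}.{minor + 1}.0"
    if bump_type == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown bump type: {bump_type}")
```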
2. 单元测试实现
# tests/test_skill_parser.py
import pytest
import tempfile
import yaml
import json
from pathlib import Path
from unittest.mock import Mock, patch
from parsers.skill_parser import (
SkillDSLParser,
SkillLoader,
SkillSchemaValidator,
ParseError,
ValidationResult
)
from models.skill_models import Skill, SkillMetadata, SkillStep
class TestSkillLoader:
"""测试技能加载器"""
def setup_method(self):
self.loader = SkillLoader()
def test_load_yaml_valid(self):
"""测试加载有效的YAML"""
yaml_content = """
metadata:
name: test-skill
version: 1.0.0
description: A test skill
steps:
- name: step1
type: command
"""
result = self.loader.load_yaml(yaml_content)
assert result['metadata']['name'] == 'test-skill'
assert result['metadata']['version'] == '1.0.0'
assert len(result['steps']) == 1
def test_load_yaml_invalid(self):
"""测试加载无效的YAML"""
yaml_content = """
metadata:
name: test-skill
version: 1.0.0
description: A test skill
invalid: : :
"""
with pytest.raises(ParseError):
self.loader.load_yaml(yaml_content)
def test_extract_markdown_standard_separator(self):
"""测试提取标准分隔符的Markdown"""
content = """---
metadata:
name: test
steps: []
---
# Markdown Title
This is markdown content.
"""
yaml_part, markdown_part = self.loader.extract_markdown(content)
assert "metadata:" in yaml_part
assert "# Markdown Title" in markdown_part
assert "This is markdown content" in markdown_part
def test_extract_markdown_no_separator(self):
"""测试提取没有分隔符的内容"""
content = """metadata:
name: test
steps: []
"""
yaml_part, markdown_part = self.loader.extract_markdown(content)
assert "metadata:" in yaml_part
assert markdown_part == ""
def test_resolve_variables(self):
"""测试解析变量"""
context = {
'user': {'name': 'John', 'age': 30},
'project': 'test-project'
}
text = "Hello {{user.name}}, working on {{project}}"
result = self.loader._resolve_variables(text, context)
assert result == "Hello John, working on test-project"
def test_resolve_variables_missing(self):
"""测试解析缺失的变量"""
context = {'user': {'name': 'John'}}
text = "Hello {{user.name}}, age: {{user.age}}"
result = self.loader._resolve_variables(text, context)
# 缺失的变量应保持原样
assert "{{user.age}}" in result
class TestSkillSchemaValidator:
"""测试Schema验证器"""
def setup_method(self):
self.validator = SkillSchemaValidator()
def test_validate_yaml_valid(self):
"""测试验证有效的YAML"""
yaml_data = {
'metadata': {
'name': 'test-skill',
'version': '1.0.0',
'description': 'A test skill'
},
'steps': [
{'name': 'step1', 'type': 'command'}
]
}
result = self.validator.validate_yaml(yaml_data)
assert result.is_valid
assert len(result.issues) == 0
def test_validate_yaml_invalid_name(self):
"""测试验证无效的技能名称"""
yaml_data = {
'metadata': {
'name': 'Invalid Name!',
'version': '1.0.0',
'description': 'A test skill'
},
'steps': []
}
result = self.validator.validate_yaml(yaml_data)
assert not result.is_valid
assert any('INVALID_SKILL_NAME' in issue.code for issue in result.issues)
def test_validate_yaml_duplicate_steps(self):
"""测试验证重复的步骤名称"""
yaml_data = {
'metadata': {
'name': 'test-skill',
'version': '1.0.0',
'description': 'A test skill'
},
'steps': [
{'name': 'step1', 'type': 'command'},
{'name': 'step1', 'type': 'script'} # 重复名称
]
}
result = self.validator.validate_yaml(yaml_data)
assert not result.is_valid
assert any('DUPLICATE_STEP_NAME' in issue.code for issue in result.issues)
def test_validate_skill_structure(self):
"""测试验证技能结构"""
skill = Skill(
metadata=SkillMetadata(
name='test-skill',
version='1.0.0',
description='A test skill'
),
steps=[
SkillStep(name='step1', type='command'),
SkillStep(name='step2', type='script', dependencies=['step1'])
]
)
result = self.validator.validate_skill_structure(skill)
assert result.is_valid
def test_validate_skill_structure_circular_dependency(self):
"""测试验证循环依赖"""
# 注意:循环依赖检查已在Skill模型中实现,构建对象时即会抛出异常
with pytest.raises(ValueError) as exc_info:
Skill(
metadata=SkillMetadata(
name='test-skill',
version='1.0.0',
description='A test skill'
),
steps=[
SkillStep(name='step1', type='command', dependencies=['step2']),
SkillStep(name='step2', type='script', dependencies=['step1'])  # 循环依赖
]
)
assert 'circular' in str(exc_info.value).lower()
class TestSkillDSLParser:
"""测试技能DSL解析器"""
def setup_method(self):
self.parser = SkillDSLParser()
def test_parse_valid_skill(self):
"""测试解析有效的技能"""
skill_content = """---
metadata:
name: test-skill
version: 1.0.0
description: A test skill for unit testing
tags: [test, unit]
steps:
- name: step1
type: command
description: First step
parameters:
command: echo
args: ["Hello, World!"]
---
# Test Skill Documentation
This is a test skill for unit testing.
"""
skill = self.parser.parse(skill_content)
assert skill.metadata.name == 'test-skill'
assert skill.metadata.version == '1.0.0'
assert len(skill.steps) == 1
assert skill.steps[0].name == 'step1'
assert skill.documentation.markdown_content is not None
def test_parse_invalid_skill(self):
"""测试解析无效的技能"""
skill_content = """---
metadata:
name: invalid skill name
version: not-a-version
description: Too short
---
"""
with pytest.raises(ParseError):
self.parser.parse(skill_content)
def test_parse_file(self, tmp_path):
"""测试从文件解析技能"""
skill_file = tmp_path / "test_skill.yaml"
skill_content = """---
metadata:
name: file-skill
version: 1.0.0
description: A skill loaded from file
steps:
- name: step1
type: command
---
"""
skill_file.write_text(skill_content)
skill = self.parser.parse_file(skill_file)
assert skill.metadata.name == 'file-skill'
def test_parse_file_not_found(self):
"""测试解析不存在的文件"""
with pytest.raises(FileNotFoundError):
self.parser.parse_file("/nonexistent/path/skill.yaml")
def test_validate_skill(self):
"""测试验证技能"""
skill_content = """---
metadata:
name: test-skill
version: 1.0.0
description: A test skill for validation
steps:
- name: step1
type: command
---
"""
result = self.parser.validate(skill_content)
assert result.is_valid
def test_validate_invalid_skill(self):
"""测试验证无效的技能"""
skill_content = """---
metadata:
name: invalid!
version: x.y.z
description: Short
steps: []
---
"""
result = self.parser.validate(skill_content)
assert not result.is_valid
assert len(result.issues) > 0
def test_serialize_skill(self):
"""测试序列化技能"""
skill_content = """---
metadata:
name: serialize-test
version: 1.0.0
description: A skill for serialization testing
steps:
- name: step1
type: command
---
# Documentation
Test documentation.
"""
skill = self.parser.parse(skill_content)
serialized = self.parser.serialize(skill, format="yaml")
# 重新解析序列化的内容
skill2 = self.parser.parse(serialized)
assert skill.metadata.name == skill2.metadata.name
assert len(skill.steps) == len(skill2.steps)
def test_serialize_json(self):
"""测试序列化为JSON"""
skill_content = """---
metadata:
name: json-test
version: 1.0.0
description: A skill for JSON serialization
steps:
- name: step1
type: command
---
"""
skill = self.parser.parse(skill_content)
json_str = self.parser.serialize(skill, format="json")
# 验证JSON格式
data = json.loads(json_str)
assert data['metadata']['name'] == 'json-test'
assert '$schema' in data
def test_cache_behavior(self, tmp_path):
"""测试解析器缓存行为"""
skill_file = tmp_path / "cache_test.yaml"
# 第一次写入和解析
skill_content_v1 = """---
metadata:
name: cache-test
version: 1.0.0
description: Cache test v1
steps:
- name: step1
type: command
---
"""
skill_file.write_text(skill_content_v1)
skill1 = self.parser.parse_file(skill_file)
# 立即再次解析应该使用缓存
skill2 = self.parser.parse_file(skill_file)
assert skill1 is skill2 # 应该是同一个对象(缓存)
# 修改文件
skill_content_v2 = """---
metadata:
name: cache-test
version: 1.0.0
description: Cache test v2 # 修改了描述
steps:
- name: step1
type: command
---
"""
skill_file.write_text(skill_content_v2)
# 重新解析应该获取新版本
skill3 = self.parser.parse_file(skill_file)
assert skill3 is not skill1 # 应该是新对象
assert skill3.metadata.description == "Cache test v2"
class TestExtendedValidators:
"""测试扩展验证器"""
def setup_method(self):
from parsers.extended_validators import ExtendedSkillValidator
self.validator = ExtendedSkillValidator()
def test_security_validation(self):
"""测试安全验证"""
skill_data = {
'steps': [
{
'name': 'dangerous_step',
'script': 'eval(\'console.log("dangerous")\')'
}
],
'permissions': {
'write': ['/etc/passwd']
}
}
issues = self.validator.validate_security(skill_data)
# 应该发现安全问题
assert len(issues) >= 2
assert any('eval() usage' in issue['description'] for issue in issues)
assert any('/etc/passwd' in issue['description'] for issue in issues)
def test_performance_validation(self):
"""测试性能验证"""
skill_data = {
'steps': [
{
'name': 'long_step',
'timeout': 5000, # 超过1小时
'script': 'for (let i = 0; i < 1000000; i++) {}'
}
]
}
issues = self.validator.validate_performance(skill_data)
assert len(issues) > 0
def test_best_practices_validation(self):
"""测试最佳实践验证"""
skill_data = {
'metadata': {
'description': 'Short' # 太短
},
'inputs': {
'required_input': {
'required': True
# 缺少验证规则
}
},
'steps': [
{'name': 'step1', 'type': 'command'}
]
}
issues = self.validator.validate_best_practices(skill_data)
assert len(issues) >= 2
def test_compatibility_validation(self):
"""测试兼容性验证"""
skill_data = {
'metadata': {
'requirements': {
'opencode_version': '2.0.0', # 高于当前版本
'tools': [
{'name': 'uncommon-tool', 'optional': False}
]
}
}
}
issues = self.validator.validate_compatibility(skill_data)
assert len(issues) >= 1
@pytest.fixture
def sample_skill_yaml():
"""提供示例技能YAML"""
return Path(__file__).parent / "test_data" / "sample_skill.yaml"
def test_integration_parse_complex_skill(sample_skill_yaml):
"""集成测试:解析复杂的技能文件"""
if not sample_skill_yaml.exists():
pytest.skip("Sample skill file not found")
parser = SkillDSLParser()
skill = parser.parse_file(sample_skill_yaml)
# 验证基本属性
assert skill.metadata.name == 'git-release'
assert skill.metadata.version == '1.0.0'
assert len(skill.steps) >= 4 # 至少有4个步骤
# 验证输入
assert 'release_type' in skill.inputs
assert skill.inputs['release_type'].required is True
assert skill.inputs['release_type'].choices == ['major', 'minor', 'patch']
# 验证步骤依赖
step_names = {step.name for step in skill.steps}
for step in skill.steps:
for dep in step.dependencies:
assert dep in step_names, f"Step {step.name} depends on unknown step {dep}"
# 验证输出
assert 'version' in skill.outputs
assert 'changelog' in skill.outputs
# 验证文档
assert skill.documentation.overview is not None
assert len(skill.documentation.examples) >= 1
# 验证权限
assert '.git' in skill.permissions.read
assert 'CHANGELOG.md' in skill.permissions.write
def test_validation_error_messages():
"""测试验证错误信息格式"""
parser = SkillDSLParser()
# 创建有多个错误的技能
invalid_content = """---
metadata:
name: invalid-name!
version: not-semver
description: Short
inputs:
123invalid: # 无效的输入名
type: string
steps:
- name: step1
type: invalid-type # 无效的类型
- name: step1 # 重复的名称
type: command
---
"""
result = parser.validate(invalid_content)
assert not result.is_valid
assert len(result.issues) >= 4
# 检查错误信息包含有用的详情
for issue in result.issues:
assert issue.message
assert issue.path
assert issue.code
if __name__ == "__main__":
pytest.main([__file__, "-v"])
五、性能优化与扩展性考虑
1. 解析器性能优化
# parsers/optimizations.py
import hashlib
import functools
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
from parsers.skill_parser import SkillDSLParser, Skill, ValidationResult, ValidationIssue, ValidationSeverity
class OptimizedSkillParser(SkillDSLParser):
"""性能优化的技能解析器"""
def __init__(self, schema_dir: Optional[Path] = None, cache_size: int = 100):
super().__init__(schema_dir)
self.cache_size = cache_size
self.lru_cache = {} # 简单的LRU缓存实现
self.hit_count = 0
self.miss_count = 0
@staticmethod
@functools.lru_cache(maxsize=100)
def _calculate_content_hash(content: str) -> str:
"""计算内容哈希(声明为staticmethod,避免lru_cache持有self引用导致实例无法回收)"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def parse(self, content: str, validate: bool = True) -> Skill:
"""重写parse方法,添加缓存"""
content_hash = self._calculate_content_hash(content)
cache_key = (content_hash, validate)
if cache_key in self.lru_cache:
self.hit_count += 1
# 更新LRU顺序(将键移到最后)
value = self.lru_cache.pop(cache_key)
self.lru_cache[cache_key] = value
return value
self.miss_count += 1
skill = super().parse(content, validate)
# 添加到缓存
if len(self.lru_cache) >= self.cache_size:
# 移除最旧的条目
oldest_key = next(iter(self.lru_cache))
del self.lru_cache[oldest_key]
self.lru_cache[cache_key] = skill
return skill
def parse_file(self, file_path: Union[str, Path], validate: bool = True) -> Skill:
"""重写文件解析,添加文件监控"""
file_path = Path(file_path)
# 使用文件属性和修改时间作为缓存键的一部分
stat = file_path.stat()
cache_key = (str(file_path), stat.st_mtime, stat.st_size, validate)
if cache_key in self.lru_cache:
self.hit_count += 1
return self.lru_cache[cache_key]
self.miss_count += 1
skill = super().parse_file(file_path, validate)
# 添加到缓存
if len(self.lru_cache) >= self.cache_size:
oldest_key = next(iter(self.lru_cache))
del self.lru_cache[oldest_key]
self.lru_cache[cache_key] = skill
return skill
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
total = self.hit_count + self.miss_count
hit_rate = self.hit_count / total if total > 0 else 0
return {
'cache_size': len(self.lru_cache),
'max_cache_size': self.cache_size,
'hit_count': self.hit_count,
'miss_count': self.miss_count,
'hit_rate': f"{hit_rate:.2%}",
'total_requests': total
}
def clear_cache(self):
"""清空缓存"""
self.lru_cache.clear()
self.hit_count = 0
self.miss_count = 0
self._calculate_content_hash.cache_clear()
class SchemaRegistry:
"""Schema注册表,支持动态加载和版本管理"""
def __init__(self):
self.schemas = {}
self.schema_aliases = {}
self.default_version = "v1.0.0"
def register_schema(self, schema: Dict, version: str, aliases: List[str] = None):
"""注册Schema"""
self.schemas[version] = schema
if aliases:
for alias in aliases:
self.schema_aliases[alias] = version
def get_schema(self, version_or_alias: str) -> Optional[Dict]:
"""获取Schema"""
# 首先检查别名
actual_version = self.schema_aliases.get(version_or_alias, version_or_alias)
return self.schemas.get(actual_version)
def validate_against_schema(self, data: Dict, schema_id: str = None) -> ValidationResult:
"""根据Schema验证数据"""
schema_version = schema_id or self.default_version
schema = self.get_schema(schema_version)
if not schema:
return ValidationResult(
is_valid=False,
issues=[ValidationIssue(
code="SCHEMA_NOT_FOUND",
message=f"Schema {schema_version} not found",
severity=ValidationSeverity.ERROR,
path="$"
)],
schema_version=schema_version
)
# 这里可以添加更复杂的验证逻辑
# 比如支持Schema组合、引用等
return self._basic_validation(data, schema)
def _basic_validation(self, data: Dict, schema: Dict) -> ValidationResult:
"""基础验证逻辑"""
result = ValidationResult(
is_valid=True,
issues=[],
schema_version=schema.get('$id', 'unknown')
)
# 这里可以实现基本的验证逻辑
# 生产环境中应该使用完整的JSON Schema验证器
return result
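OptimizedSkillParser 中手写的 LRU 逻辑依赖 dict 的插入顺序;同样的淘汰策略用 collections.OrderedDict 可以更直接地单独表达。下面是一个可独立运行的最小示意(类名为演示假设):

```python
from collections import OrderedDict

class LRUCache:
    """最小LRU缓存,淘汰策略与 OptimizedSkillParser 的实现一致"""

    def __init__(self, max_size: int = 100):
        self.max_size = max_size
        self._data: OrderedDict = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key):
        if key in self._data:
            self.hits += 1
            self._data.move_to_end(key)  # 标记为最近使用
            return self._data[key]
        self.misses += 1
        return None

    def put(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        elif len(self._data) >= self.max_size:
            self._data.popitem(last=False)  # 淘汰最久未使用的条目
        self._data[key] = value
```

文件解析路径用 (路径, st_mtime, st_size) 作为键即可复用同一套缓存,文件被修改后键变化,自然触发重新解析。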
六、部署与使用示例
1. 命令行工具
# cli/skill_cli.py
#!/usr/bin/env python3
"""OpenCode技能DSL解析器命令行工具"""
import sys
import argparse
import json
from pathlib import Path
from typing import Optional
from parsers.skill_parser import (
SkillDSLParser,
parse_skill,
parse_skill_file,
validate_skill
)
from parsers.extended_validators import ExtendedSkillValidator
def main():
parser = argparse.ArgumentParser(
description="OpenCode Skill DSL Parser CLI",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s validate skill.yaml
%(prog)s parse skill.yaml --format json
%(prog)s lint skill.yaml --all
"""
)
subparsers = parser.add_subparsers(dest='command', help='Command to execute')
# validate命令
validate_parser = subparsers.add_parser('validate', help='Validate a skill definition')
validate_parser.add_argument('file', help='Skill file to validate')
validate_parser.add_argument('--strict', action='store_true', help='Enable strict validation')
validate_parser.add_argument('--output', choices=['text', 'json'], default='text', help='Output format')
# parse命令
parse_parser = subparsers.add_parser('parse', help='Parse a skill definition')
parse_parser.add_argument('file', help='Skill file to parse')
parse_parser.add_argument('--format', choices=['yaml', 'json', 'python'], default='yaml', help='Output format')
parse_parser.add_argument('--no-validate', action='store_true', help='Skip validation')
# lint命令
lint_parser = subparsers.add_parser('lint', help='Lint a skill definition with extended checks')
lint_parser.add_argument('file', help='Skill file to lint')
lint_parser.add_argument('--all', action='store_true', help='Run all validators')
lint_parser.add_argument('--security', action='store_true', help='Run security checks')
lint_parser.add_argument('--performance', action='store_true', help='Run performance checks')
lint_parser.add_argument('--best-practices', action='store_true', help='Run best practices checks')
lint_parser.add_argument('--output', choices=['text', 'json', 'html'], default='text', help='Output format')
# schema命令
schema_parser = subparsers.add_parser('schema', help='Schema-related operations')
schema_parser.add_argument('--list', action='store_true', help='List available schemas')
schema_parser.add_argument('--show', metavar='VERSION', help='Show specific schema')
schema_parser.add_argument('--validate', metavar='FILE', help='Validate against specific schema version')
# stats命令
stats_parser = subparsers.add_parser('stats', help='Show parser statistics')
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
try:
if args.command == 'validate':
validate_command(args)
elif args.command == 'parse':
parse_command(args)
elif args.command == 'lint':
lint_command(args)
elif args.command == 'schema':
schema_command(args)
elif args.command == 'stats':
stats_command(args)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def validate_command(args):
"""执行验证命令"""
file_path = Path(args.file)
if not file_path.exists():
print(f"Error: File not found: {args.file}", file=sys.stderr)
sys.exit(1)
content = file_path.read_text(encoding='utf-8')
result = validate_skill(content)
if args.output == 'json':
output_json(result.to_dict())
else:
output_text(result, file_path)
def parse_command(args):
"""执行解析命令"""
file_path = Path(args.file)
skill = parse_skill_file(file_path, validate=not args.no_validate)
if args.format == 'json':
import json
print(json.dumps(skill.dict(exclude_none=True), indent=2, default=str))
elif args.format == 'python':
print(f"Skill: {skill.metadata.name} v{skill.metadata.version}")
print(f"Description: {skill.metadata.description}")
print(f"Steps: {len(skill.steps)}")
for step in skill.steps:
print(f" - {step.name} ({step.type})")
else: # yaml
parser = SkillDSLParser()
print(parser.serialize(skill, format='yaml'))
def lint_command(args):
"""执行代码检查命令"""
file_path = Path(args.file)
if not file_path.exists():
print(f"Error: File not found: {args.file}", file=sys.stderr)
sys.exit(1)
content = file_path.read_text(encoding='utf-8')
parser = SkillDSLParser()
try:
# 首先进行基础解析
skill = parser.parse(content, validate=True)
skill_data = skill.dict(exclude_none=True)
# 运行扩展验证器
validator = ExtendedSkillValidator()
# 确定要运行的验证器
validators_to_run = []
if args.all or args.security:
validators_to_run.append('security')
if args.all or args.performance:
validators_to_run.append('performance')
if args.all or args.best_practices:
validators_to_run.append('best_practices')
if args.all:
validators_to_run.append('compatibility')
if not validators_to_run:
validators_to_run = ['security', 'best_practices'] # 默认
# 运行验证
all_issues = {}
for validator_name in validators_to_run:
validator_func = getattr(validator, f'validate_{validator_name}')
issues = validator_func(skill_data)
if issues:
all_issues[validator_name] = issues
# 输出结果
if args.output == 'json':
output_json({
'file': str(file_path),
'skill': skill.metadata.name,
'validators_run': validators_to_run,
'issues': all_issues,
'summary': {
'total_issues': sum(len(issues) for issues in all_issues.values()),
'by_severity': {
'high': sum(1 for issues in all_issues.values()
for issue in issues if issue['severity'] == 'high'),
'medium': sum(1 for issues in all_issues.values()
for issue in issues if issue['severity'] == 'medium'),
'low': sum(1 for issues in all_issues.values()
for issue in issues if issue['severity'] == 'low'),
}
}
})
else:
output_lint_results(all_issues, file_path, args.output)
except Exception as e:
print(f"Error during linting: {e}", file=sys.stderr)
sys.exit(1)
def schema_command(args):
    """Execute the schema command."""
    parser = SkillDSLParser()
    if args.list:
        # List available schemas
        print("Available schemas:")
        # Schema discovery logic can be added here
        print("  v1.0.0 - OpenCode Skill Schema v1.0.0")
    elif args.show:
        # Show a specific schema
        print(f"Schema {args.show}:")
        # Schema rendering logic can be added here
    elif args.validate:
        # Validate a file against a specific schema
        file_path = Path(args.validate)
        content = file_path.read_text(encoding='utf-8')
        # Schema-specific validation logic can be added here
        print(f"Validated {file_path}")
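The `--list` branch above prints a hard-coded entry; it could instead enumerate schema files on disk. A sketch under the assumption that schemas live in a directory with files named like `skill-schema-v1.0.0.yaml` (the layout used earlier in this document; the helper name is hypothetical):

```python
from pathlib import Path

def list_schema_versions(schema_dir: Path) -> list[str]:
    """Collect version strings from files named skill-schema-v<semver>.yaml."""
    versions = []
    for path in sorted(schema_dir.glob('skill-schema-v*.yaml')):
        # 'skill-schema-v1.0.0.yaml' -> stem 'skill-schema-v1.0.0' -> '1.0.0'
        versions.append(path.stem.removeprefix('skill-schema-v'))
    return versions
```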
def stats_command(args):
    """Show parser statistics."""
    parser = SkillDSLParser()
    if hasattr(parser, 'get_cache_stats'):
        stats = parser.get_cache_stats()
        print("Parser Statistics:")
        print(f"  Cache hit rate: {stats['hit_rate']}")
        print(f"  Cache size: {stats['cache_size']}/{stats['max_cache_size']}")
        print(f"  Total requests: {stats['total_requests']}")
    else:
        print("Statistics not available for this parser")
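`get_cache_stats` is guarded with `hasattr` because not every parser tracks statistics. The bookkeeping it assumes can be sketched as a small mixin; the dict keys match those read above, but the class itself is illustrative, not the parser's actual implementation:

```python
class CacheStatsMixin:
    """Minimal hit/miss bookkeeping behind a get_cache_stats() call (illustrative sketch)."""

    def __init__(self, max_cache_size: int = 128):
        self._cache = {}
        self._max_cache_size = max_cache_size
        self._hits = 0
        self._misses = 0

    def _cache_get(self, key):
        # Record a hit or miss on every lookup
        if key in self._cache:
            self._hits += 1
            return self._cache[key]
        self._misses += 1
        return None

    def get_cache_stats(self) -> dict:
        total = self._hits + self._misses
        rate = f"{(self._hits / total * 100):.1f}%" if total else "n/a"
        return {
            'hit_rate': rate,
            'cache_size': len(self._cache),
            'max_cache_size': self._max_cache_size,
            'total_requests': total,
        }
```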
def output_json(data: dict):
    """Print data as formatted JSON."""
    import json
    print(json.dumps(data, indent=2, ensure_ascii=False))
def output_text(result, file_path: Path):
    """Print validation results in plain text."""
    print(f"Validation results for {file_path}:")
    print(f"  Schema version: {result.schema_version}")
    print(f"  Valid: {'✓' if result.is_valid else '✗'}")
    print(f"  Issues: {len(result.issues)}")
    if result.issues:
        print("\nDetails:")
        for issue in result.issues:
            icon = {
                'info': 'ℹ',
                'warning': '⚠',
                'error': '✗',
                'critical': '‼'
            }.get(issue.severity, '?')
            print(f"  {icon} [{issue.severity.upper()}] {issue.message}")
            if issue.path != '$':
                print(f"    Location: {issue.path}")
            if issue.suggestion:
                print(f"    Suggestion: {issue.suggestion}")
            print()
def output_lint_results(issues_by_validator: dict, file_path: Path, format: str):
    """Print lint results in the requested format."""
    if format == 'html':
        output_html_lint_results(issues_by_validator, file_path)
        return
    print(f"Lint results for {file_path}:")
    print("=" * 80)
    total_issues = sum(len(issues) for issues in issues_by_validator.values())
    if total_issues == 0:
        print("✓ No issues found!")
        return
    # Group issues by severity
    by_severity = {'high': [], 'medium': [], 'low': []}
    for validator_name, issues in issues_by_validator.items():
        for issue in issues:
            by_severity[issue['severity']].append((validator_name, issue))
    # Print the summary
    print(f"Found {total_issues} issue(s):")
    print(f"  High: {len(by_severity['high'])}")
    print(f"  Medium: {len(by_severity['medium'])}")
    print(f"  Low: {len(by_severity['low'])}")
    print()
    # Print details, ordered by severity
    for severity in ['high', 'medium', 'low']:
        issues_list = by_severity[severity]
        if not issues_list:
            continue
        print(f"{severity.upper()} severity issues:")
        print("-" * 40)
        for validator_name, issue in issues_list:
            icon = '‼' if severity == 'high' else '⚠' if severity == 'medium' else 'ℹ'
            print(f"{icon} [{validator_name}] {issue['description']}")
            print(f"  Location: {issue['location']}")
            if issue.get('suggestion'):
                print(f"  Suggestion: {issue['suggestion']}")
            print()
def output_html_lint_results(issues_by_validator: dict, file_path: Path):
    """Write lint results as an HTML report next to the source file."""
    html = f"""<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>Lint Results - {file_path.name}</title></head>
<body>
<h1>Lint Results: {file_path.name}</h1>
<h2>Summary</h2>
"""
    # Compute statistics
    total_issues = sum(len(issues) for issues in issues_by_validator.values())
    if total_issues == 0:
        html += "<p>✓ No issues found!</p>\n"
    else:
        by_severity = {'high': 0, 'medium': 0, 'low': 0}
        for issues in issues_by_validator.values():
            for issue in issues:
                by_severity[issue['severity']] += 1
        html += f"<p>Found {total_issues} issue(s):</p>\n<ul>\n"
        for severity, count in by_severity.items():
            if count > 0:
                html += f"<li>{severity.title()}: {count}</li>\n"
        html += "</ul>\n"
    # Emit issue details
    if total_issues > 0:
        html += "<h2>Issues</h2>\n"
        # Collect all issues and sort by severity (high -> medium -> low)
        all_issues = []
        for validator_name, issues in issues_by_validator.items():
            for issue in issues:
                issue['validator'] = validator_name
                all_issues.append(issue)
        severity_order = {'high': 0, 'medium': 1, 'low': 2}
        all_issues.sort(key=lambda x: severity_order[x['severity']])
        for issue in all_issues:
            html += f"""<div class="issue {issue['severity']}">
<span class="severity">{issue['severity'].upper()}</span>
<strong>[{issue['validator']}]</strong> {issue['description']}<br>
<em>Location: {issue['location']}</em>
"""
            if issue.get('suggestion'):
                html += f"<br>Suggestion: {issue['suggestion']}"
            html += "</div>\n"
    html += "</body>\n</html>\n"
    output_file = file_path.with_suffix('.lint.html')
    output_file.write_text(html, encoding='utf-8')
    print(f"HTML report saved to: {output_file}")
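One caveat for the HTML report: issue descriptions and suggestions are interpolated straight into markup, so any angle brackets in them would be rendered as tags. The standard library's `html.escape` is enough to neutralize this before interpolation (the sample description is hypothetical):

```python
import html

description = 'Step runs <script> with & "unquoted" input'
safe = html.escape(description)
print(safe)  # Step runs &lt;script&gt; with &amp; &quot;unquoted&quot; input
```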
if __name__ == "__main__":
main()
7. Implementation Plan and Schedule
Two-day implementation plan:
Day 1: core functionality
- Morning (4 hours)
  - Design the skill metadata schema (1 hour)
  - Implement the Pydantic data models (1.5 hours)
  - Build the YAML/Markdown splitting logic (1.5 hours)
- Afternoon (4 hours)
  - Implement the base schema validator (2 hours)
  - Build the core skill parser logic (2 hours)
Day 2: validators and tooling
- Morning (4 hours)
  - Implement the extended validators (security, performance, best practices) (2 hours)
  - Add caching and performance optimizations (1 hour)
  - Write unit tests (1 hour)
- Afternoon (4 hours)
  - Build the command-line tool (1.5 hours)
  - Implement serialization (1 hour)
  - Write integration tests and examples (1.5 hours)
Key deliverables:
- Complete skill DSL parser: supports the mixed YAML + Markdown format
- Schema validation system: full JSON Schema-based validation
- Extended validators: security, performance, and best-practice checks
- Command-line tool: validate, parse, and lint commands
- Detailed documentation: API docs, usage examples, and test cases
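The unit-test deliverable can begin with the easiest piece to isolate: the YAML/Markdown split on the `---` separator described at the start of this document. A sketch (the `split_frontmatter` helper is a stand-in, not necessarily the parser's real method name):

```python
def split_frontmatter(content: str) -> tuple[str, str]:
    """Split a skill file into its YAML part and Markdown body at the first '---' line."""
    parts = content.split('\n---\n', 1)
    markdown = parts[1].strip() if len(parts) > 1 else ''
    return parts[0].strip(), markdown

def test_split_with_body():
    yaml_part, md = split_frontmatter('name: demo\n---\nUsage docs here.')
    assert yaml_part == 'name: demo'
    assert md == 'Usage docs here.'

def test_split_without_body():
    # Files with no separator keep everything in the YAML part
    assert split_frontmatter('name: demo') == ('name: demo', '')
```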
This design delivers a production-ready skill DSL parser with complete validation, solid performance, and an extensible architecture, meeting the foundational needs of the OpenCode skill system.