# This file is part of craft-application.
#
# Copyright 2025 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY,
# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Orchestrates linter registration, ignore config and execution."""
from __future__ import annotations
import inspect
from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, cast
from craft_cli import emit
from craft_application import errors
from craft_application.lint import (
ExitCode,
IgnoreConfig,
IgnoreSpec,
LintContext,
LinterIssue,
Severity,
Stage,
should_ignore,
)
from craft_application.services import base
if TYPE_CHECKING:
from pathlib import Path
from typing import Any
from craft_application.application import AppMetadata
from craft_application.lint.base import AbstractLinter
from craft_application.models import Project
from craft_application.services.project import ProjectService
from craft_application.services.service_factory import ServiceFactory
[docs]
class LinterService(base.AppService):
"""Orchestrates linter registration, ignore config and execution."""
_class_registry: dict[Stage, list[type[AbstractLinter]]] = {
Stage.PRE: [],
Stage.POST: [],
}
[docs]
def __init__(self, app: AppMetadata, services: ServiceFactory) -> None:
super().__init__(app, services)
self._ignore_cfg: IgnoreConfig = {}
self._issues: list[LinterIssue] = []
self._issues_by_linter: dict[str, list[LinterIssue]] = {}
[docs]
@classmethod
def register(cls, linter_cls: type[AbstractLinter]) -> None:
"""Register a linter class for use by the service."""
if inspect.isabstract(linter_cls):
raise TypeError("Cannot register abstract linter class.")
emit.debug(
f"Registering linter {linter_cls.name!r} for stage {linter_cls.stage.value}"
)
cls._class_registry.setdefault(linter_cls.stage, []).append(linter_cls)
[docs]
@classmethod
def build_ignore_config(
cls,
project_dir: Path, # noqa: ARG003 (used by application overrides)
cli_ignores: IgnoreConfig | None = None,
) -> IgnoreConfig:
"""Merge ignore config from app-specific project rules and CLI overrides."""
config: IgnoreConfig = {}
if cli_ignores:
cls._merge_into(config, cli_ignores)
return config
[docs]
def load_ignore_config(
self,
project_dir: Path,
cli_ignores: IgnoreConfig | None = None,
) -> IgnoreConfig:
"""Load ignore configuration using the class-level builder."""
self._ignore_cfg = self.__class__.build_ignore_config(project_dir, cli_ignores)
return self._ignore_cfg
@staticmethod
def _normalize_ignore_config(raw: dict[str, Any]) -> IgnoreConfig:
"""Normalize raw YAML data into IgnoreConfig structure."""
cfg: IgnoreConfig = {}
for linter_name, spec in raw.items():
if not isinstance(spec, dict):
continue
spec_dict = cast("dict[str, Any]", spec)
ids_raw = cast(str | Iterable[str] | None, spec_dict.get("ids"))
by_filename_raw = cast(
dict[str, str | Iterable[str]] | None, spec_dict.get("by_filename")
)
if ids_raw == "*":
norm_ids: str | set[str] = "*"
elif isinstance(ids_raw, str):
norm_ids = {ids_raw}
else:
norm_ids = set(cast(Iterable[str], ids_raw or []))
norm_by_fname: dict[str, set[str]] = {}
for issue_id, globs in (by_filename_raw or {}).items():
if isinstance(globs, str):
norm_by_fname[str(issue_id)] = {globs}
else:
norm_by_fname[str(issue_id)] = set(cast(Iterable[str], globs or []))
cfg[str(linter_name)] = IgnoreSpec(ids=norm_ids, by_filename=norm_by_fname)
return cfg
@staticmethod
def _merge_into(base_cfg: IgnoreConfig, overlay: IgnoreConfig) -> None:
"""Merge overlay rules into base config, with overlay taking precedence."""
for linter_name, over_spec in overlay.items():
if linter_name not in base_cfg:
base_cfg[linter_name] = IgnoreSpec(
ids=set() if over_spec.ids != "*" else "*",
by_filename={k: set(v) for k, v in over_spec.by_filename.items()},
)
if over_spec.ids == "*":
continue
if isinstance(over_spec.ids, set):
base_cfg[linter_name].ids = set(over_spec.ids)
continue
base_spec = base_cfg[linter_name]
if over_spec.ids == "*":
base_spec.ids = "*"
base_spec.by_filename.clear()
elif isinstance(over_spec.ids, set):
if base_spec.ids != "*":
if not isinstance(base_spec.ids, set):
base_spec.ids = set()
base_spec.ids.update(over_spec.ids)
for issue_id, globs in over_spec.by_filename.items():
base_spec.by_filename.setdefault(issue_id, set()).update(globs)
[docs]
def pre_filter_linters(
self,
stage: Stage,
ctx: LintContext, # noqa: ARG002 (reserved for future use)
candidates: list[type[AbstractLinter]] | None = None,
) -> list[type[AbstractLinter]]:
"""App-specific selection hook."""
registry = type(self)._class_registry # noqa: SLF001
return list(candidates or registry.get(stage, []))
[docs]
def post_filter_issues(
self,
linter: AbstractLinter, # noqa: ARG002 (reserved for future use)
issues: Iterable[LinterIssue],
ctx: LintContext, # noqa: ARG002 (reserved for future use)
) -> Iterator[LinterIssue]:
"""App-specific filtering hook."""
yield from issues
[docs]
def run(
self,
stage: Stage,
ctx: LintContext,
) -> Iterator[LinterIssue]:
"""Run linters for a stage, streaming non-suppressed issues."""
if stage == Stage.PRE and ctx.project is None: # type: ignore[union-attr]
project_service = self._services.get("project")
if not project_service.is_configured:
project_service.configure(platform=None, build_for=None)
try:
project = project_service.get()
except errors.CraftValidationError as exc:
if self._is_adoptable_missing_error(exc):
project = self._build_project_without_adopt_info(project_service)
else:
raise
ctx = LintContext(
project_dir=ctx.project_dir,
project=project,
artifact_dirs=ctx.artifact_dirs,
)
self._issues.clear()
self._issues_by_linter.clear()
registry = type(self)._class_registry # noqa: SLF001
selected = self.pre_filter_linters(stage, ctx, registry.get(stage, []))
for cls in selected:
linter = cls()
raw_issues = linter.run(ctx)
user_filtered = (
issue
for issue in raw_issues
if not should_ignore(linter.name, issue, self._ignore_cfg)
)
filtered = self.post_filter_issues(linter, user_filtered, ctx)
for issue in filtered:
self._issues.append(issue)
self._issues_by_linter.setdefault(linter.name, []).append(issue)
yield issue
[docs]
def get_highest_severity(self) -> Severity | None:
"""Return the highest severity present among collected issues."""
if not self._issues:
return None
return max((i.severity for i in self._issues), default=None)
[docs]
def summary(self) -> ExitCode:
"""Return an exit code (non-zero only for errors)."""
highest = self.get_highest_severity()
if highest == Severity.ERROR:
return ExitCode.ERROR
return ExitCode.OK
@property
def issues(self) -> list[LinterIssue]:
"""Return a copy of issues collected during the last run."""
return list(self._issues)
@property
def issues_by_linter(self) -> dict[str, list[LinterIssue]]:
"""Return collected issues grouped by linter name."""
return {name: list(issues) for name, issues in self._issues_by_linter.items()}
@staticmethod
def _is_adoptable_missing_error(error: errors.CraftValidationError) -> bool:
"""Return True for adopt-info missing-field validation errors."""
message = str(error)
return (
"'adopt-info' not set" in message
and "required fields are missing" in message
)
def _build_project_without_adopt_info(
self, project_service: ProjectService
) -> Project:
"""Build a project model for linting without adopt-info validation."""
raw_project = project_service.get_raw()
project_path = project_service.resolve_project_file_path()
return self._app.ProjectClass.from_yaml_data(raw_project, project_path)