# Copyright 1999-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 from collections import deque import gc import gzip import logging import signal import sys import textwrap import time import warnings import weakref import zlib import portage from portage import os from portage import _encodings from portage import _unicode_encode from portage import installation from portage.cache.mappings import slot_dict_class from portage.elog.messages import eerror from portage.output import colorize, create_color_func, red bad = create_color_func("BAD") from portage._sets import SETPREFIX from portage._sets.base import InternalPackageSet from portage.util import ensure_dirs, writemsg, writemsg_level from portage.util.futures import asyncio from portage.util.SlotObject import SlotObject from portage.util._async.SchedulerInterface import SchedulerInterface from portage.package.ebuild.digestcheck import digestcheck from portage.package.ebuild.digestgen import digestgen from portage.package.ebuild.doebuild import _check_temp_dir, _prepare_self_update from portage.package.ebuild.prepare_build_dirs import prepare_build_dirs import _emerge from _emerge.BinpkgFetcher import BinpkgFetcher from _emerge.BinpkgPrefetcher import BinpkgPrefetcher from _emerge.BinpkgVerifier import BinpkgVerifier from _emerge.Blocker import Blocker from _emerge.BlockerDB import BlockerDB from _emerge.clear_caches import clear_caches from _emerge.create_depgraph_params import create_depgraph_params from _emerge.create_world_atom import create_world_atom from _emerge.DepPriority import DepPriority from _emerge.depgraph import depgraph, resume_depgraph from _emerge.EbuildBuildDir import EbuildBuildDir from _emerge.EbuildFetcher import EbuildFetcher from _emerge.EbuildPhase import EbuildPhase from _emerge.emergelog import emergelog from _emerge.FakeVartree import FakeVartree from _emerge.getloadavg import getloadavg from _emerge._find_deep_system_runtime_deps import _find_deep_system_runtime_deps from _emerge._flush_elog_mod_echo import _flush_elog_mod_echo from _emerge.JobStatusDisplay import JobStatusDisplay from _emerge.MergeListItem import MergeListItem from _emerge.Package import Package from _emerge.PackageMerge import PackageMerge from _emerge.PollScheduler import PollScheduler from _emerge.SequentialTaskQueue import SequentialTaskQueue # enums FAILURE = 1 class Scheduler(PollScheduler): # max time between loadavg checks (seconds) _loadavg_latency = 30 # max time between display status updates (seconds) _max_display_latency = 3 _opts_ignore_blockers = frozenset( ["--buildpkgonly", "--fetchonly", "--fetch-all-uri", "--nodeps", "--pretend"] ) _opts_no_background = frozenset(["--pretend", "--fetchonly", "--fetch-all-uri"]) _opts_no_self_update = frozenset( ["--buildpkgonly", "--fetchonly", "--fetch-all-uri", "--pretend"] ) class _iface_class(SchedulerInterface): __slots__ = ("fetch", "scheduleSetup", "scheduleUnpack") class _fetch_iface_class(SlotObject): __slots__ = ("log_file", "schedule") _task_queues_class = slot_dict_class( ("merge", "jobs", "ebuild_locks", "fetch", "unpack"), prefix="" ) class _build_opts_class(SlotObject): __slots__ = ( "buildpkg", "buildpkg_exclude", "buildpkgonly", "fetch_all_uri", "fetchonly", "pretend", ) class _binpkg_opts_class(SlotObject): __slots__ = ("fetchonly", "getbinpkg", "pretend") class _pkg_count_class(SlotObject): __slots__ = ("curval", "maxval") class _emerge_log_class(SlotObject): __slots__ = ("xterm_titles",) def log(self, *pargs, **kwargs): if not self.xterm_titles: # Avoid interference with the scheduler's status display. kwargs.pop("short_msg", None) emergelog(self.xterm_titles, *pargs, **kwargs) class _failed_pkg(SlotObject): __slots__ = ("build_dir", "build_log", "pkg", "postinst_failure", "returncode") class _ConfigPool: """Interface for a task to temporarily allocate a config instance from a pool. This allows a task to be constructed long before the config instance actually becomes needed, like when prefetchers are constructed for the whole merge list.""" __slots__ = ("_root", "_allocate", "_deallocate") def __init__(self, root, allocate, deallocate): self._root = root self._allocate = allocate self._deallocate = deallocate def allocate(self): return self._allocate(self._root) def deallocate(self, settings): self._deallocate(settings) class _unknown_internal_error(portage.exception.PortageException): """ Used internally to terminate scheduling. The specific reason for the failure should have been dumped to stderr. """ def __init__(self, value=""): portage.exception.PortageException.__init__(self, value) def __init__( self, settings, trees, mtimedb, myopts, spinner, mergelist=None, favorites=None, graph_config=None, ): PollScheduler.__init__(self, main=True) if mergelist is not None: warnings.warn( "The mergelist parameter of the " + "_emerge.Scheduler constructor is now unused. Use " + "the graph_config parameter instead.", DeprecationWarning, stacklevel=2, ) self.settings = settings self.target_root = settings["EROOT"] self.trees = trees self.myopts = myopts self._spinner = spinner self._mtimedb = mtimedb self._favorites = favorites self._args_set = InternalPackageSet(favorites, allow_repo=True) self._build_opts = self._build_opts_class() for k in self._build_opts.__slots__: setattr(self._build_opts, k, myopts.get("--" + k.replace("_", "-"))) self._build_opts.buildpkg_exclude = InternalPackageSet( initial_atoms=" ".join(myopts.get("--buildpkg-exclude", [])).split(), allow_wildcard=True, allow_repo=True, ) if "mirror" in self.settings.features: self._build_opts.fetch_all_uri = True self._binpkg_opts = self._binpkg_opts_class() for k in self._binpkg_opts.__slots__: setattr(self._binpkg_opts, k, "--" + k.replace("_", "-") in myopts) self.curval = 0 self._logger = self._emerge_log_class() self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: setattr(self._task_queues, k, SequentialTaskQueue()) # Holds merges that will wait to be executed when no builds are # executing. This is useful for system packages since dependencies # on system packages are frequently unspecified. For example, see # bug #256616. self._merge_wait_queue = deque() # Holds merges that have been transfered from the merge_wait_queue to # the actual merge queue. They are removed from this list upon # completion. Other packages can start building only when this list is # empty. self._merge_wait_scheduled = [] # Holds system packages and their deep runtime dependencies. Before # being merged, these packages go to merge_wait_queue, to be merged # when no other packages are building. self._deep_system_deps = set() # Holds packages to merge which will satisfy currently unsatisfied # deep runtime dependencies of system packages. If this is not empty # then no parallel builds will be spawned until it is empty. This # minimizes the possibility that a build will fail due to the system # being in a fragile state. For example, see bug #259954. self._unsatisfied_system_deps = set() self._status_display = JobStatusDisplay( xterm_titles=("notitles" not in settings.features) ) self._max_load = myopts.get("--load-average") max_jobs = myopts.get("--jobs") if max_jobs is None: max_jobs = 1 self._set_max_jobs(max_jobs) self._running_root = trees[trees._running_eroot]["root_config"] self.edebug = 0 if settings.get("PORTAGE_DEBUG", "") == "1": self.edebug = 1 self.pkgsettings = {} self._config_pool = {} for root in self.trees: self._config_pool[root] = [] self._fetch_log = os.path.join( _emerge.emergelog._emerge_log_dir, "emerge-fetch.log" ) fetch_iface = self._fetch_iface_class( log_file=self._fetch_log, schedule=self._schedule_fetch ) self._sched_iface = self._iface_class( self._event_loop, is_background=self._is_background, fetch=fetch_iface, scheduleSetup=self._schedule_setup, scheduleUnpack=self._schedule_unpack, ) self._prefetchers = weakref.WeakValueDictionary() self._pkg_queue = [] self._jobs = 0 self._running_tasks = {} self._completed_tasks = set() self._main_exit = None self._main_loadavg_handle = None self._schedule_merge_wakeup_task = None self._failed_pkgs = [] self._failed_pkgs_all = [] self._failed_pkgs_die_msgs = [] self._post_mod_echo_msgs = [] self._parallel_fetch = False self._init_graph(graph_config) merge_count = len( [ x for x in self._mergelist if isinstance(x, Package) and x.operation == "merge" ] ) self._pkg_count = self._pkg_count_class(curval=0, maxval=merge_count) self._status_display.maxval = self._pkg_count.maxval # The load average takes some time to respond when new # jobs are added, so we need to limit the rate of adding # new jobs. self._job_delay_max = 5 self._previous_job_start_time = None self._job_delay_timeout_id = None # The load average takes some time to respond when after # a SIGSTOP/SIGCONT cycle, so delay scheduling for some # time after SIGCONT is received. self._sigcont_delay = 5 self._sigcont_time = None # This is used to memoize the _choose_pkg() result when # no packages can be chosen until one of the existing # jobs completes. self._choose_pkg_return_early = False features = self.settings.features if "parallel-fetch" in features and not ( "--pretend" in self.myopts or "--fetch-all-uri" in self.myopts or "--fetchonly" in self.myopts ): if "distlocks" not in features: portage.writemsg(red("!!!") + "\n", noiselevel=-1) portage.writemsg( red("!!!") + " parallel-fetching " + "requires the distlocks feature enabled" + "\n", noiselevel=-1, ) portage.writemsg( red("!!!") + " you have it disabled, " + "thus parallel-fetching is being disabled" + "\n", noiselevel=-1, ) portage.writemsg(red("!!!") + "\n", noiselevel=-1) elif merge_count > 1: self._parallel_fetch = True if self._parallel_fetch: # clear out existing fetch log if it exists try: open(self._fetch_log, "w").close() except OSError: pass self._running_portage = None portage_match = self._running_root.trees["vartree"].dbapi.match( portage.const.PORTAGE_PACKAGE_ATOM ) if portage_match: cpv = portage_match.pop() self._running_portage = self._pkg( cpv, "installed", self._running_root, installed=True ) def _handle_self_update(self): if installation.TYPE != installation.TYPES.SYSTEM: return os.EX_OK if self._opts_no_self_update.intersection(self.myopts): return os.EX_OK for x in self._mergelist: if not isinstance(x, Package): continue if x.operation != "merge": continue if x.root != self._running_root.root: continue if not portage.dep.match_from_list(portage.const.PORTAGE_PACKAGE_ATOM, [x]): continue rval = _check_temp_dir(self.settings) if rval != os.EX_OK: return rval _prepare_self_update(self.settings) break return os.EX_OK def _terminate_tasks(self): self._status_display.quiet = True for task in list(self._running_tasks.values()): if task.isAlive(): # This task should keep the main loop running until # it has had an opportunity to clean up after itself. # Rely on its exit hook to remove it from # self._running_tasks when it has finished cleaning up. task.cancel() else: # This task has been waiting to be started in one of # self._task_queues which are all cleared below. It # will never be started, so purged it from # self._running_tasks so that it won't keep the main # loop running. del self._running_tasks[id(task)] for q in self._task_queues.values(): q.clear() def _init_graph(self, graph_config): """ Initialization structures used for dependency calculations involving currently installed packages. """ self._set_graph_config(graph_config) self._blocker_db = {} depgraph_params = create_depgraph_params(self.myopts, None) dynamic_deps = "dynamic_deps" in depgraph_params ignore_built_slot_operator_deps = ( self.myopts.get("--ignore-built-slot-operator-deps", "n") == "y" ) for root in self.trees: if graph_config is None: fake_vartree = FakeVartree( self.trees[root]["root_config"], pkg_cache=self._pkg_cache, dynamic_deps=dynamic_deps, ignore_built_slot_operator_deps=ignore_built_slot_operator_deps, ) fake_vartree.sync() else: fake_vartree = graph_config.trees[root]["vartree"] self._blocker_db[root] = BlockerDB(fake_vartree) def _destroy_graph(self): """ Use this to free memory at the beginning of _calc_resume_list(). After _calc_resume_list(), the _init_graph() method must to be called in order to re-generate the structures that this method destroys. """ self._blocker_db = None self._set_graph_config(None) gc.collect() def _set_max_jobs(self, max_jobs): self._max_jobs = max_jobs self._task_queues.jobs.max_jobs = max_jobs if "parallel-install" in self.settings.features: self._task_queues.merge.max_jobs = max_jobs def _background_mode(self): """ Check if background mode is enabled and adjust states as necessary. @rtype: bool @return: True if background mode is enabled, False otherwise. """ background = ( self._max_jobs is True or self._max_jobs > 1 or "--quiet" in self.myopts or self.myopts.get("--quiet-build") == "y" ) and not bool(self._opts_no_background.intersection(self.myopts)) if background: interactive_tasks = self._get_interactive_tasks() if interactive_tasks: background = False writemsg_level( ">>> Sending package output to stdio due " + "to interactive package(s):\n", level=logging.INFO, noiselevel=-1, ) msg = [""] for pkg in interactive_tasks: pkg_str = " " + colorize("INFORM", str(pkg.cpv)) if pkg.root_config.settings["ROOT"] != "/": pkg_str += " for " + pkg.root msg.append(pkg_str) msg.append("") writemsg_level( "".join(f"{l}\n" for l in msg), level=logging.INFO, noiselevel=-1, ) if self._max_jobs is True or self._max_jobs > 1: self._set_max_jobs(1) writemsg_level( ">>> Setting --jobs=1 due " + "to the above interactive package(s)\n", level=logging.INFO, noiselevel=-1, ) writemsg_level( ">>> In order to temporarily mask " + "interactive updates, you may\n" + ">>> specify --accept-properties=-interactive\n", level=logging.INFO, noiselevel=-1, ) self._status_display.quiet = not background or ( "--quiet" in self.myopts and "--verbose" not in self.myopts ) self._logger.xterm_titles = ( "notitles" not in self.settings.features and self._status_display.quiet ) return background def _get_interactive_tasks(self): interactive_tasks = [] for task in self._mergelist: if not (isinstance(task, Package) and task.operation == "merge"): continue if "interactive" in task.properties: interactive_tasks.append(task) return interactive_tasks def _set_graph_config(self, graph_config): if graph_config is None: self._graph_config = None self._pkg_cache = {} self._digraph = None self._mergelist = [] self._world_atoms = None self._deep_system_deps.clear() return self._graph_config = graph_config self._pkg_cache = graph_config.pkg_cache self._digraph = graph_config.graph self._mergelist = graph_config.mergelist # Generate world atoms while the event loop is not running, # since otherwise portdbapi match calls in the create_world_atom # function could trigger event loop recursion. self._world_atoms = {} for pkg in self._mergelist: if getattr(pkg, "operation", None) != "merge": continue atom = create_world_atom( pkg, self._args_set, pkg.root_config, before_install=True ) if atom is not None: self._world_atoms[pkg] = atom if "--nodeps" in self.myopts or ( self._max_jobs is not True and self._max_jobs < 2 ): # save some memory self._digraph = None graph_config.graph = None graph_config.pkg_cache.clear() self._deep_system_deps.clear() for pkg in self._mergelist: self._pkg_cache[pkg] = pkg return self._find_system_deps() self._prune_digraph() self._prevent_builddir_collisions() if "--debug" in self.myopts: writemsg("\nscheduler digraph:\n\n", noiselevel=-1) self._digraph.debug_print() writemsg("\n", noiselevel=-1) def _find_system_deps(self): """ Find system packages and their deep runtime dependencies. Before being merged, these packages go to merge_wait_queue, to be merged when no other packages are building. NOTE: This can only find deep system deps if the system set has been added to the graph and traversed deeply (the depgraph "complete" parameter will do this, triggered by emerge --complete-graph option). """ params = create_depgraph_params(self.myopts, None) if not params["implicit_system_deps"]: return deep_system_deps = self._deep_system_deps deep_system_deps.clear() deep_system_deps.update(_find_deep_system_runtime_deps(self._digraph)) deep_system_deps.difference_update( [pkg for pkg in deep_system_deps if pkg.operation != "merge"] ) def _prune_digraph(self): """ Prune any root nodes that are irrelevant. """ graph = self._digraph completed_tasks = self._completed_tasks removed_nodes = set() while True: for node in graph.root_nodes(): if ( not isinstance(node, Package) or (node.installed and node.operation == "nomerge") or node.onlydeps or node in completed_tasks ): removed_nodes.add(node) if removed_nodes: graph.difference_update(removed_nodes) if not removed_nodes: break removed_nodes.clear() def _prevent_builddir_collisions(self): """ When building stages, sometimes the same exact cpv needs to be merged to both $ROOTs. Add edges to the digraph in order to avoid collisions in the builddir. Currently, normal file locks would be inappropriate for this purpose since emerge holds all of it's build dir locks from the main process. """ cpv_map = {} for pkg in self._mergelist: if not isinstance(pkg, Package): # a satisfied blocker continue if pkg.installed: continue if pkg.cpv not in cpv_map: cpv_map[pkg.cpv] = [pkg] continue for earlier_pkg in cpv_map[pkg.cpv]: self._digraph.add( earlier_pkg, pkg, priority=DepPriority(buildtime=True) ) cpv_map[pkg.cpv].append(pkg) class _pkg_failure(portage.exception.PortageException): """ An instance of this class is raised by unmerge() when an uninstallation fails. """ status = 1 def __init__(self, *pargs): portage.exception.PortageException.__init__(self, pargs) if pargs: self.status = pargs[0] def _schedule_fetch(self, fetcher, force_queue=False): """ Schedule a fetcher, in order to control the number of concurrent fetchers. If self._max_jobs is greater than 1 then the fetch queue is bypassed and the fetcher is started immediately, otherwise it is added to the front of the parallel-fetch queue. NOTE: The parallel-fetch queue is currently used to serialize access to the parallel-fetch log, so changes in the log handling would be required before it would be possible to enable concurrent fetching within the parallel-fetch queue. """ if self._max_jobs > 1 and not force_queue: fetcher.start() else: self._task_queues.fetch.addFront(fetcher) def _schedule_setup(self, setup_phase): """ Schedule a setup phase on the merge queue, in order to serialize unsandboxed access to the live filesystem. """ if ( self._task_queues.merge.max_jobs > 1 and "ebuild-locks" in self.settings.features ): # Use a separate queue for ebuild-locks when the merge # queue allows more than 1 job (due to parallel-install), # since the portage.locks module does not behave as desired # if we try to lock the same file multiple times # concurrently from the same process. self._task_queues.ebuild_locks.add(setup_phase) else: self._task_queues.merge.add(setup_phase) self._schedule() def _schedule_unpack(self, unpack_phase): """ Schedule an unpack phase on the unpack queue, in order to serialize $DISTDIR access for live ebuilds. """ self._task_queues.unpack.add(unpack_phase) def _find_blockers(self, new_pkg): """ Returns a callable. """ def get_blockers(): return self._find_blockers_impl(new_pkg) return get_blockers def _find_blockers_impl(self, new_pkg): if self._opts_ignore_blockers.intersection(self.myopts): return None blocker_db = self._blocker_db[new_pkg.root] blocked_pkgs = [] for blocking_pkg in blocker_db.findInstalledBlockers(new_pkg): if new_pkg.slot_atom == blocking_pkg.slot_atom: continue if new_pkg.cpv == blocking_pkg.cpv: continue blocked_pkgs.append(blocking_pkg) return blocked_pkgs def _generate_digests(self): """ Generate digests if necessary for --digests or FEATURES=digest. In order to avoid interference, this must done before parallel tasks are started. """ digest = "--digest" in self.myopts if not digest: for pkgsettings in self.pkgsettings.values(): if pkgsettings.mycpv is not None: # ensure that we are using global features # settings rather than those from package.env pkgsettings.reset() if "digest" in pkgsettings.features: digest = True break if not digest: return os.EX_OK for x in self._mergelist: if ( not isinstance(x, Package) or x.type_name != "ebuild" or x.operation != "merge" ): continue pkgsettings = self.pkgsettings[x.root] if pkgsettings.mycpv is not None: # ensure that we are using global features # settings rather than those from package.env pkgsettings.reset() if "--digest" not in self.myopts and "digest" not in pkgsettings.features: continue portdb = x.root_config.trees["porttree"].dbapi ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: raise AssertionError(f"ebuild not found for '{x.cpv}'") pkgsettings["O"] = os.path.dirname(ebuild_path) if not digestgen(mysettings=pkgsettings, myportdb=portdb): writemsg_level( f"!!! Unable to generate manifest for '{x.cpv}'.\n", level=logging.ERROR, noiselevel=-1, ) return FAILURE return os.EX_OK def _check_manifests(self): # Verify all the manifests now so that the user is notified of failure # as soon as possible. if ( "strict" not in self.settings.features or "--fetchonly" in self.myopts or "--fetch-all-uri" in self.myopts ): return os.EX_OK shown_verifying_msg = False quiet_settings = {} for myroot, pkgsettings in self.pkgsettings.items(): quiet_config = portage.config(clone=pkgsettings) quiet_config["PORTAGE_QUIET"] = "1" quiet_config.backup_changes("PORTAGE_QUIET") quiet_settings[myroot] = quiet_config del quiet_config failures = 0 for x in self._mergelist: if not isinstance(x, Package) or x.type_name != "ebuild": continue if x.operation == "uninstall": continue if not shown_verifying_msg: shown_verifying_msg = True self._status_msg("Verifying ebuild manifests") root_config = x.root_config portdb = root_config.trees["porttree"].dbapi quiet_config = quiet_settings[root_config.root] ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: raise AssertionError(f"ebuild not found for '{x.cpv}'") quiet_config["O"] = os.path.dirname(ebuild_path) if not digestcheck([], quiet_config, strict=True): failures |= 1 if failures: return FAILURE return os.EX_OK def _add_prefetchers(self): if not self._parallel_fetch: return if self._parallel_fetch: prefetchers = self._prefetchers for pkg in self._mergelist: # mergelist can contain solved Blocker instances if not isinstance(pkg, Package) or pkg.operation == "uninstall": continue prefetcher = self._create_prefetcher(pkg) if prefetcher is not None: # This will start the first prefetcher immediately, so that # self._task() won't discard it. This avoids a case where # the first prefetcher is discarded, causing the second # prefetcher to occupy the fetch queue before the first # fetcher has an opportunity to execute. prefetchers[pkg] = prefetcher self._task_queues.fetch.add(prefetcher) def _create_prefetcher(self, pkg): """ @return: a prefetcher, or None if not applicable """ prefetcher = None if not isinstance(pkg, Package): pass elif pkg.type_name == "ebuild": prefetcher = EbuildFetcher( background=True, config_pool=self._ConfigPool( pkg.root, self._allocate_config, self._deallocate_config ), fetchonly=1, fetchall=self._build_opts.fetch_all_uri, logfile=self._fetch_log, pkg=pkg, prefetch=True, scheduler=self._sched_iface, ) elif ( pkg.type_name == "binary" and "--getbinpkg" in self.myopts and pkg.root_config.trees["bintree"].download_required(pkg.cpv) ): prefetcher = BinpkgPrefetcher( background=True, pkg=pkg, scheduler=self._sched_iface ) return prefetcher async def _run_pkg_pretend(self, loop=None): """ Since pkg_pretend output may be important, this method sends all output directly to stdout (regardless of options like --quiet or --jobs). """ failures = 0 sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface) for x in self._mergelist: if not isinstance(x, Package): continue if x.operation == "uninstall": continue if x.eapi in ("0", "1", "2", "3"): continue if "pretend" not in x.defined_phases: continue self._termination_check() if self._terminated_tasks: raise asyncio.CancelledError root_config = x.root_config settings = self._allocate_config(root_config.root) settings.setcpv(x) color = "PKG_BINARY_MERGE" if x.built else "INFORM" self._status_msg(f"Running pre-merge checks for {colorize(color, x.cpv)}") if not x.built: # Get required SRC_URI metadata (it's not cached in x.metadata # because some packages have an extremely large SRC_URI value). portdb = root_config.trees["porttree"].dbapi (settings.configdict["pkg"]["SRC_URI"],) = await portdb.async_aux_get( x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop ) # setcpv/package.env allows for per-package PORTAGE_TMPDIR so we # have to validate it for each package rval = _check_temp_dir(settings) if rval != os.EX_OK: failures += 1 self._record_pkg_failure(x, settings, FAILURE) self._deallocate_config(settings) continue build_dir_path = os.path.join( os.path.realpath(settings["PORTAGE_TMPDIR"]), "portage", x.category, x.pf, ) existing_builddir = os.path.isdir(build_dir_path) settings["PORTAGE_BUILDDIR"] = build_dir_path build_dir = EbuildBuildDir(scheduler=sched_iface, settings=settings) await build_dir.async_lock() current_task = None try: # Clean up the existing build dir, in case pkg_pretend # checks for available space (bug #390711). if existing_builddir: if x.built: tree = "bintree" infloc = os.path.join(build_dir_path, "build-info") ebuild_path = os.path.join(infloc, x.pf + ".ebuild") else: tree = "porttree" portdb = root_config.trees["porttree"].dbapi ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: raise AssertionError(f"ebuild not found for '{x.cpv}'") portage.package.ebuild.doebuild.doebuild_environment( ebuild_path, "clean", settings=settings, db=self.trees[settings["EROOT"]][tree].dbapi, ) clean_phase = EbuildPhase( background=False, phase="clean", scheduler=sched_iface, settings=settings, ) current_task = clean_phase clean_phase.start() await clean_phase.async_wait() current_task = None if x.built: tree = "bintree" bintree = root_config.trees["bintree"].dbapi.bintree fetched = False # Display fetch on stdout, so that it's always clear what # is consuming time here. if bintree.download_required(x.cpv): fetcher = self._get_prefetcher(x) if fetcher is not None and not fetcher.isAlive(): # Cancel it because it hasn't started yet. fetcher.cancel() fetcher = None if fetcher is None: fetcher = BinpkgFetcher(pkg=x, scheduler=loop) fetcher.start() # We only set the fetched value when fetcher # is a BinpkgFetcher, since BinpkgPrefetcher # handles fetch, verification, and the # bintree.inject call which moves the file. fetched = fetcher.pkg_path else: msg = ( "Fetching in the background:", fetcher.pkg_path, "To view fetch progress, run in another terminal:", f"tail -f {self._fetch_log}", ) out = portage.output.EOutput() for l in msg: out.einfo(l) if await fetcher.async_wait() != os.EX_OK: failures += 1 self._record_pkg_failure(x, settings, fetcher.returncode) continue if fetched is False: filename = bintree.getname(x.cpv) else: filename = fetched verifier = BinpkgVerifier( pkg=x, scheduler=sched_iface, _pkg_path=filename ) current_task = verifier verifier.start() if await verifier.async_wait() != os.EX_OK: failures += 1 self._record_pkg_failure(x, settings, verifier.returncode) continue current_task = None if fetched and bintree.get_local_repo_location(x.cpv): os.rename(fetched, fetcher.pkg_allocated_path) elif fetched: if not bintree.inject( x.cpv, current_pkg_path=fetched, allocated_pkg_path=fetcher.pkg_allocated_path, ): eerror( "Binary package is not usable", phase="pretend", key=x.cpv, ) failures += 1 self._record_pkg_failure(x, settings, 1) continue infloc = os.path.join(build_dir_path, "build-info") ensure_dirs(infloc) try: await bintree.dbapi.unpack_metadata(settings, infloc, loop=loop) except portage.exception.SignatureException as e: writemsg( f"!!! Invalid binary package: '{bintree.getname(x.cpv)}', {e}\n", noiselevel=-1, ) failures += 1 self._record_pkg_failure(x, settings, 1) continue ebuild_path = os.path.join(infloc, x.pf + ".ebuild") settings.configdict["pkg"]["EMERGE_FROM"] = "binary" settings.configdict["pkg"]["MERGE_TYPE"] = "binary" else: tree = "porttree" portdb = root_config.trees["porttree"].dbapi ebuild_path = portdb.findname(x.cpv, myrepo=x.repo) if ebuild_path is None: raise AssertionError(f"ebuild not found for '{x.cpv}'") settings.configdict["pkg"]["EMERGE_FROM"] = "ebuild" if self._build_opts.buildpkgonly: settings.configdict["pkg"]["MERGE_TYPE"] = "buildonly" else: settings.configdict["pkg"]["MERGE_TYPE"] = "source" portage.package.ebuild.doebuild.doebuild_environment( ebuild_path, "pretend", settings=settings, db=self.trees[settings["EROOT"]][tree].dbapi, ) prepare_build_dirs(root_config.root, settings, cleanup=0) vardb = root_config.trees["vartree"].dbapi settings["REPLACING_VERSIONS"] = " ".join( { portage.versions.cpv_getversion(match) for match in vardb.match(x.slot_atom) + vardb.match("=" + x.cpv) } ) pretend_phase = EbuildPhase( phase="pretend", scheduler=sched_iface, settings=settings ) current_task = pretend_phase pretend_phase.start() ret = await pretend_phase.async_wait() # Leave current_task assigned in order to trigger clean # on success in the below finally block. if ret != os.EX_OK: failures += 1 self._record_pkg_failure(x, settings, ret) finally: if current_task is not None: if current_task.isAlive(): current_task.cancel() portage.elog.elog_process(x.cpv, settings) if current_task is not None and current_task.returncode == os.EX_OK: clean_phase = EbuildPhase( background=False, phase="clean", scheduler=sched_iface, settings=settings, ) clean_phase.start() await clean_phase.async_wait() await build_dir.async_unlock() self._deallocate_config(settings) if failures: return FAILURE return os.EX_OK def _record_pkg_failure(self, pkg, settings, ret): """Record a package failure. This eliminates the package from the --keep-going merge list, and immediately calls _failed_pkg_msg if we have not been terminated.""" self._failed_pkgs.append( self._failed_pkg( build_dir=settings.get("PORTAGE_BUILDDIR"), build_log=settings.get("PORTAGE_LOG_FILE"), pkg=pkg, returncode=ret, ) ) if not self._terminated_tasks: self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for") self._status_display.failed = len(self._failed_pkgs) def merge(self): if "--resume" in self.myopts: # We're resuming. portage.writemsg_stdout( colorize("GOOD", "*** Resuming merge...\n"), noiselevel=-1 ) self._logger.log(" *** Resuming merge...") self._save_resume_list() try: self._background = self._background_mode() except self._unknown_internal_error: return FAILURE rval = self._handle_self_update() if rval != os.EX_OK: return rval for root in self.trees: root_config = self.trees[root]["root_config"] # Even for --pretend --fetch mode, PORTAGE_TMPDIR is required # since it might spawn pkg_nofetch which requires PORTAGE_BUILDDIR # for ensuring sane $PWD (bug #239560) and storing elog messages. tmpdir = root_config.settings.get("PORTAGE_TMPDIR", "") if not tmpdir or not os.path.isdir(tmpdir): msg = ( "The directory specified in your PORTAGE_TMPDIR variable does not exist:", tmpdir, "Please create this directory or correct your PORTAGE_TMPDIR setting.", ) out = portage.output.EOutput() for l in msg: out.eerror(l) return FAILURE if self._background: root_config.settings.unlock() root_config.settings["PORTAGE_BACKGROUND"] = "1" root_config.settings.backup_changes("PORTAGE_BACKGROUND") root_config.settings.lock() self.pkgsettings[root] = portage.config(clone=root_config.settings) keep_going = "--keep-going" in self.myopts fetchonly = self._build_opts.fetchonly mtimedb = self._mtimedb failed_pkgs = self._failed_pkgs rval = self._generate_digests() if rval != os.EX_OK: return rval # TODO: Immediately recalculate deps here if --keep-going # is enabled and corrupt manifests are detected. rval = self._check_manifests() if rval != os.EX_OK and not keep_going: return rval while True: received_signal = [] def sighandler(signum, frame): signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) portage.util.writemsg(f"\n\nExiting on signal {signum}\n") self.terminate() received_signal.append(128 + signum) earlier_sigint_handler = signal.signal(signal.SIGINT, sighandler) earlier_sigterm_handler = signal.signal(signal.SIGTERM, sighandler) earlier_sigcont_handler = signal.signal( signal.SIGCONT, self._sigcont_handler ) signal.siginterrupt(signal.SIGCONT, False) try: rval = self._merge() finally: # Restore previous handlers if earlier_sigint_handler is not None: signal.signal(signal.SIGINT, earlier_sigint_handler) else: signal.signal(signal.SIGINT, signal.SIG_DFL) if earlier_sigterm_handler is not None: signal.signal(signal.SIGTERM, earlier_sigterm_handler) else: signal.signal(signal.SIGTERM, signal.SIG_DFL) if earlier_sigcont_handler is not None: signal.signal(signal.SIGCONT, earlier_sigcont_handler) else: signal.signal(signal.SIGCONT, signal.SIG_DFL) self._termination_check() if received_signal: sys.exit(received_signal[0]) if rval == os.EX_OK or fetchonly or not keep_going: break if "resume" not in mtimedb: break mergelist = self._mtimedb["resume"].get("mergelist") if not mergelist: break if not failed_pkgs: break for failed_pkg in failed_pkgs: mergelist.remove(list(failed_pkg.pkg)) self._failed_pkgs_all.extend(failed_pkgs) del failed_pkgs[:] if not mergelist: break if not self._calc_resume_list(): break clear_caches(self.trees) if not self._mergelist: break self._save_resume_list() self._pkg_count.curval = 0 self._pkg_count.maxval = len( [ x for x in self._mergelist if isinstance(x, Package) and x.operation == "merge" ] ) self._status_display.maxval = self._pkg_count.maxval # Cleanup any callbacks that have been registered with the global # event loop by calls to the terminate method. self._cleanup() self._logger.log(" *** Finished. Cleaning up...") if failed_pkgs: self._failed_pkgs_all.extend(failed_pkgs) del failed_pkgs[:] printer = portage.output.EOutput() background = self._background failure_log_shown = False if ( background and len(self._failed_pkgs_all) == 1 and self.myopts.get("--quiet-fail", "n") != "y" ): # If only one package failed then just show it's # whole log for easy viewing. failed_pkg = self._failed_pkgs_all[-1] log_file = None log_file_real = None log_path = self._locate_failure_log(failed_pkg) if log_path is not None: try: log_file = open( _unicode_encode( log_path, encoding=_encodings["fs"], errors="strict" ), mode="rb", ) except OSError: pass else: if log_path.endswith(".gz"): log_file_real = log_file log_file = gzip.GzipFile( filename="", mode="rb", fileobj=log_file ) if log_file is not None: try: for line in log_file: writemsg_level(line, noiselevel=-1) except zlib.error as e: writemsg_level(f"{e}\n", level=logging.ERROR, noiselevel=-1) finally: log_file.close() if log_file_real is not None: log_file_real.close() failure_log_shown = True # Dump mod_echo output now since it tends to flood the terminal. # This allows us to avoid having more important output, generated # later, from being swept away by the mod_echo output. mod_echo_output = _flush_elog_mod_echo() if ( background and not failure_log_shown and self._failed_pkgs_all and self._failed_pkgs_die_msgs and not mod_echo_output ): failed_pkg_map = {} for failed_pkg in self._failed_pkgs_all: pkg = failed_pkg.pkg failed_pkg_map[(pkg.cpv, pkg.root)] = pkg for mysettings, key, logentries in self._failed_pkgs_die_msgs: pkg = failed_pkg_map.get((key, mysettings["EROOT"])) color = "PKG_BINARY_MERGE" if pkg and pkg.built else "INFORM" root_msg = "" if mysettings["ROOT"] != "/": root_msg = f" merged to {mysettings['ROOT']}" print() printer.einfo( f"Error messages for package {colorize(color, key)}{root_msg}:" ) print() for phase in portage.const.EBUILD_PHASES: if phase not in logentries: continue for msgtype, msgcontent in logentries[phase]: if isinstance(msgcontent, str): msgcontent = [msgcontent] for line in msgcontent: printer.eerror(line.strip("\n")) if self._post_mod_echo_msgs: for msg in self._post_mod_echo_msgs: msg() if len(self._failed_pkgs_all) > 1 or (self._failed_pkgs_all and keep_going): if len(self._failed_pkgs_all) > 1: msg = ( f"The following {len(self._failed_pkgs_all)} packages have " + "failed to build, install, or execute postinst:" ) else: msg = ( "The following package has " + "failed to build, install, or execute postinst:" ) printer.eerror("") for line in textwrap.wrap(msg, 72): printer.eerror(line) printer.eerror("") for failed_pkg in self._failed_pkgs_all: msg = f" {failed_pkg.pkg}" if failed_pkg.postinst_failure: msg += " (postinst failed)" log_path = self._locate_failure_log(failed_pkg) if log_path is not None: msg += ", Log file:" printer.eerror(msg) if log_path is not None: printer.eerror(f" '{colorize('INFORM', log_path)}'") printer.eerror("") if self._failed_pkgs_all: return FAILURE return os.EX_OK def _elog_listener(self, mysettings, key, logentries, fulltext): errors = portage.elog.filter_loglevels(logentries, ["ERROR"]) if errors: self._failed_pkgs_die_msgs.append((mysettings, key, errors)) def _locate_failure_log(self, failed_pkg): log_paths = [failed_pkg.build_log] for log_path in log_paths: if not log_path: continue try: log_size = os.stat(log_path).st_size except OSError: continue if log_size == 0: continue return log_path return None def _add_packages(self): pkg_queue = self._pkg_queue for pkg in self._mergelist: if isinstance(pkg, Package): pkg_queue.append(pkg) elif isinstance(pkg, Blocker): pass def _system_merge_started(self, merge): """ Add any unsatisfied runtime deps to self._unsatisfied_system_deps. In general, this keeps track of installed system packages with unsatisfied RDEPEND or PDEPEND (circular dependencies). It can be a fragile situation, so we don't execute any unrelated builds until the circular dependencies are built and installed. """ graph = self._digraph if graph is None: return pkg = merge.merge.pkg # Skip this if $ROOT != / since it shouldn't matter if there # are unsatisfied system runtime deps in this case. if pkg.root_config.settings["ROOT"] != "/": return completed_tasks = self._completed_tasks unsatisfied = self._unsatisfied_system_deps def ignore_non_runtime_or_satisfied(priority): """ Ignore non-runtime and satisfied runtime priorities. """ if ( isinstance(priority, DepPriority) and not priority.satisfied and (priority.runtime or priority.runtime_post) ): return False return True # When checking for unsatisfied runtime deps, only check # direct deps since indirect deps are checked when the # corresponding parent is merged. for child in graph.child_nodes( pkg, ignore_priority=ignore_non_runtime_or_satisfied ): if not isinstance(child, Package) or child.operation == "uninstall": continue if child is pkg: continue if child.operation == "merge" and child not in completed_tasks: unsatisfied.add(child) def _merge_wait_exit_handler(self, task): self._merge_wait_scheduled.remove(task) self._merge_exit(task) def _merge_exit(self, merge): self._running_tasks.pop(id(merge), None) self._do_merge_exit(merge) self._deallocate_config(merge.merge.settings) if merge.returncode == os.EX_OK and not merge.merge.pkg.installed: self._status_display.curval += 1 self._status_display.merges = len(self._task_queues.merge) self._schedule() def _do_merge_exit(self, merge): pkg = merge.merge.pkg if merge.returncode != os.EX_OK: settings = merge.merge.settings build_dir = settings.get("PORTAGE_BUILDDIR") build_log = settings.get("PORTAGE_LOG_FILE") self._failed_pkgs.append( self._failed_pkg( build_dir=build_dir, build_log=build_log, pkg=pkg, returncode=merge.returncode, ) ) if not self._terminated_tasks: self._failed_pkg_msg(self._failed_pkgs[-1], "install", "to") self._status_display.failed = len(self._failed_pkgs) return if merge.postinst_failure: # Append directly to _failed_pkgs_all for non-critical errors. self._failed_pkgs_all.append( self._failed_pkg( build_dir=merge.merge.settings.get("PORTAGE_BUILDDIR"), build_log=merge.merge.settings.get("PORTAGE_LOG_FILE"), pkg=pkg, postinst_failure=True, returncode=merge.returncode, ) ) self._failed_pkg_msg( self._failed_pkgs_all[-1], "execute postinst for", "for" ) self._task_complete(pkg) pkg_to_replace = merge.merge.pkg_to_replace if pkg_to_replace is not None: # When a package is replaced, mark it's uninstall # task complete (if any). if self._digraph is not None and pkg_to_replace in self._digraph: try: self._pkg_queue.remove(pkg_to_replace) except ValueError: pass self._task_complete(pkg_to_replace) else: self._pkg_cache.pop(pkg_to_replace, None) if pkg.installed: return # Call mtimedb.commit() after each merge so that # --resume still works after being interrupted # by reboot, sigkill or similar. mtimedb = self._mtimedb mtimedb["resume"]["mergelist"].remove(list(pkg)) if not mtimedb["resume"]["mergelist"]: del mtimedb["resume"] mtimedb.commit() def _build_exit(self, build): self._running_tasks.pop(id(build), None) if build.returncode == os.EX_OK and self._terminated_tasks: # We've been interrupted, so we won't # add this to the merge queue. self.curval += 1 self._deallocate_config(build.settings) elif build.returncode == os.EX_OK: self.curval += 1 merge = PackageMerge( is_system_pkg=(build.pkg in self._deep_system_deps), merge=build, scheduler=self._sched_iface, ) self._running_tasks[id(merge)] = merge # By default, merge-wait only allows merge when no builds are executing. # As a special exception, dependencies on system packages are frequently # unspecified and will therefore force merge-wait. if not build.build_opts.buildpkgonly and ( "merge-wait" in build.settings.features or merge.is_system_pkg ): self._merge_wait_queue.append(merge) if merge.is_system_pkg: merge.addStartListener(self._system_merge_started) else: self._task_queues.merge.add(merge) merge.addExitListener(self._merge_exit) self._status_display.merges = len(self._task_queues.merge) else: settings = build.settings build_dir = settings.get("PORTAGE_BUILDDIR") build_log = settings.get("PORTAGE_LOG_FILE") self._failed_pkgs.append( self._failed_pkg( build_dir=build_dir, build_log=build_log, pkg=build.pkg, returncode=build.returncode, ) ) if not self._terminated_tasks: self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for") self._status_display.failed = len(self._failed_pkgs) self._deallocate_config(build.settings) self._jobs -= 1 self._status_display.running = self._jobs self._status_display.merge_wait = len(self._merge_wait_queue) self._schedule() def _extract_exit(self, build): self._build_exit(build) def _task_complete(self, pkg): self._completed_tasks.add(pkg) self._unsatisfied_system_deps.discard(pkg) self._choose_pkg_return_early = False blocker_db = self._blocker_db[pkg.root] blocker_db.discardBlocker(pkg) def _main_loop(self): self._main_exit = self._event_loop.create_future() if ( self._max_load is not None and self._loadavg_latency is not None and (self._max_jobs is True or self._max_jobs > 1) ): # We have to schedule periodically, in case the load # average has changed since the last call. self._main_loadavg_handle = self._event_loop.call_later( self._loadavg_latency, self._schedule ) self._schedule() self._event_loop.run_until_complete(self._main_exit) def _merge(self): if self._opts_no_background.intersection(self.myopts): self._set_max_jobs(1) failed_pkgs = self._failed_pkgs portage.locks._quiet = self._background portage.elog.add_listener(self._elog_listener) def display_callback(): self._status_display.display() display_callback.handle = self._event_loop.call_later( self._max_display_latency, display_callback ) display_callback.handle = None if self._status_display._isatty and not self._status_display.quiet: display_callback() rval = os.EX_OK try: self._add_prefetchers() if not self._build_opts.fetchonly: # Run pkg_pretend concurrently with parallel-fetch, and be careful # to respond appropriately to termination, so that we don't start # any new tasks after we've been terminated. Temporarily make the # status display quiet so that its output is not interleaved with # pkg_pretend output. status_quiet = self._status_display.quiet self._status_display.quiet = True try: rval = self._sched_iface.run_until_complete( self._run_pkg_pretend(loop=self._sched_iface) ) except asyncio.CancelledError: self.terminate() finally: self._status_display.quiet = status_quiet self._termination_check() if self._terminated_tasks: rval = 128 + signal.SIGINT if rval != os.EX_OK: return rval self._add_packages() self._main_loop() finally: self._main_loop_cleanup() portage.locks._quiet = False portage.elog.remove_listener(self._elog_listener) if display_callback.handle is not None: display_callback.handle.cancel() if failed_pkgs: rval = failed_pkgs[-1].returncode return rval def _main_loop_cleanup(self): del self._pkg_queue[:] self._completed_tasks.clear() self._deep_system_deps.clear() self._unsatisfied_system_deps.clear() self._choose_pkg_return_early = False self._status_display.reset() self._digraph = None self._task_queues.fetch.clear() self._prefetchers.clear() self._main_exit = None if self._main_loadavg_handle is not None: self._main_loadavg_handle.cancel() self._main_loadavg_handle = None if self._job_delay_timeout_id is not None: self._job_delay_timeout_id.cancel() self._job_delay_timeout_id = None if self._schedule_merge_wakeup_task is not None: self._schedule_merge_wakeup_task.cancel() self._schedule_merge_wakeup_task = None def _choose_pkg(self): """ Choose a task that has all its dependencies satisfied. This is used for parallel build scheduling, and ensures that we don't build anything with deep dependencies that have yet to be merged. """ if self._choose_pkg_return_early: return None if self._digraph is None: if self._is_work_scheduled() and not ( "--nodeps" in self.myopts and (self._max_jobs is True or self._max_jobs > 1) ): self._choose_pkg_return_early = True return None return self._pkg_queue.pop(0) if not self._is_work_scheduled(): return self._pkg_queue.pop(0) self._prune_digraph() chosen_pkg = None # Prefer uninstall operations when available. graph = self._digraph for pkg in self._pkg_queue: if pkg.operation == "uninstall" and not graph.child_nodes(pkg): chosen_pkg = pkg break if chosen_pkg is None: later = set(self._pkg_queue) for pkg in self._pkg_queue: later.remove(pkg) if not self._dependent_on_scheduled_merges(pkg, later): chosen_pkg = pkg break if chosen_pkg is not None: self._pkg_queue.remove(chosen_pkg) if chosen_pkg is None: # There's no point in searching for a package to # choose until at least one of the existing jobs # completes. self._choose_pkg_return_early = True return chosen_pkg def _dependent_on_scheduled_merges(self, pkg, later): """ Traverse the subgraph of the given packages deep dependencies to see if it contains any scheduled merges. @param pkg: a package to check dependencies for @type pkg: Package @param later: packages for which dependence should be ignored since they will be merged later than pkg anyway and therefore delaying the merge of pkg will not result in a more optimal merge order @type later: set @rtype: bool @return: True if the package is dependent, False otherwise. """ graph = self._digraph completed_tasks = self._completed_tasks dependent = False traversed_nodes = {pkg} direct_deps = graph.child_nodes(pkg) node_stack = direct_deps direct_deps = frozenset(direct_deps) while node_stack: node = node_stack.pop() if node in traversed_nodes: continue traversed_nodes.add(node) if not ( (node.installed and node.operation == "nomerge") or (node.operation == "uninstall" and node not in direct_deps) or node in completed_tasks or node in later ): dependent = True break # Don't traverse children of uninstall nodes since # those aren't dependencies in the usual sense. if node.operation != "uninstall": node_stack.extend(graph.child_nodes(node)) return dependent def _allocate_config(self, root): """ Allocate a unique config instance for a task in order to prevent interference between parallel tasks. """ if self._config_pool[root]: temp_settings = self._config_pool[root].pop() else: temp_settings = portage.config(clone=self.pkgsettings[root]) # Since config.setcpv() isn't guaranteed to call config.reset() due to # performance reasons, call it here to make sure all settings from the # previous package get flushed out (such as PORTAGE_LOG_FILE). temp_settings.reload() temp_settings.reset() return temp_settings def _deallocate_config(self, settings): self._config_pool[settings["EROOT"]].append(settings) def _keep_scheduling(self): return bool( not self._terminated.is_set() and self._pkg_queue and not (self._failed_pkgs and not self._build_opts.fetchonly) ) def _is_work_scheduled(self): return bool(self._running_tasks) def _running_job_count(self): return self._jobs def _schedule_tasks(self): while True: state_change = 0 # When the number of jobs and merges drops to zero, # process a single merge from _merge_wait_queue if # it's not empty. We only process one since these are # special packages and we want to ensure that # parallel-install does not cause more than one of # them to install at the same time. if ( self._merge_wait_queue and not self._jobs and not self._task_queues.merge ): while self._merge_wait_queue: # If we added non-system packages to the merge queue in a # previous iteration of this loop, then for system packages we # need to come back later when the merge queue is empty. # TODO: Maybe promote non-system packages to the front of the # queue and process them within the current loop, though that # causes merge order to differ from the order builds finish. if ( self._task_queues.merge and self._merge_wait_queue[0].is_system_pkg ): break task = self._merge_wait_queue.popleft() task.scheduler = self._sched_iface self._merge_wait_scheduled.append(task) self._task_queues.merge.add(task) task.addExitListener(self._merge_wait_exit_handler) self._status_display.merges = len(self._task_queues.merge) state_change += 1 # For system packages, always serialize install regardless of # parallel-install, in order to mitigate failures triggered # by fragile states as in bug 256616. For other packages, # continue to populate self._task_queues.merge, which will # serialize install unless parallel-install is enabled. if task.is_system_pkg: break self._status_display.merge_wait = len(self._merge_wait_queue) if self._schedule_tasks_imp(): state_change += 1 self._status_display.display() # Cancel prefetchers if they're the only reason # the main poll loop is still running. if ( self._failed_pkgs and not self._build_opts.fetchonly and not self._is_work_scheduled() and self._task_queues.fetch ): # Since this happens asynchronously, it doesn't count in # state_change (counting it triggers an infinite loop). self._task_queues.fetch.clear() if not ( state_change or ( self._merge_wait_queue and not self._jobs and not self._task_queues.merge ) ): break if not ( self._is_work_scheduled() or self._keep_scheduling() or self._main_exit.done() ): self._main_exit.set_result(None) elif self._main_loadavg_handle is not None: self._main_loadavg_handle.cancel() self._main_loadavg_handle = self._event_loop.call_later( self._loadavg_latency, self._schedule ) # Failure to schedule *after* self._task_queues.merge becomes # empty will cause the scheduler to hang as in bug 711322. # Do not rely on scheduling which occurs via the _merge_exit # method, since the order of callback invocation may cause # self._task_queues.merge to appear non-empty when it is # about to become empty. if self._task_queues.merge and ( self._schedule_merge_wakeup_task is None or self._schedule_merge_wakeup_task.done() ): self._schedule_merge_wakeup_task = asyncio.ensure_future( self._task_queues.merge.wait(loop=self._event_loop), loop=self._event_loop, ) self._schedule_merge_wakeup_task.add_done_callback( self._schedule_merge_wakeup ) def _schedule_merge_wakeup(self, future): if not future.cancelled(): future.result() if self._main_exit is not None and not self._main_exit.done(): self._schedule() def _sigcont_handler(self, signum, frame): self._sigcont_time = time.time() def _job_delay(self): """ @rtype: bool @return: True if job scheduling should be delayed, False otherwise. """ if self._jobs and self._max_load is not None: current_time = time.time() if self._sigcont_time is not None: elapsed_seconds = current_time - self._sigcont_time # elapsed_seconds < 0 means the system clock has been adjusted if elapsed_seconds > 0 and elapsed_seconds < self._sigcont_delay: if self._job_delay_timeout_id is not None: self._job_delay_timeout_id.cancel() self._job_delay_timeout_id = self._event_loop.call_later( self._sigcont_delay - elapsed_seconds, self._schedule ) return True # Only set this to None after the delay has expired, # since this method may be called again before the # delay has expired. self._sigcont_time = None try: avg1, avg5, avg15 = getloadavg() except OSError: return False delay = self._job_delay_max * avg1 / self._max_load if delay > self._job_delay_max: delay = self._job_delay_max elapsed_seconds = current_time - self._previous_job_start_time # elapsed_seconds < 0 means the system clock has been adjusted if elapsed_seconds > 0 and elapsed_seconds < delay: if self._job_delay_timeout_id is not None: self._job_delay_timeout_id.cancel() self._job_delay_timeout_id = self._event_loop.call_later( delay - elapsed_seconds, self._schedule ) return True return False def _schedule_tasks_imp(self): """ @rtype: bool @return: True if state changed, False otherwise. """ state_change = False while True: if not self._keep_scheduling(): return state_change if ( self._choose_pkg_return_early or self._merge_wait_scheduled or (self._jobs and self._unsatisfied_system_deps) or not self._can_add_job() or self._job_delay() ): return state_change pkg = self._choose_pkg() if pkg is None: return state_change state_change = True if not pkg.installed: self._pkg_count.curval += 1 task = self._task(pkg) if pkg.installed: merge = PackageMerge(merge=task, scheduler=self._sched_iface) self._running_tasks[id(merge)] = merge self._task_queues.merge.addFront(merge) merge.addExitListener(self._merge_exit) else: self._jobs += 1 self._previous_job_start_time = time.time() self._status_display.running = self._jobs self._running_tasks[id(task)] = task task.scheduler = self._sched_iface self._task_queues.jobs.add(task) if pkg.built: task.addExitListener(self._extract_exit) else: task.addExitListener(self._build_exit) def _get_prefetcher(self, pkg): try: prefetcher = self._prefetchers.pop(pkg, None) except KeyError: # KeyError observed with PyPy 1.8, despite None given as default. # Note that PyPy 1.8 has the same WeakValueDictionary code as # CPython 2.7, so it may be possible for CPython to raise KeyError # here as well. prefetcher = None if prefetcher is not None and ( prefetcher.cancelled or not prefetcher.isAlive() ): try: self._task_queues.fetch._task_queue.remove(prefetcher) except ValueError: pass prefetcher = None return prefetcher def _task(self, pkg): pkg_to_replace = None if pkg.operation != "uninstall": vardb = pkg.root_config.trees["vartree"].dbapi previous_cpv = [ x for x in vardb.match(pkg.slot_atom) if portage.cpv_getkey(x) == pkg.cp ] if not previous_cpv and vardb.cpv_exists(pkg.cpv): # same cpv, different SLOT previous_cpv = [pkg.cpv] if previous_cpv: previous_cpv = previous_cpv.pop() pkg_to_replace = self._pkg( previous_cpv, "installed", pkg.root_config, installed=True, operation="uninstall", ) prefetcher = self._get_prefetcher(pkg) task = MergeListItem( args_set=self._args_set, background=self._background, binpkg_opts=self._binpkg_opts, build_opts=self._build_opts, config_pool=self._ConfigPool( pkg.root, self._allocate_config, self._deallocate_config ), emerge_opts=self.myopts, find_blockers=self._find_blockers(pkg), logger=self._logger, mtimedb=self._mtimedb, pkg=pkg, pkg_count=self._pkg_count.copy(), pkg_to_replace=pkg_to_replace, prefetcher=prefetcher, scheduler=self._sched_iface, settings=self._allocate_config(pkg.root), statusMessage=self._status_msg, world_atom=self._world_atom, ) return task def _failed_pkg_msg(self, failed_pkg, action, preposition): pkg = failed_pkg.pkg color = "PKG_BINARY_MERGE" if failed_pkg.pkg.built else "INFORM" msg = f"{bad('Failed')} to {action} {colorize(color, pkg.cpv)}" if pkg.root_config.settings["ROOT"] != "/": msg += f" {preposition} {pkg.root}" log_path = self._locate_failure_log(failed_pkg) if log_path is not None: msg += ", Log file:" self._status_msg(msg) if log_path is not None: self._status_msg(f" '{colorize('INFORM', log_path)}'") def _status_msg(self, msg): """ Display a brief status message (no newlines) in the status display. This is called by tasks to provide feedback to the user. This delegates the resposibility of generating \r and \n control characters, to guarantee that lines are created or erased when necessary and appropriate. @type msg: str @param msg: a brief status message (no newlines allowed) """ if not self._background: writemsg_level("\n") self._status_display.displayMessage(msg) def _save_resume_list(self): """ Do this before verifying the ebuild Manifests since it might be possible for the user to use --resume --skipfirst get past a non-essential package with a broken digest. """ mtimedb = self._mtimedb mtimedb["resume"] = {} # Stored as a dict starting with portage-2.1.6_rc1, and supported # by >=portage-2.1.3_rc8. Versions