# -*- coding: utf-8 -*-
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Handle path inference and translation."""

from __future__ import print_function

import collections
import os
import tempfile

from autotest_lib.utils.frozen_chromite.lib import constants
from autotest_lib.utils.frozen_chromite.lib import cros_build_lib
from autotest_lib.utils.frozen_chromite.lib import git
from autotest_lib.utils.frozen_chromite.lib import osutils
from autotest_lib.utils.frozen_chromite.utils import memoize


# Directory names used by FindCacheDir() when building the cache location.
GENERAL_CACHE_DIR = '.cache'
CHROME_CACHE_DIR = 'cros_cache'

# Checkout kinds reported in CheckoutInfo.type by DetermineCheckout().
CHECKOUT_TYPE_UNKNOWN = 'unknown'
CHECKOUT_TYPE_GCLIENT = 'gclient'
CHECKOUT_TYPE_REPO = 'repo'

# Result record of DetermineCheckout(): the checkout type, its root directory
# and, for gclient checkouts, the Chrome src/ directory.
CheckoutInfo = collections.namedtuple(
    'CheckoutInfo', ['type', 'root', 'chrome_src_dir'])


class ChrootPathResolver(object):
  """Perform path resolution to/from the chroot.

  Attributes:
    source_path: Value to override default source root inference.
    source_from_path_repo: Whether to infer the source root from the converted
      path's repo parent during inbound translation; overrides |source_path|.
  """

  # TODO(garnold) We currently infer the source root based on the path's own
  # encapsulating repository. This is a heuristic catering to paths that are
  # being translated to be used in a chroot that's not associated with the
  # currently executing code (for example, cbuildbot run on a build root or a
  # foreign tree checkout). This approach might result in arbitrary
  # repo-contained paths being translated to invalid chroot paths where they
  # actually should not, and other valid source paths failing to translate
  # because they are not repo-contained. Eventually we'll want to make this
  # behavior explicit, by either passing a source_root value, or requesting to
  # infer it from the path (source_from_path_repo=True), but otherwise
  # defaulting to the executing code's source root in the normal case. When
  # that happens, we'll be switching source_from_path_repo to False by default.
  # See chromium:485746.

  def __init__(self, source_path=None, source_from_path_repo=True):
    """Initializes the resolver.

    Args:
      source_path: Source root to use; constants.SOURCE_ROOT when None.
      source_from_path_repo: Whether ToChroot() should first try to infer the
        source root from the repo directory that contains the given path.
    """
    self._inside_chroot = cros_build_lib.IsInsideChroot()
    self._source_path = (constants.SOURCE_ROOT if source_path is None
                         else source_path)
    self._source_from_path_repo = source_from_path_repo

    # The following are only needed if outside the chroot.
    if self._inside_chroot:
      self._chroot_path = None
      self._chroot_link = None
      self._chroot_to_host_roots = None
    else:
      self._chroot_path = self._GetSourcePathChroot(self._source_path)
      # The chroot link allows us to resolve paths when the chroot is symlinked
      # to the default location. This is generally not used, but it is useful
      # for CI for optimization purposes. We will trust them not to do something
      # dumb, like symlink to /, but this doesn't enable that kind of behavior
      # anyway, just allows resolving paths correctly from outside the chroot.
      self._chroot_link = self._ReadChrootLink(self._chroot_path)

      # Initialize mapping of known root bind mounts. The cache entry holds the
      # bound method itself rather than its result, so the cache location is
      # only computed if a path under that root is actually translated.
      self._chroot_to_host_roots = (
          (constants.CHROOT_SOURCE_ROOT, self._source_path),
          (constants.CHROOT_CACHE_ROOT, self._GetCachePath),
      )

  @classmethod
  @memoize.MemoizedSingleCall
  def _GetCachePath(cls):
    """Returns the cache directory."""
    return os.path.realpath(GetCacheDir())

  def _GetSourcePathChroot(self, source_path):
    """Returns path to the chroot directory of a given source root.

    Args:
      source_path (str|None): The source root.

    Returns:
      str|None: |source_path| joined with constants.DEFAULT_CHROOT_DIR, or
      None when |source_path| is None.
    """
    if source_path is None:
      return None
    return os.path.join(source_path, constants.DEFAULT_CHROOT_DIR)

  def _ReadChrootLink(self, path):
    """Convert a chroot symlink to its absolute path.

    This contains defaults/edge cases assumptions for chroot paths. Not
    recommended for non-chroot paths.

    Args:
      path (str|None): The path to resolve.

    Returns:
      str|None: The resolved path if the provided path is a symlink, None
        otherwise.
    """
    # Mainly for the "if self._source_from_path_repo:" branch in _GetChrootPath.
    # _GetSourcePathChroot can return None, so double check it here.
    if not path:
      return None

    abs_path = os.path.abspath(path)
    link = osutils.ResolveSymlink(abs_path)

    # ResolveSymlink returns the passed path when the path isn't a symlink. We
    # can skip some redundant work when it's falling back on the link when the
    # chroot is not a symlink.
    if link == abs_path:
      return None

    return link

  def _TranslatePath(self, path, src_root, dst_root_input):
    """If |path| starts with |src_root|, replace it using |dst_root_input|.

    Args:
      path: An absolute path we want to convert to a destination equivalent.
      src_root: The root that path needs to be contained in.
      dst_root_input: The root we want to relocate the relative path into, or a
        function returning this value.

    Returns:
      A translated path, or None if |src_root| is not a prefix of |path|.

    Raises:
      ValueError: If |src_root| is a prefix but |dst_root_input| yields None,
        which means we don't have sufficient information to do the translation.
    """
    # Joining with '' appends a trailing separator, so only whole path
    # components match: '/foo' is not treated as a prefix of '/foobar'.
    if not path.startswith(os.path.join(src_root, '')) and path != src_root:
      return None
    dst_root = dst_root_input() if callable(dst_root_input) else dst_root_input
    if dst_root is None:
      raise ValueError('No target root to translate path to')
    # Strip leading separators so os.path.join() does not discard |dst_root|.
    return os.path.join(dst_root, path[len(src_root):].lstrip(os.path.sep))

  def _GetChrootPath(self, path):
    """Translates a fully-expanded host |path| into a chroot equivalent.

    This checks path prefixes in order from the most to least "contained": the
    chroot itself, then the cache directory, and finally the source tree. The
    idea is to return the shortest possible chroot equivalent.

    Args:
      path: A host path to translate.

    Returns:
      An equivalent chroot path.

    Raises:
      ValueError: If |path| is not reachable from the chroot.
    """
    new_path = None

    # Preliminary: compute the actual source and chroot paths to use. These are
    # generally the precomputed values, unless we're inferring the source root
    # from the path itself.
    source_path = self._source_path
    chroot_path = self._chroot_path
    chroot_link = self._chroot_link

    if self._source_from_path_repo:
      path_repo_dir = git.FindRepoDir(path)
      if path_repo_dir is not None:
        source_path = os.path.abspath(os.path.join(path_repo_dir, '..'))
      chroot_path = self._GetSourcePathChroot(source_path)
      chroot_link = self._ReadChrootLink(chroot_path)

    # First, check if the path happens to be in the chroot already.
    if chroot_path is not None:
      new_path = self._TranslatePath(path, chroot_path, '/')
      # Or in the symlinked dir.
      if new_path is None and chroot_link is not None:
        new_path = self._TranslatePath(path, chroot_link, '/')

    # Second, check the cache directory.
    if new_path is None:
      new_path = self._TranslatePath(path, self._GetCachePath(),
                                     constants.CHROOT_CACHE_ROOT)

    # Finally, check the current SDK checkout tree.
    if new_path is None and source_path is not None:
      new_path = self._TranslatePath(path, source_path,
                                     constants.CHROOT_SOURCE_ROOT)

    if new_path is None:
      raise ValueError('Path is not reachable from the chroot')

    return new_path

  def _GetHostPath(self, path):
    """Translates a fully-expanded chroot |path| into a host equivalent.

    We first attempt translation of known roots (source). If any is successful,
    we check whether the result happens to point back to the chroot, in which
    case we trim the chroot path prefix and recurse. If neither was successful,
    just prepend the chroot path.

    Args:
      path: A chroot path to translate.

    Returns:
      An equivalent host path.

    Raises:
      ValueError: If |path| could not be mapped to a proper host destination.
    """
    new_path = None

    # Attempt resolution of known roots.
    for src_root, dst_root in self._chroot_to_host_roots:
      new_path = self._TranslatePath(path, src_root, dst_root)
      if new_path is not None:
        break

    if new_path is None:
      # If no known root was identified, just prepend the chroot path. An empty
      # source root matches every path; a None chroot path makes
      # _TranslatePath raise ValueError.
      new_path = self._TranslatePath(path, '', self._chroot_path)
    else:
      # Check whether the resolved path happens to point back at the chroot, in
      # which case trim the chroot path or link prefix and continue recursively.
      # As in _GetChrootPath, skip the chroot check when the chroot path is
      # unknown rather than handing None to os.path.join().
      path = None
      if self._chroot_path is not None:
        path = self._TranslatePath(new_path, self._chroot_path, '/')
      if path is None and self._chroot_link:
        path = self._TranslatePath(new_path, self._chroot_link, '/')

      if path is not None:
        new_path = self._GetHostPath(path)

    return new_path

  def _ConvertPath(self, path, get_converted_path):
    """Expands |path|; if outside the chroot, applies |get_converted_path|.

    Args:
      path: A path to be converted.
      get_converted_path: A conversion function.

    Returns:
      An expanded and (if needed) converted path.

    Raises:
      ValueError: If path conversion failed.
    """
    # NOTE: We do not want to expand wrapper script symlinks because this
    # prevents them from working. Therefore, if the path points to a file we
    # only resolve its dirname but leave the basename intact. This means our
    # path resolution might return unusable results for file symlinks that
    # point outside the reachable space. These are edge cases in which the user
    # is expected to resolve the realpath themselves in advance.
    expanded_path = os.path.expanduser(path)
    if os.path.isfile(expanded_path):
      expanded_path = os.path.join(
          os.path.realpath(os.path.dirname(expanded_path)),
          os.path.basename(expanded_path))
    else:
      expanded_path = os.path.realpath(expanded_path)

    if self._inside_chroot:
      return expanded_path

    try:
      return get_converted_path(expanded_path)
    except ValueError as e:
      # Re-raise with the path as originally given appended to the message.
      raise ValueError('%s: %s' % (e, path))

  def ToChroot(self, path):
    """Resolves current environment |path| for use in the chroot."""
    return self._ConvertPath(path, self._GetChrootPath)

  def FromChroot(self, path):
    """Resolves chroot |path| for use in the current environment."""
    return self._ConvertPath(path, self._GetHostPath)


def DetermineCheckout(cwd=None):
  """Gather information on the checkout we are in.

  There are several checkout types, as defined by CHECKOUT_TYPE_XXX variables.
  This function determines what checkout type |cwd| is in, for example, if |cwd|
  belongs to a `repo` checkout.

  Args:
    cwd: Directory to inspect. The current working directory is used when this
      is None or empty.

  Returns:
    A CheckoutInfo object with these attributes:
      type: The type of checkout. Valid values are CHECKOUT_TYPE_*.
      root: The root of the checkout, or None if the type is unknown.
      chrome_src_dir: If the checkout is a Chrome checkout, the path to the
        Chrome src/ directory.
  """
  checkout_type = CHECKOUT_TYPE_UNKNOWN
  root = None

  cwd = cwd or os.getcwd()
  # The first candidate directory holding a checkout marker wins; a .gclient
  # file takes precedence over a .repo directory in the same directory.
  for path in osutils.IteratePathParents(cwd):
    if os.path.exists(os.path.join(path, '.gclient')):
      checkout_type = CHECKOUT_TYPE_GCLIENT
      root = path
      break
    if os.path.isdir(os.path.join(path, '.repo')):
      checkout_type = CHECKOUT_TYPE_REPO
      root = path
      break

  # Determine the chrome src directory.
  chrome_src_dir = None
  if checkout_type == CHECKOUT_TYPE_GCLIENT:
    chrome_src_dir = os.path.join(root, 'src')

  return CheckoutInfo(checkout_type, root, chrome_src_dir)


def FindCacheDir():
  """Returns the cache directory location based on the checkout type.

  Returns:
    For a repo checkout, GENERAL_CACHE_DIR under the checkout root; for a
    gclient checkout, build/CHROME_CACHE_DIR under the Chrome src/ directory;
    otherwise 'chromeos-cache' under the system temporary directory.

  Raises:
    AssertionError: If DetermineCheckout() reports an unrecognized type.
  """
  checkout = DetermineCheckout()
  if checkout.type == CHECKOUT_TYPE_REPO:
    return os.path.join(checkout.root, GENERAL_CACHE_DIR)
  elif checkout.type == CHECKOUT_TYPE_GCLIENT:
    return os.path.join(checkout.chrome_src_dir, 'build', CHROME_CACHE_DIR)
  elif checkout.type == CHECKOUT_TYPE_UNKNOWN:
    return os.path.join(tempfile.gettempdir(), 'chromeos-cache')
  else:
    raise AssertionError('Unexpected type %s' % checkout.type)


def GetCacheDir():
  """Returns the current cache dir.

  The value of the constants.SHARED_CACHE_ENVVAR environment variable is used
  when it is set (even to an empty string); otherwise the location is inferred
  with FindCacheDir().
  """
  cache_dir = os.environ.get(constants.SHARED_CACHE_ENVVAR)
  if cache_dir is None:
    # Only probe the checkout when the environment provides no override.
    cache_dir = FindCacheDir()
  return cache_dir


def ToChrootPath(path, source_path=None):
  """Resolves current environment |path| for use in the chroot.

  Args:
    path: string path to translate into chroot namespace.
    source_path: string path to root of source checkout with chroot in it.

  Returns:
    The same path converted to "inside chroot" namespace.

  Raises:
    ValueError: If the path references a location not available in the chroot.
  """
  return ChrootPathResolver(source_path=source_path).ToChroot(path)


def FromChrootPath(path, source_path=None):
  """Resolves chroot |path| for use in the current environment.

  Args:
    path: string path to translate out of chroot namespace.
    source_path: string path to root of source checkout with chroot in it.

  Returns:
    The same path converted to "outside chroot" namespace.

  Raises:
    ValueError: If the path could not be mapped to a host location.
  """
  return ChrootPathResolver(source_path=source_path).FromChroot(path)