241 lines
8.9 KiB
Python
241 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# Copyright (C) 2020 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Unit tests for apexer_with_DCLA_preprocessing."""
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import tempfile
|
|
from typing import List
|
|
import unittest
|
|
import zipfile
|
|
|
|
TEST_PRIVATE_KEY = os.path.join('testdata', 'com.android.example.apex.pem')
|
|
TEST_APEX = 'com.android.example.apex'
|
|
|
|
# In order to debug test failures, set DEBUG_TEST to True and run the test from
|
|
# local workstation bypassing atest, e.g.:
|
|
# $ m apexer_with_DCLA_preprocessing_test && \
|
|
# out/host/linux-x86/nativetest64/apexer_with_DCLA_preprocessing_test/\
|
|
# apexer_with_DCLA_preprocessing_test
|
|
#
|
|
# the test will print out the command used, and the temporary files used by the
|
|
# test.
|
|
DEBUG_TEST = False
|
|
|
|
def get_current_dir():
|
|
"""returns the current dir, relative to the script dir."""
|
|
# The script dir is the one we want, which could be different from pwd.
|
|
current_dir = os.path.dirname(os.path.realpath(__file__))
|
|
return current_dir
|
|
|
|
# TODO: consolidate these common test utilities into a common python_library_host
|
|
# to be shared across tests under system/apex
|
|
def run_command(cmd: List[str]) -> None:
|
|
"""Run a command."""
|
|
try:
|
|
if DEBUG_TEST:
|
|
cmd_str = ' '.join(cmd)
|
|
print(f'\nRunning: \n{cmd_str}\n')
|
|
res = subprocess.run(
|
|
cmd,
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
stderr=subprocess.PIPE)
|
|
except subprocess.CalledProcessError as err:
|
|
print(err.stderr)
|
|
print(err.output)
|
|
raise err
|
|
|
|
def get_apexer_with_DCLA_preprocessing() -> str:
|
|
tool_binary = os.path.join(get_current_dir(), 'apexer_with_DCLA_preprocessing')
|
|
if not os.path.isfile(tool_binary):
|
|
raise FileNotFoundError(f'cannot find tooling apexer with DCLA preprocessing')
|
|
else:
|
|
os.chmod(tool_binary, stat.S_IRUSR | stat.S_IXUSR);
|
|
return tool_binary
|
|
|
|
def get_host_tool(tool_name: str) -> str:
|
|
"""get host tools."""
|
|
tool_binary = os.path.join(get_current_dir(), 'bin', tool_name)
|
|
if not os.path.isfile(tool_binary):
|
|
host_build_top = os.environ.get('ANDROID_BUILD_TOP')
|
|
if host_build_top:
|
|
host_command_dir = os.path.join(host_build_top, 'out/host/linux-x86/bin')
|
|
tool_binary = os.path.join(host_command_dir, tool_name)
|
|
if not os.path.isfile(tool_binary):
|
|
host_command_dir = os.path.join(host_build_top, 'prebuilts/sdk/current/public')
|
|
tool_binary = os.path.join(host_command_dir, tool_name)
|
|
else:
|
|
tool_binary = shutil.which(tool_name)
|
|
|
|
if not tool_binary or not os.path.isfile(tool_binary):
|
|
raise FileNotFoundError(f'cannot find tooling {tool_name}')
|
|
else:
|
|
return tool_binary
|
|
|
|
def get_digest(file_path: str) -> str:
|
|
"""Get sha512 digest of a file """
|
|
digester = hashlib.sha512()
|
|
with open(file_path, 'rb') as f:
|
|
bytes_to_digest = f.read()
|
|
digester.update(bytes_to_digest)
|
|
return digester.hexdigest()
|
|
|
|
class ApexerWithDCLAPreprocessingTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self._to_cleanup = []
|
|
tools_zip_file = os.path.join(get_current_dir(), 'apexer_test_host_tools.zip')
|
|
self.unzip_host_tools(tools_zip_file)
|
|
|
|
def tearDown(self):
|
|
if not DEBUG_TEST:
|
|
for i in self._to_cleanup:
|
|
if os.path.isdir(i):
|
|
shutil.rmtree(i, ignore_errors=True)
|
|
else:
|
|
os.remove(i)
|
|
del self._to_cleanup[:]
|
|
else:
|
|
print('Cleanup: ' + str(self._to_cleanup))
|
|
|
|
def create_temp_dir(self) -> str:
|
|
tmp_dir = tempfile.mkdtemp()
|
|
self._to_cleanup.append(tmp_dir)
|
|
return tmp_dir
|
|
|
|
def expand_apex(self, apex_file: str) -> None:
|
|
"""expand an apex file include apex_payload."""
|
|
apex_dir = self.create_temp_dir()
|
|
with zipfile.ZipFile(apex_file, 'r') as apex_zip:
|
|
apex_zip.extractall(apex_dir)
|
|
payload_img = os.path.join(apex_dir, 'apex_payload.img')
|
|
extract_dir = os.path.join(apex_dir, 'payload_extract')
|
|
os.mkdir(extract_dir)
|
|
debugfs = get_host_tool('debugfs_static')
|
|
run_command([debugfs, payload_img, '-R', f'rdump / {extract_dir}'])
|
|
|
|
# remove /etc and /lost+found and /payload_extract/apex_manifest.pb
|
|
lost_and_found = os.path.join(extract_dir, 'lost+found')
|
|
etc_dir = os.path.join(extract_dir, 'etc')
|
|
os.remove(os.path.join(extract_dir, 'apex_manifest.pb'))
|
|
if os.path.isdir(lost_and_found):
|
|
shutil.rmtree(lost_and_found)
|
|
if os.path.isdir(etc_dir):
|
|
shutil.rmtree(etc_dir)
|
|
|
|
return apex_dir
|
|
|
|
def unzip_host_tools(self, host_tools_file_path: str) -> None:
|
|
dir_name = get_current_dir()
|
|
if os.path.isfile(host_tools_file_path):
|
|
with zipfile.ZipFile(host_tools_file_path, 'r') as zip_obj:
|
|
zip_obj.extractall(dir_name)
|
|
|
|
for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
|
|
"resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
|
|
"signapk.jar", "android.jar"]:
|
|
file_path = os.path.join(dir_name, "bin", i)
|
|
if os.path.exists(file_path):
|
|
os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR);
|
|
|
|
def test_DCLA_preprocessing(self):
|
|
"""test DCLA preprocessing done properly."""
|
|
apex_file = os.path.join(get_current_dir(), TEST_APEX + '.apex')
|
|
apex_dir = self.expand_apex(apex_file)
|
|
|
|
# create apex canned_fs_config file, TEST_APEX does not come with one
|
|
canned_fs_config_file = os.path.join(apex_dir, 'canned_fs_config')
|
|
with open(canned_fs_config_file, 'w') as f:
|
|
# add /lib/foo.so file
|
|
lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib')
|
|
os.makedirs(lib_dir)
|
|
foo_file = os.path.join(lib_dir, 'foo.so')
|
|
with open(foo_file, 'w') as lib_file:
|
|
lib_file.write('This is a placeholder lib file.')
|
|
foo_digest = get_digest(foo_file)
|
|
|
|
# add /lib dir and /lib/foo.so in canned_fs_config
|
|
f.write(f'/lib 0 2000 0755\n')
|
|
f.write(f'/lib/foo.so 1000 1000 0644\n')
|
|
|
|
# add /lib/bar.so file
|
|
lib_dir = os.path.join(apex_dir, 'payload_extract', 'lib64')
|
|
os.makedirs(lib_dir)
|
|
bar_file = os.path.join(lib_dir, 'bar.so')
|
|
with open(bar_file, 'w') as lib_file:
|
|
lib_file.write('This is another placeholder lib file.')
|
|
bar_digest = get_digest(bar_file)
|
|
|
|
# add /lib dir and /lib/foo.so in canned_fs_config
|
|
f.write(f'/lib64 0 2000 0755\n')
|
|
f.write(f'/lib64/bar.so 1000 1000 0644\n')
|
|
|
|
f.write(f'/ 0 2000 0755\n')
|
|
f.write(f'/apex_manifest.pb 1000 1000 0644\n')
|
|
|
|
# call apexer_with_DCLA_preprocessing
|
|
manifest_file = os.path.join(apex_dir, 'apex_manifest.pb')
|
|
build_info_file = os.path.join(apex_dir, 'apex_build_info.pb')
|
|
key_file = os.path.join(get_current_dir(), TEST_PRIVATE_KEY)
|
|
apexer= get_host_tool('apexer')
|
|
apexer_wrapper = get_apexer_with_DCLA_preprocessing()
|
|
android_jar = get_host_tool('android.jar')
|
|
apex_out = os.path.join(apex_dir, 'DCLA_preprocessed_output.apex')
|
|
run_command([apexer_wrapper,
|
|
'--apexer', apexer,
|
|
'--canned_fs_config', canned_fs_config_file,
|
|
os.path.join(apex_dir, 'payload_extract'),
|
|
apex_out,
|
|
'--',
|
|
'--android_jar_path', android_jar,
|
|
'--apexer_tool_path', os.path.dirname(apexer),
|
|
'--key', key_file,
|
|
'--manifest', manifest_file,
|
|
'--build_info', build_info_file,
|
|
'--payload_fs_type', 'ext4',
|
|
'--payload_type', 'image',
|
|
'--force'
|
|
])
|
|
|
|
# check the existence of updated canned_fs_config
|
|
updated_canned_fs_config = os.path.join(apex_dir, 'updated_canned_fs_config')
|
|
self.assertTrue(
|
|
os.path.isfile(updated_canned_fs_config),
|
|
'missing updated canned_fs_config file named updated_canned_fs_config')
|
|
|
|
# check the resulting apex, it should have /lib/foo.so/<hash>/foo.so and
|
|
# /lib64/bar.so/<hash>/bar.so
|
|
result_apex_dir = self.expand_apex(apex_out)
|
|
replaced_foo = os.path.join(
|
|
result_apex_dir, f'payload_extract/lib/foo.so/{foo_digest}/foo.so')
|
|
replaced_bar = os.path.join(
|
|
result_apex_dir, f'payload_extract/lib64/bar.so/{bar_digest}/bar.so')
|
|
self.assertTrue(
|
|
os.path.isfile(replaced_foo),
|
|
f'expecting /lib/foo.so/{foo_digest}/foo.so')
|
|
self.assertTrue(
|
|
os.path.isfile(replaced_bar),
|
|
f'expecting /lib64/bar.so/{bar_digest}/bar.so')
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(verbosity=2)
|