mirror of
https://github.com/niess/python-appimage.git
synced 2025-07-21 21:01:15 +02:00
394 lines
13 KiB
Python
394 lines
13 KiB
Python
import atexit
|
|
from dataclasses import dataclass, field
|
|
from distutils.version import LooseVersion
|
|
import glob
|
|
import json
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
from typing import Dict, List, Optional
|
|
|
|
from .config import Arch, PythonImpl, PythonVersion
|
|
from ..appimage import Appifier
|
|
from ..utils.deps import ensure_excludelist, ensure_patchelf, EXCLUDELIST, \
|
|
PATCHELF
|
|
from ..utils.log import debug, log
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PythonExtractor:
|
|
'''Python extractor from an extracted Manylinux image.'''
|
|
|
|
arch: Arch
|
|
'''Target architecture'''
|
|
|
|
prefix: Path
|
|
'''Target image path'''
|
|
|
|
tag: str
|
|
'''Python binary tag'''
|
|
|
|
|
|
excludelist: Optional[Path] = None
|
|
'''Exclude list for shared libraries.'''
|
|
|
|
patchelf: Optional[Path] = None
|
|
'''Patchelf executable.'''
|
|
|
|
|
|
excluded: List[str] = field(init=False)
|
|
'''Excluded shared libraries.'''
|
|
|
|
impl: PythonImpl = field(init=False)
|
|
'''Python implementation'''
|
|
|
|
library_path: List[str] = field(init=False)
|
|
'''Search paths for libraries (LD_LIBRARY_PATH)'''
|
|
|
|
python_prefix: Path = field(init=False)
|
|
'''Python installation prefix'''
|
|
|
|
version: PythonVersion = field(init=False)
|
|
'''Python version'''
|
|
|
|
|
|
def __post_init__(self):
|
|
# Locate Python installation.
|
|
link = os.readlink(self.prefix / f'opt/python/{self.tag}')
|
|
if not link.startswith('/'):
|
|
raise NotImplementedError()
|
|
object.__setattr__(self, 'python_prefix', self.prefix / link[1:])
|
|
|
|
# Parse implementation and version.
|
|
head, tail = Path(link).name.split('-', 1)
|
|
if head == 'cpython':
|
|
impl = PythonImpl.CPYTHON
|
|
version = PythonVersion.from_str(tail)
|
|
else:
|
|
raise NotImplementedError()
|
|
object.__setattr__(self, 'impl', impl)
|
|
object.__setattr__(self, 'version', version)
|
|
|
|
# Set libraries search path.
|
|
paths = []
|
|
if self.arch in (Arch.AARCH64, Arch.X86_64):
|
|
paths.append(self.prefix / 'lib64')
|
|
paths.append(self.prefix / 'usr/lib64')
|
|
elif self.arch == Arch.I686:
|
|
paths.append(self.prefix / 'lib')
|
|
paths.append(self.prefix / 'usr/lib')
|
|
else:
|
|
raise NotImplementedError()
|
|
paths.append(self.prefix / 'usr/local/lib')
|
|
|
|
ssl = glob.glob(str(self.prefix / 'opt/_internal/openssl-*'))
|
|
if ssl:
|
|
paths.append(Path(ssl[0]) / 'lib')
|
|
|
|
mpdecimal = glob.glob(str(self.prefix / 'opt/_internal/mpdecimal-*'))
|
|
if mpdecimal:
|
|
paths.append(Path(mpdecimal[0]) / 'lib')
|
|
|
|
object.__setattr__(self, 'library_path', paths)
|
|
|
|
# Set excluded libraries.
|
|
if self.excludelist:
|
|
excludelist = Path(self.excludelist)
|
|
else:
|
|
ensure_excludelist()
|
|
excludelist = Path(EXCLUDELIST)
|
|
excluded = []
|
|
with excludelist.open() as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and not line.startswith('#'):
|
|
excluded.append(line)
|
|
object.__setattr__(self, 'excluded', excluded)
|
|
|
|
# Set patchelf, if not provided.
|
|
if self.patchelf is None:
|
|
ensure_patchelf()
|
|
object.__setattr__(self, 'patchelf', PATCHELF)
|
|
else:
|
|
assert(self.patchelf.exists())
|
|
|
|
|
|
def extract(
|
|
self,
|
|
destination: Path,
|
|
*,
|
|
appify: Optional[bool]=False,
|
|
python_prefix: Optional[str]=None,
|
|
system_prefix: Optional[str]=None,
|
|
):
|
|
'''Extract Python runtime.'''
|
|
|
|
python = f'python{self.version.short()}'
|
|
flavoured_python = f'python{self.version.flavoured()}'
|
|
runtime = f'bin/{flavoured_python}'
|
|
packages = f'lib/{flavoured_python}'
|
|
pip = f'bin/pip{self.version.short()}'
|
|
|
|
if python_prefix is None:
|
|
python_prefix = f'opt/{flavoured_python}'
|
|
|
|
if system_prefix is None:
|
|
system_prefix = 'usr'
|
|
|
|
python_dest = destination / python_prefix
|
|
system_dest = destination / system_prefix
|
|
|
|
# Locate include files.
|
|
include = glob.glob(str(self.python_prefix / 'include/*'))
|
|
if include:
|
|
include = Path(include[0]).name
|
|
include = f'include/{include}'
|
|
else:
|
|
raise NotImplementedError()
|
|
|
|
# Clone Python runtime.
|
|
log('CLONE',
|
|
f'{python} from {self.python_prefix.relative_to(self.prefix)}')
|
|
(python_dest / 'bin').mkdir(exist_ok=True, parents=True)
|
|
shutil.copy(self.python_prefix / runtime, python_dest / runtime)
|
|
|
|
# Clone pip wrapper.
|
|
with open(self.python_prefix / pip) as f:
|
|
f.readline() # Skip shebang.
|
|
body = f.read()
|
|
|
|
with open(python_dest / pip, 'w') as f:
|
|
f.write('#! /bin/sh\n')
|
|
f.write(' '.join((
|
|
'"exec"',
|
|
f'"$(dirname $(readlink -f ${0}))/{flavoured_python}"',
|
|
'"$0"',
|
|
'"$@"\n'
|
|
)))
|
|
f.write(body)
|
|
shutil.copymode(self.python_prefix / pip, python_dest / pip)
|
|
|
|
# Clone Python packages.
|
|
for folder in (packages, include):
|
|
shutil.copytree(self.python_prefix / folder, python_dest / folder,
|
|
symlinks=True, dirs_exist_ok=True)
|
|
|
|
# Remove some clutters.
|
|
log('PRUNE', '%s packages', python)
|
|
shutil.rmtree(python_dest / packages / 'test', ignore_errors=True)
|
|
for root, dirs, files in os.walk(python_dest / packages):
|
|
root = Path(root)
|
|
for d in dirs:
|
|
if d == '__pycache__':
|
|
shutil.rmtree(root / d, ignore_errors=True)
|
|
for f in files:
|
|
if f.endswith('.pyc'):
|
|
(root / f).unlink()
|
|
|
|
# Map binary dependencies.
|
|
libs = self.ldd(self.python_prefix / f'bin/{flavoured_python}')
|
|
path = Path(self.python_prefix / f'{packages}/lib-dynload')
|
|
for module in glob.glob(str(path / "*.so")):
|
|
l = self.ldd(module)
|
|
libs.update(l)
|
|
|
|
# Copy and patch binary dependencies.
|
|
libdir = system_dest / 'lib'
|
|
libdir.mkdir(exist_ok=True, parents=True)
|
|
|
|
for (name, src) in libs.items():
|
|
dst = libdir / name
|
|
shutil.copy(src, dst, follow_symlinks=True)
|
|
# Some libraries are read-only, which prevents overriding the
|
|
# destination directory. Below, we change the permission of
|
|
# destination files to read-write (for the owner).
|
|
mode = dst.stat().st_mode
|
|
if not (mode & stat.S_IWUSR):
|
|
mode = mode | stat.S_IWUSR
|
|
dst.chmod(mode)
|
|
|
|
self.set_rpath(dst, '$ORIGIN')
|
|
|
|
# Patch RPATHs of binary modules.
|
|
log('LINK', '%s C-extensions', python)
|
|
path = Path(python_dest / f'{packages}/lib-dynload')
|
|
for module in glob.glob(str(path / "*.so")):
|
|
src = Path(module)
|
|
dst = os.path.relpath(libdir, src.parent)
|
|
self.set_rpath(src, f'$ORIGIN/{dst}')
|
|
|
|
# Patch RPATHs of Python runtime.
|
|
src = python_dest / runtime
|
|
dst = os.path.relpath(libdir, src.parent)
|
|
self.set_rpath(src, f'$ORIGIN/{dst}')
|
|
|
|
# Copy SSL certificates (i.e. clone certifi).
|
|
certs = self.prefix / 'opt/_internal/certs.pem'
|
|
if certs.is_symlink():
|
|
dst = self.prefix / str(certs.readlink())[1:]
|
|
certifi = dst.parent
|
|
assert(certifi.name == 'certifi')
|
|
site_packages = certifi.parent
|
|
assert(site_packages.name == 'site-packages')
|
|
log('INSTALL', certifi.name)
|
|
|
|
for src in glob.glob(str(site_packages / 'certifi*')):
|
|
src = Path(src)
|
|
dst = python_dest / f'{packages}/site-packages/{src.name}'
|
|
if not dst.exists():
|
|
shutil.copytree(src, dst, symlinks=True)
|
|
|
|
cert_src = dst / 'cacert.pem'
|
|
assert(cert_src.exists())
|
|
else:
|
|
raise NotImplementedError()
|
|
|
|
# Copy Tcl & Tk data.
|
|
tx_version = []
|
|
for location in ('usr/local/lib', 'usr/share'):
|
|
tcltk_src = self.prefix / location
|
|
for match in glob.glob(str(tcltk_src / 'tk*')):
|
|
path = Path(match)
|
|
if path.is_dir():
|
|
tx_version.append(LooseVersion(path.name[2:]))
|
|
tx_version.sort()
|
|
tx_version = tx_version[-1]
|
|
|
|
log('INSTALL', f'Tcl/Tk{tx_version}')
|
|
tcltk_dir = Path(system_dest / 'share/tcltk')
|
|
tcltk_dir.mkdir(exist_ok=True, parents=True)
|
|
|
|
for tx in ('tcl', 'tk'):
|
|
name = f'{tx}{tx_version}'
|
|
src = tcltk_src / name
|
|
dst = tcltk_dir / name
|
|
shutil.copytree(src, dst, symlinks=True, dirs_exist_ok=True)
|
|
|
|
if appify:
|
|
appifier = Appifier(
|
|
appdir = str(destination),
|
|
appdir_bin = str(system_dest / 'bin'),
|
|
python_bin = str(python_dest / 'bin'),
|
|
python_pkg = str(python_dest / packages),
|
|
version = self.version,
|
|
tk_version = tx_version,
|
|
cert_src = cert_src
|
|
)
|
|
appifier.appify()
|
|
|
|
|
|
def ldd(self, target: Path) -> Dict[str, Path]:
|
|
'''Cross-platform implementation of ldd, using readelf.'''
|
|
|
|
pattern = re.compile(r'[(]NEEDED[)]\s+Shared library:\s+\[([^\]]+)\]')
|
|
dependencies = dict()
|
|
|
|
def recurse(target: Path):
|
|
result = subprocess.run(f'readelf -d {target}', shell=True,
|
|
check=True, capture_output=True)
|
|
stdout = result.stdout.decode()
|
|
matches = pattern.findall(stdout)
|
|
|
|
for match in matches:
|
|
if (match not in dependencies) and (match not in self.excluded):
|
|
path = self.locate_library(match)
|
|
dependencies[match] = path
|
|
recurse(path)
|
|
|
|
recurse(target)
|
|
return dependencies
|
|
|
|
|
|
def locate_library(self, name: str) -> Path:
|
|
'''Locate a library given its qualified name.'''
|
|
|
|
for dirname in self.library_path:
|
|
path = dirname / name
|
|
if path.exists():
|
|
return path
|
|
else:
|
|
raise FileNotFoundError(name)
|
|
|
|
|
|
def set_rpath(self, target, rpath):
|
|
cmd = f'{self.patchelf} --print-rpath {target}'
|
|
result = subprocess.run(cmd, shell=True, check=True,
|
|
capture_output=True)
|
|
current_rpath = result.stdout.decode().strip()
|
|
if current_rpath != rpath:
|
|
cmd = f"{self.patchelf} --set-rpath '{rpath}' {target}"
|
|
subprocess.run(cmd, shell=True, check=True, capture_output=True)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ImageExtractor:
|
|
'''Manylinux image extractor from layers.'''
|
|
|
|
prefix: Path
|
|
'''Manylinux image prefix.'''
|
|
|
|
tag: Optional[str] = 'latest'
|
|
'''Manylinux image tag.'''
|
|
|
|
|
|
def default_destination(self):
|
|
return self.prefix / f'extracted/{self.tag}'
|
|
|
|
|
|
def extract(self, destination: Optional[Path]=None, *, clean=False):
|
|
'''Extract Manylinux image.'''
|
|
|
|
if destination is None:
|
|
destination = self.default_destination()
|
|
|
|
if clean:
|
|
def clean(destination):
|
|
shutil.rmtree(destination, ignore_errors=True)
|
|
atexit.register(clean, destination)
|
|
|
|
log('EXTRACT', f'{self.prefix.name}:{self.tag}')
|
|
|
|
with open(self.prefix / f'tags/{self.tag}.json') as f:
|
|
meta = json.load(f)
|
|
layers = meta['layers']
|
|
|
|
extracted = []
|
|
extracted_file = destination / '.extracted'
|
|
if destination.exists():
|
|
clean_destination = True
|
|
if extracted_file.exists():
|
|
with extracted_file.open() as f:
|
|
extracted = f.read().split(os.linesep)[:-1]
|
|
|
|
for a, b in zip(layers, extracted):
|
|
if a != b:
|
|
break
|
|
else:
|
|
clean_destination = False
|
|
|
|
if clean_destination:
|
|
shutil.rmtree(destination, ignore_errors=True)
|
|
|
|
for i, layer in enumerate(layers):
|
|
try:
|
|
if layer == extracted[i]:
|
|
continue
|
|
except IndexError:
|
|
pass
|
|
|
|
debug('EXTRACT', f'{layer}.tar.gz')
|
|
filename = self.prefix / f'layers/{layer}.tar.gz'
|
|
cmd = ''.join((
|
|
f'trap \'chmod u+rw -R {destination}\' EXIT ; ',
|
|
f'mkdir -p {destination} && ',
|
|
f'tar -xzf {filename} --exclude=dev -C {destination} && ',
|
|
f'echo \'{layer}\' >> {extracted_file}'
|
|
))
|
|
r = subprocess.run(f'/bin/bash -c "{cmd}"', shell=True,
|
|
capture_output=True)
|
|
if r.returncode != 0:
|
|
raise ValueError(r.stderr.decode())
|