public inbox for isar-users@googlegroups.com
 help / color / mirror / Atom feed
From: Uladzimir Bely <ubely@ilbers.de>
To: isar-users@googlegroups.com
Subject: [PATCH v3 1/5] Add debrepo python script handling base-apt
Date: Fri, 25 Mar 2022 11:32:22 +0100	[thread overview]
Message-ID: <20220325103226.27033-2-ubely@ilbers.de> (raw)
In-Reply-To: <20220325103226.27033-1-ubely@ilbers.de>

Signed-off-by: Uladzimir Bely <ubely@ilbers.de>
---
 scripts/debrepo | 361 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 361 insertions(+)
 create mode 100755 scripts/debrepo

diff --git a/scripts/debrepo b/scripts/debrepo
new file mode 100755
index 00000000..fba8342e
--- /dev/null
+++ b/scripts/debrepo
@@ -0,0 +1,361 @@
+#!/usr/bin/env python3
+
+# This software is a part of ISAR.
+# Copyright (C) 2022 ilbers GmbH
+
+import os
+import sys
+
+import subprocess
+import getopt
+import pickle
+
+import apt_pkg
+import apt.progress.base
+
+
+class DebRepo(object):
+    def __init__(self, workdir, cmdline_opts):
+        self.workdir = workdir
+        self.optsfile = self.workdir + "/repo.opts"
+
+        # Set default values
+        self.distro = "debian"
+
+        self.repo = self.workdir + "/repo/apt" + "/" + self.distro
+        self.repodb = self.workdir + "/repo/db" + "/" + self.distro
+        self.mirror = "http://deb.debian.org/debian"
+        self.arch = "amd64"
+        self.codename = "bullseye"
+        self.components = "main contrib non-free"
+        self.keydir = "/etc/apt/trusted.gpg.d"
+        self.check_gpg = True
+
+        # Load stored opts
+        opts = self.load_opts()
+        print("stored opts: " + str(opts))
+
+        # Overwrite opts by cmdline_opts
+        for opt, arg in cmdline_opts.items():
+            opts[opt] = arg
+
+        print("all opts: " + str(opts))
+
+        # Replace by values passed in commandline
+        for opt, arg in opts.items():
+            if opt == "repodir":
+                self.repo = arg + "/" + self.distro
+            if opt == "repodbdir":
+                self.repodb = arg + "/" + self.distro
+            if opt == "mirror":
+                self.mirror = arg
+            if opt == "arch":
+                self.arch = arg
+            if opt == "distro":
+                self.distro = arg
+            if opt == "codename":
+                self.codename = arg
+            if opt == "components":
+                self.components = arg.replace(",", " ")
+            if opt == "keydir":
+                self.keydir = arg
+            if opt == "check_gpg":
+                self.check_gpg = arg
+
+        self.save_opts(opts)
+
+        print("workdir:     " + str(self.workdir))
+        print("repo:        " + str(self.repo))
+        print("repodb:      " + str(self.repodb))
+        print("mirror:      " + str(self.mirror))
+        print("arch:        " + str(self.arch))
+        print("distro:      " + str(self.distro))
+        print("codename:    " + str(self.codename))
+        print("components:  " + str(self.components))
+        print("keydir:      " + str(self.keydir))
+        print("check_gpg:   " + str(self.check_gpg))
+
+        self.cache = None
+        self.depcache = None
+        self.sr = None
+
+    def create_rootfs(self):
+        if not os.path.exists(self.workdir + "/var/lib/dpkg"):
+            os.makedirs(self.workdir + "/var/lib/dpkg")
+        f = open(self.workdir + "/var/lib/dpkg" + "/status", "w")
+        f.flush
+        f.close
+
+        if not os.path.exists(self.workdir + "/etc/apt/"):
+            os.makedirs(self.workdir + "/etc/apt/")
+        f = open(self.workdir + "/etc/apt/sources.list", "w")
+        f.write("deb [arch=" + self.arch + "] " + self.mirror + " " + self.codename + " " + self.components + "\n")
+        f.write("deb-src [arch=" + self.arch + "] " + self.mirror + " " + self.codename + " " + self.components + "\n")
+        f.flush
+        f.close
+
+        if not os.path.exists(self.workdir + "/var/cache/apt/archives/partial"):
+            os.makedirs(self.workdir + "/var/cache/apt/archives/partial")
+
+    def create_repo_dist(self):
+        if not os.path.exists(self.repo + "/conf"):
+            os.makedirs(self.repo + "/conf")
+        f = open(self.repo + "/conf" + "/distributions", "w")
+        f.write("Codename: " + self.codename + "\n")
+        f.write("Architectures: i386 armhf arm64 amd64 mipsel riscv64 source" + "\n")
+        f.write("Components: " + self.components + "\n")
+        f.flush
+        f.close
+
+    def apt_config(self):
+        # Configure apt to work with empty directory
+        apt_pkg.config.set("APT::Architecture", self.arch)
+        apt_pkg.config.set("Dir", self.workdir)
+        apt_pkg.config.set("Dir::Cache", self.workdir + "/var/cache/apt")
+        apt_pkg.config.set("Dir::State::status", self.workdir + "/var/lib/dpkg/status")
+
+        apt_pkg.config.set("APT::Install-Recommends", "0")
+        apt_pkg.config.set("APT::Install-Suggests", "0")
+
+        # Use host keys for authentification
+        # apt_pkg.config.set("Dir::Etc::Trusted", "/etc/apt/trusted.gpg")
+        # apt_pkg.config.set("Dir::Etc::TrustedParts", "/etc/apt/trusted.gpg.d")
+        apt_pkg.config.set("Dir::Etc::TrustedParts", self.keydir)
+
+        # Allow using repositories without keys
+        if not self.check_gpg:
+            apt_pkg.config.set("Acquire::AllowInsecureRepositories", "1")
+
+    def mark_essential(self):
+        for pkg in self.cache.packages:
+            if pkg.essential:
+                self.depcache.mark_install(pkg)
+
+    def mark_by_prio(self, priority):
+        for pkg in self.cache.packages:
+            ver = self.depcache.get_candidate_ver(pkg)
+            if ver and ver.priority <= priority:
+                self.depcache.mark_install(pkg)
+
+    def mark_list(self, pkglist):
+        if pkglist:
+            for pkgname in pkglist:
+                if pkgname in self.cache:
+                    pkg = self.cache[pkgname]
+                    self.depcache.mark_install(pkg)
+
+    def handle_deb(self, item):
+        subprocess.run([
+            "reprepro",
+            "--dbdir", self.repodb,
+            "--outdir", self.repo,
+            "--confdir", self.repo + "/conf",
+            "-C", "main",
+            "includedeb",
+            self.codename,
+            item.destfile
+            ])
+
+    def handle_repo(self, fetcher):
+        fetcher.run()
+        for item in fetcher.items:
+            if item.status == item.STAT_ERROR:
+                print("Some error ocured: '%s'" % item.error_text)
+                pass
+            else:
+                self.handle_deb(item)
+
+    def get_filename(self, url):
+        fragment_removed = url.split("#")[0]  # keep to left of first #
+        query_string_removed = fragment_removed.split("?")[0]
+        scheme_removed = query_string_removed.split("://")[-1].split(":")[-1]
+        if scheme_removed.find("/") == -1:
+            return ""
+        return os.path.basename(scheme_removed)
+
+    def download_file(self, uri):
+        filename = self.get_filename(uri)
+        subprocess.run([
+            "wget",
+            uri,
+            "-O",
+            filename
+        ])
+
+    def handle_dsc(self, uri):
+        filename = self.get_filename(uri)
+        subprocess.run([
+            "reprepro",
+            "--dbdir", self.repodb,
+            "--outdir", self.repo,
+            "--confdir", self.repo + "/conf",
+            "-C", "main",
+            "-S", "-", "-P" "source",
+            "--delete",
+            "includedsc",
+            self.codename,
+            os.path.realpath(filename)
+            ])
+
+    def handle_src_list(self, pkgs):
+        if pkgs:
+            for pkgname in pkgs:
+                dsc = ""
+                self.sr.restart()
+                if self.sr.lookup(pkgname):
+                    print(self.sr.files)
+                    for sr_file in self.sr.files:
+                        print(self.sr.index.archive_uri(sr_file[2]))
+                        self.download_file(self.sr.index.archive_uri(sr_file[2]))
+
+                    dsc_uri = self.sr.index.archive_uri(self.sr.files[0][2])
+                    self.handle_dsc(dsc_uri)
+
+    def apt_run(self, init, srcmode, pkgs, controlfile):
+        apt_pkg.init()
+
+        sources = apt_pkg.SourceList()
+        sources.read_main_list()
+
+        progress = apt.progress.text.AcquireProgress()
+
+        self.cache = apt_pkg.Cache()
+        self.cache.update(progress, sources)
+        self.cache = apt_pkg.Cache()
+
+        self.depcache = apt_pkg.DepCache(self.cache)
+        self.sr = apt_pkg.SourceRecords()
+
+        if init:
+            self.mark_essential()
+            # 1(required), 2(important), 3(standard), 4(optional), 5(extra)
+            self.mark_by_prio(1)
+
+        if srcmode:
+            self.handle_src_list(pkgs)
+        else:
+            self.mark_list(pkgs)
+
+        if controlfile:
+            fobj = open(controlfile, "r")
+            try:
+                Parse = apt_pkg.TagFile(fobj)
+                while Parse.step() == 1:
+                    for item in apt_pkg.parse_depends(Parse.section.get("Build-Depends", "")):
+                        #self.mark_list([item[0][0]])
+                        if item[0][0] in self.cache:
+                            pkg = self.cache[item[0][0]]
+                            print("pkg: " + str(pkg))
+                            self.depcache.mark_install(pkg)
+
+            finally:
+                fobj.close()
+
+        if init or not srcmode:
+            fetcher = apt_pkg.Acquire(progress)
+            pm = apt_pkg.PackageManager(self.depcache)
+
+            recs = apt_pkg.PackageRecords(self.cache)
+            pm.get_archives(fetcher, sources, recs)
+
+            self.handle_repo(fetcher)
+
+    def load_opts(self):
+        params = {}
+        if os.path.isfile(self.optsfile):
+            with open(self.optsfile, 'rb') as file:
+                data = file.read()
+                if data:
+                    params = pickle.loads(data)
+
+        return params
+
+    def save_opts(self, opts):
+        file = open(self.optsfile, 'wb')
+        pickle.dump(opts, file)
+        file.close()
+
+
+class DebRepoArgs(object):
+    def __init__(self):
+        self.workdir = ""
+        self.init = False
+        self.srcmode = False
+        self.controlfile = ""
+
+        self.opts = {}
+        self.pkgs = []
+
+        try:
+            opts, args = getopt.getopt(sys.argv[1:], "", [
+                "init",
+                "srcmode",
+                "workdir=",
+                "repodir=",
+                "repodbdir=",
+                "mirror=",
+                "arch=",
+                "distro=",
+                "codename=",
+                "components=",
+                "keydir=",
+                "no-check-gpg",
+                "controlfile=",
+                ])
+        except getopt.GetoptError as msg:
+            print("Error: " + str(msg))
+            sys.exit(1)
+
+        for opt, arg in opts:
+            if opt in ("--workdir"):
+                self.workdir = arg
+            if opt in ("--init"):
+                self.init = True
+            if opt in ("--srcmode"):
+                self.srcmode = True
+            if opt in ("--controlfile"):
+                self.controlfile = arg
+
+            if opt in ("--repodir",
+                       "--repodbdir",
+                       "--mirror",
+                       "--arch",
+                       "--distro",
+                       "--codename",
+                       "--components",
+                       "--keydir",
+                       "--controlfile",
+                       ):
+                self.opts[opt[2:]] = arg
+            if opt in ("--no-check-gpg"):
+                self.opts['check_gpg'] = False
+
+        if not self.workdir:
+            print("Error: workdir is not specified")
+            sys.exit(1)
+
+        self.pkgs = args
+
+
+def main():
+    args = DebRepoArgs()
+
+    if not (args.init or args.pkgs or args.controlfile):
+        print("Nothing to do")
+        sys.exit(0)
+
+    if not os.path.exists(args.workdir):
+        os.makedirs(args.workdir)
+
+    debrepo = DebRepo(args.workdir, args.opts)
+
+    if (args.init):
+        debrepo.create_rootfs()
+        debrepo.create_repo_dist()
+
+    debrepo.apt_config()
+    debrepo.apt_run(args.init, args.srcmode, args.pkgs, args.controlfile)
+
+
+if __name__ == "__main__":
+    main()
-- 
2.20.1


  reply	other threads:[~2022-03-25 10:32 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-03-25 10:32 [PATCH v3 0/5] Improving base-apt usage PoC Uladzimir Bely
2022-03-25 10:32 ` Uladzimir Bely [this message]
2022-03-25 10:32 ` [PATCH v3 2/5] meta: Use cached base-apt repo to debootstrap Uladzimir Bely
2022-03-25 10:32 ` [PATCH v3 3/5] meta: always use base-apt repo in local mode Uladzimir Bely
2022-03-25 10:32 ` [PATCH v3 4/5] base-apt: Predownload packages to base-apt before install Uladzimir Bely
2022-03-30 13:41   ` Moessbauer, Felix
2022-03-25 10:32 ` [PATCH v3 5/5] draft: make isar-apt repo visible for base-apt Uladzimir Bely

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220325103226.27033-2-ubely@ilbers.de \
    --to=ubely@ilbers.de \
    --cc=isar-users@googlegroups.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox