apt_check.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. #!/usr/bin/python3
  2. # nice apt-get -s -o Debug::NoLocking=true upgrade | grep ^Inst
  3. import apt
  4. import apt_pkg
  5. import os
  6. import sys
  7. from optparse import OptionParser
  8. import gettext
  9. import subprocess
  10. SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences"
  11. DISTRO = subprocess.check_output(
  12. ["lsb_release", "-c", "-s"],
  13. universal_newlines=True).strip()
  14. def _(msg):
  15. return gettext.dgettext("update-notifier", msg)
  16. def _handleException(type, value, tb):
  17. sys.stderr.write("E: " + _("Unknown Error: '%s' (%s)") % (type, value))
  18. sys.exit(-1)
  19. def clean(cache, depcache):
  20. " unmark (clean) all changes from the given depcache "
  21. # mvo: looping is too inefficient with the new auto-mark code
  22. # for pkg in cache.Packages:
  23. # depcache.MarkKeep(pkg)
  24. depcache.init()
  25. def saveDistUpgrade(cache, depcache):
  26. """ this function mimics a upgrade but will never remove anything """
  27. depcache.upgrade(True)
  28. if depcache.del_count > 0:
  29. clean(cache, depcache)
  30. depcache.upgrade()
  31. def isSecurityUpgrade(ver):
  32. " check if the given version is a security update (or masks one) "
  33. security_pockets = [("Ubuntu", "%s-security" % DISTRO),
  34. ("UbuntuESM", "%s-security" % DISTRO),
  35. ("gNewSense", "%s-security" % DISTRO),
  36. ("Debian", "%s-updates" % DISTRO)]
  37. for (file, index) in ver.file_list:
  38. for origin, archive in security_pockets:
  39. if (file.archive == archive and file.origin == origin):
  40. return True
  41. return False
  42. def isESMUpgrade(ver):
  43. " check if the given version is a security update (or masks one) "
  44. for (file, index) in ver.file_list:
  45. if file.origin == "UbuntuESM" and file.archive.startswith(DISTRO):
  46. return True
  47. return False
  48. def write_package_names(outstream, cache, depcache):
  49. " write out package names that change to outstream "
  50. pkgs = [pkg for pkg in cache.packages if depcache.marked_install(pkg)
  51. or depcache.marked_upgrade(pkg)]
  52. outstream.write("\n".join([p.name for p in pkgs]))
  53. def write_human_readable_summary(outstream, upgrades, security_updates,
  54. esm_updates, have_esm, disabled_esm_updates):
  55. " write out human summary summary to outstream "
  56. if have_esm is not None:
  57. if have_esm:
  58. outstream.write(gettext.dgettext("update-notifier",
  59. "UA Infrastructure Extended "
  60. "Security Maintenance (ESM) is "
  61. "enabled."))
  62. else:
  63. outstream.write(gettext.dgettext("update-notifier",
  64. "UA Infrastructure Extended "
  65. "Security Maintenance (ESM) is "
  66. "not enabled."))
  67. outstream.write("\n\n")
  68. outstream.write(gettext.dngettext("update-notifier",
  69. "%i update can be installed "
  70. "immediately.",
  71. "%i updates can be installed "
  72. "immediately.",
  73. upgrades) % upgrades)
  74. outstream.write("\n")
  75. if esm_updates > 0:
  76. outstream.write(gettext.dngettext("update-notifier",
  77. "%i of these updates is provided "
  78. "through UA Infrastructure ESM.",
  79. "%i of these updates are "
  80. "provided through UA "
  81. "Infrastructure ESM.",
  82. esm_updates) %
  83. esm_updates)
  84. outstream.write("\n")
  85. outstream.write(gettext.dngettext("update-notifier",
  86. "%i of these updates is a "
  87. "security update.",
  88. "%i of these updates are "
  89. "security updates.",
  90. security_updates) %
  91. security_updates)
  92. if upgrades > 0 or security_updates > 0 or esm_updates > 0:
  93. outstream.write("\n")
  94. outstream.write(gettext.dgettext("update-notifier",
  95. "To see these additional updates "
  96. "run: apt list --upgradable"))
  97. if have_esm is not None and not have_esm:
  98. outstream.write("\n")
  99. if disabled_esm_updates > 0:
  100. outstream.write("\n")
  101. outstream.write(gettext.dngettext("update-notifier",
  102. "Enable UA Infrastructure ESM "
  103. "to receive %i additional "
  104. "security update.",
  105. "Enable UA Infrastructure ESM "
  106. "to receive %i additional "
  107. "security updates.",
  108. disabled_esm_updates) %
  109. disabled_esm_updates)
  110. else:
  111. outstream.write("\n")
  112. outstream.write(gettext.dgettext("update-notifier",
  113. "Enable UA Infrastructure ESM to "
  114. "receive additional future "
  115. "security updates."))
  116. outstream.write("\n")
  117. outstream.write(gettext.dgettext("update-notifier",
  118. "See https://ubuntu.com/esm "
  119. "or run: sudo ua status"))
  120. outstream.write("\n")
  121. def has_disabled_esm_security_update(depcache, pkg):
  122. " check if we have a disabled ESM security update "
  123. inst_ver = pkg.current_ver
  124. if not inst_ver:
  125. return False
  126. for ver in pkg.version_list:
  127. if ver == inst_ver:
  128. break
  129. for (file, index) in ver.file_list:
  130. if (file.origin == "UbuntuESM" and file.archive.startswith(DISTRO)
  131. and depcache.policy.get_priority(file) == -32768):
  132. return True
  133. return False
  134. def init():
  135. " init the system, be nice "
  136. # FIXME: do a ionice here too?
  137. os.nice(19)
  138. apt_pkg.init()
  139. def run(options=None):
  140. # we are run in "are security updates installed automatically?"
  141. # question mode
  142. if options.security_updates_unattended:
  143. res = apt_pkg.config.find_i("APT::Periodic::Unattended-Upgrade", 0)
  144. # print(res)
  145. sys.exit(res)
  146. # get caches
  147. try:
  148. cache = apt_pkg.Cache(apt.progress.base.OpProgress())
  149. except SystemError as e:
  150. sys.stderr.write("E: " + _("Error: Opening the cache (%s)") % e)
  151. sys.exit(-1)
  152. depcache = apt_pkg.DepCache(cache)
  153. # read the synaptic pins too
  154. if os.path.exists(SYNAPTIC_PINFILE):
  155. depcache.read_pinfile(SYNAPTIC_PINFILE)
  156. depcache.init()
  157. if depcache.broken_count > 0:
  158. sys.stderr.write("E: " + _("Error: BrokenCount > 0"))
  159. sys.exit(-1)
  160. # do the upgrade (not dist-upgrade!)
  161. try:
  162. saveDistUpgrade(cache, depcache)
  163. except SystemError as e:
  164. sys.stderr.write("E: " + _("Error: Marking the upgrade (%s)") % e)
  165. sys.exit(-1)
  166. # Check if we have ESM enabled or disabled; and if it exists in the
  167. # first place.
  168. have_esm = None # None == does not exist
  169. for file in cache.file_list:
  170. if file.origin == "UbuntuESM" and file.archive.startswith(DISTRO):
  171. # In case of multiple ESM repos, one enabled is sufficient.
  172. if depcache.policy.get_priority(file) == -32768:
  173. # We found a disabled ESM repository, but we'll only count
  174. # ESM as disabled here if we have not found any other ESM
  175. # repo, so one ESM repo being enabled means ESM is enabled.
  176. if have_esm is None:
  177. have_esm = False
  178. else:
  179. have_esm = True
  180. break
  181. # analyze the ugprade
  182. upgrades = 0
  183. security_updates = 0
  184. esm_updates = 0
  185. disabled_esm_updates = 0
  186. # we need another cache that has more pkg details
  187. with apt.Cache() as aptcache:
  188. for pkg in cache.packages:
  189. if has_disabled_esm_security_update(depcache, pkg):
  190. disabled_esm_updates += 1
  191. # skip packages that are not marked upgraded/installed
  192. if not (depcache.marked_install(pkg)
  193. or depcache.marked_upgrade(pkg)):
  194. continue
  195. # check if this is really a upgrade or a false positive
  196. # (workaround for ubuntu #7907)
  197. inst_ver = pkg.current_ver
  198. cand_ver = depcache.get_candidate_ver(pkg)
  199. if cand_ver == inst_ver:
  200. continue
  201. # check for security upgrades
  202. if isSecurityUpgrade(cand_ver):
  203. if isESMUpgrade(cand_ver):
  204. esm_updates += 1
  205. upgrades += 1
  206. security_updates += 1
  207. continue
  208. # check to see if the update is a phased one
  209. try:
  210. from UpdateManager.Core.UpdateList import UpdateList
  211. ul = UpdateList(None)
  212. ignored = ul._is_ignored_phased_update(
  213. aptcache[pkg.get_fullname()])
  214. if ignored:
  215. depcache.mark_keep(pkg)
  216. continue
  217. except ImportError:
  218. pass
  219. upgrades = upgrades + 1
  220. # now check for security updates that are masked by a
  221. # candidate version from another repo (-proposed or -updates)
  222. for ver in pkg.version_list:
  223. if (inst_ver
  224. and apt_pkg.version_compare(ver.ver_str,
  225. inst_ver.ver_str) <= 0):
  226. continue
  227. if isESMUpgrade(ver):
  228. esm_updates += 1
  229. if isSecurityUpgrade(ver):
  230. security_updates += 1
  231. break
  232. # print the number of upgrades
  233. if options and options.show_package_names:
  234. write_package_names(sys.stderr, cache, depcache)
  235. elif options and options.readable_output:
  236. write_human_readable_summary(sys.stdout, upgrades, security_updates,
  237. esm_updates, have_esm,
  238. disabled_esm_updates)
  239. else:
  240. # print the number of regular upgrades and the number of
  241. # security upgrades
  242. sys.stderr.write("%s;%s" % (upgrades, security_updates))
  243. # return the number of upgrades (if its used as a module)
  244. return(upgrades, security_updates)
  245. if __name__ == "__main__":
  246. # setup a exception handler to make sure that uncaught stuff goes
  247. # to the notifier
  248. sys.excepthook = _handleException
  249. # gettext
  250. APP = "update-notifier"
  251. DIR = "/usr/share/locale"
  252. gettext.bindtextdomain(APP, DIR)
  253. gettext.textdomain(APP)
  254. # check arguments
  255. parser = OptionParser()
  256. parser.add_option("-p",
  257. "--package-names",
  258. action="store_true",
  259. dest="show_package_names",
  260. help=_("Show the packages that are "
  261. "going to be installed/upgraded"))
  262. parser.add_option("",
  263. "--human-readable",
  264. action="store_true",
  265. dest="readable_output",
  266. help=_("Show human readable output on stdout"))
  267. parser.add_option("",
  268. "--security-updates-unattended",
  269. action="store_true",
  270. help=_("Return the time in days when security updates "
  271. "are installed unattended (0 means disabled)"))
  272. (options, args) = parser.parse_args()
  273. # run it
  274. init()
  275. run(options)