Package entropy :: Package server :: Package interfaces :: Module mirrors

Source Code for Module entropy.server.interfaces.mirrors

   1  # -*- coding: utf-8 -*- 
   2  """ 
   3   
   4      @author: Fabio Erculiani <[email protected]> 
   5      @contact: [email protected] 
   6      @copyright: Fabio Erculiani 
   7      @license: GPL-2 
   8   
   9      B{Entropy Package Manager Server Mirrors Interfaces}. 
  10   
  11  """ 
  12  import os 
  13  import shutil 
  14  import time 
  15  import errno 
  16  import threading 
  17  import multiprocessing 
  18  import socket 
  19  import codecs 
  20  try: 
  21      from Queue import Queue 
  22  except ImportError: 
  23      from queue import Queue 
  24   
  25  from entropy.exceptions import EntropyPackageException 
  26  from entropy.output import red, darkgreen, bold, brown, blue, darkred, \ 
  27      darkblue, purple, teal 
  28  from entropy.const import etpConst, const_get_int, const_get_cpus, \ 
  29      const_mkdtemp, const_mkstemp, const_file_readable, const_dir_readable 
  30  from entropy.cache import EntropyCacher 
  31  from entropy.i18n import _ 
  32  from entropy.misc import RSS, ParallelTask 
  33  from entropy.transceivers import EntropyTransceiver 
  34  from entropy.transceivers.uri_handlers.skel import EntropyUriHandler 
  35  from entropy.core.settings.base import SystemSettings 
  36  from entropy.server.interfaces.db import ServerPackagesRepository 
  37   
  38  import entropy.tools 
  39   
  40   
class Server(object):
    """
    Entropy Server Mirrors interface. Wraps mirror-related operations
    (locking, syncing, downloading) on behalf of an Entropy Server
    instance.
    """

    # SystemSettings plugin identifier for the Entropy Server plugin;
    # used to read server-scoped configuration metadata.
    SYSTEM_SETTINGS_PLG_ID = etpConst['system_settings_plugins_ids']['server_plugin']

    def __init__(self, server, repository_id):
        """
        Object constructor.

        @param server: a valid entropy.server.interfaces.main.Server instance
        @param repository_id: repository identifier
        @type repository_id: string
        @raise AttributeError: if server is not a MainServer instance
        """
        # NOTE(review): repository_id is accepted but not stored or used
        # here — presumably methods receive it explicitly; confirm this
        # parameter is intentional.
        from entropy.server.transceivers import TransceiverServerHandler
        from entropy.server.interfaces.main import Server as MainServer

        if not isinstance(server, MainServer):
            raise AttributeError("entropy.server.interfaces.main.Server needed")

        self._entropy = server
        self.TransceiverServerHandler = TransceiverServerHandler
        self.Cacher = EntropyCacher()
        self._settings = SystemSettings()
59 - def _show_interface_status(self, repository_id):
60 """ 61 Print Entropy Server Mirrors interface status. 62 """ 63 mytxt = blue("%s:") % (_("Entropy Server Mirrors Interface loaded"),) 64 self._entropy.output( 65 mytxt, 66 importance = 2, 67 level = "info", 68 header = red(" @@ ") 69 ) 70 for mirror in self._entropy.remote_repository_mirrors(repository_id): 71 mytxt = _("repository mirror") 72 mirror = EntropyTransceiver.hide_sensible_data(mirror) 73 self._entropy.output( 74 "%s: %s" % (purple(mytxt), darkgreen(mirror),), 75 importance = 0, 76 level = "info", 77 header = brown(" # ") 78 ) 79 for mirror in self._entropy.remote_packages_mirrors(repository_id): 80 mytxt = _("packages mirror") 81 mirror = EntropyTransceiver.hide_sensible_data(mirror) 82 self._entropy.output( 83 blue("%s: %s") % (teal(mytxt), darkgreen(mirror),), 84 importance = 0, 85 level = "info", 86 header = brown(" # ") 87 )
88
89 - def _read_remote_file_in_branches(self, repository_id, filename, 90 excluded_branches = None):
91 """ 92 Reads a file remotely located in all the available branches, in 93 repository directory. 94 95 @param repository_id: repository identifier 96 @type repository_id: string 97 @param filename: name of the file that should be located inside 98 repository database directory 99 @type filename: string 100 @keyword excluded_branches: list of branch identifiers excluded or None 101 @type excluded_branches: list or None 102 @return: dictionary with branches as key and raw file content as value: 103 {'4': 'abcd\n', '5': 'defg\n'} 104 @rtype: dict 105 """ 106 if excluded_branches is None: 107 excluded_branches = [] 108 109 branch_data = {} 110 mirrors = self._entropy.remote_repository_mirrors(repository_id) 111 for uri in mirrors: 112 113 crippled_uri = EntropyTransceiver.get_uri_name(uri) 114 115 self._entropy.output( 116 "[%s] %s: %s => %s" % ( 117 brown(repository_id), 118 blue(_("looking for file in mirror")), 119 darkgreen(crippled_uri), 120 filename, 121 ), 122 importance = 1, 123 level = "info", 124 header = brown(" @@ ") 125 ) 126 127 # not using override data on purpose (remote url can be 128 # overridden...) 129 branches_path = self._entropy._get_remote_repository_relative_path( 130 repository_id) 131 txc = self._entropy.Transceiver(uri) 132 txc.set_verbosity(False) 133 134 with txc as handler: 135 136 branches = handler.list_content(branches_path) 137 for branch in branches: 138 139 # is branch excluded ? 
140 if branch in excluded_branches: 141 continue 142 143 if branch_data.get(branch) != None: 144 # already read 145 continue 146 147 mypath = os.path.join("/", branches_path, branch, filename) 148 if not handler.is_file(mypath): 149 # nothing to do, not a file 150 continue 151 152 tmp_dir = const_mkdtemp(prefix = "entropy.server") 153 down_path = os.path.join(tmp_dir, 154 os.path.basename(filename)) 155 tries = 4 156 success = False 157 while tries: 158 downloaded = handler.download(mypath, down_path) 159 if not downloaded: 160 tries -= 1 161 continue # argh! 162 success = True 163 break 164 165 if success and os.path.isfile(down_path): 166 enc = etpConst['conf_encoding'] 167 with codecs.open(down_path, "r", encoding=enc) \ 168 as down_f: 169 branch_data[branch] = down_f.read() 170 171 shutil.rmtree(tmp_dir, True) 172 173 return branch_data
174
175 - def lock_mirrors(self, repository_id, lock, mirrors = None, 176 unlock_locally = True, quiet = False):
177 """ 178 Lock remote mirrors for given repository. In this way repository 179 will be locked for both Entropy Server and Entropy Client instances. 180 181 @param repository_id: repository identifier 182 @type repository_id: string 183 @param lock: True, for lock, False for unlock 184 @type lock: bool 185 @keyword mirrors: provide a list of repository mirrors and override 186 the current ones (which are stored inside repository metadata) 187 @type mirrors: list 188 @keyword unlock_locally: True, if local mirror lock file should be 189 handled too (in case of shadow repos local lock file should not 190 be touched) 191 @type unlock_locally: bool 192 @return: True, if action is successfull 193 @rtype: bool 194 """ 195 196 if mirrors is None: 197 mirrors = self._entropy.remote_repository_mirrors(repository_id) 198 199 done = True 200 for uri in mirrors: 201 202 crippled_uri = EntropyTransceiver.get_uri_name(uri) 203 204 if not quiet: 205 lock_text = _("unlocking") 206 if lock: 207 lock_text = _("locking") 208 self._entropy.output( 209 "[%s|%s] %s %s" % ( 210 brown(repository_id), 211 darkgreen(crippled_uri), 212 bold(lock_text), 213 blue("%s...") % (_("mirror"),), 214 ), 215 importance = 1, 216 level = "info", 217 header = brown(" * "), 218 back = True 219 ) 220 221 repo_relative = \ 222 self._entropy._get_override_remote_repository_relative_path( 223 repository_id) 224 if repo_relative is None: 225 repo_relative = \ 226 self._entropy._get_remote_repository_relative_path( 227 repository_id) 228 base_path = os.path.join(repo_relative, 229 self._settings['repositories']['branch']) 230 lock_file = os.path.join(base_path, 231 etpConst['etpdatabaselockfile']) 232 233 txc = self._entropy.Transceiver(uri) 234 txc.set_verbosity(False) 235 if quiet: 236 txc.set_silent(True) 237 238 with txc as handler: 239 240 if lock: 241 rc_lock = self._do_mirror_lock( 242 repository_id, uri, handler, quiet = quiet) 243 else: 244 rc_lock = self._do_mirror_unlock( 245 repository_id, uri, 
handler, 246 unlock_locally = unlock_locally, 247 quiet = quiet) 248 249 if not rc_lock: 250 done = False 251 252 if done: 253 db_taint_file = self._entropy._get_local_repository_taint_file( 254 repository_id) 255 if os.path.isfile(db_taint_file): 256 os.remove(db_taint_file) 257 258 return done
259 260
    def lock_mirrors_for_download(self, repository_id, lock,
        mirrors = None, unlock_locally = True, quiet = False):
        """
        This functions makes Entropy clients unable to download the repository
        from given mirrors.

        @param repository_id: repository identifier
        @type repository_id: string
        @param lock: True, for lock, False for unlock
        @type lock: bool
        @keyword mirrors: provide a list of repository mirrors and override
            the current ones (which are stored inside repository metadata)
        @type mirrors: list
        @keyword unlock_locally: True, if local mirror lock file should be
            handled too (in case of shadow repos local lock file should not
            be touched)
        @type unlock_locally: bool
        @keyword quiet: if True, avoid printing to stdout
        @type quiet: bool
        @return: True, if action is successfull
        @rtype: bool
        """
        if mirrors is None:
            mirrors = self._entropy.remote_repository_mirrors(repository_id)

        done = True
        for uri in mirrors:

            crippled_uri = EntropyTransceiver.get_uri_name(uri)

            if not quiet:
                lock_text = _("unlocking")
                if lock:
                    lock_text = _("locking")
                self._entropy.output(
                    "[%s|%s] %s %s..." % (
                        blue(repository_id),
                        red(crippled_uri),
                        bold(lock_text),
                        blue(_("mirror for download")),
                    ),
                    importance = 1,
                    level = "info",
                    header = red(" @@ "),
                    back = True
                )

            # build the remote path of the download lock file:
            # <repo relative dir>/<branch>/<download lock filename>
            lock_file = etpConst['etpdatabasedownloadlockfile']
            repo_relative = \
                self._entropy._get_override_remote_repository_relative_path(
                    repository_id)
            if repo_relative is None:
                repo_relative = \
                    self._entropy._get_remote_repository_relative_path(
                        repository_id)
            my_path = os.path.join(repo_relative,
                self._settings['repositories']['branch'])
            lock_file = os.path.join(my_path, lock_file)

            txc = self._entropy.Transceiver(uri)
            txc.set_verbosity(False)
            if quiet:
                txc.set_silent(True)

            with txc as handler:

                # short-circuit when the mirror is already in the
                # requested state
                if lock and handler.is_file(lock_file):
                    self._entropy.output(
                        "[%s|%s] %s" % (
                            blue(repository_id),
                            red(crippled_uri),
                            blue(_("mirror already locked for download")),
                        ),
                        importance = 1,
                        level = "info",
                        header = red(" @@ ")
                    )
                    continue

                elif not lock and not handler.is_file(lock_file):
                    self._entropy.output(
                        "[%s|%s] %s" % (
                            blue(repository_id),
                            red(crippled_uri),
                            blue(_("mirror already unlocked for download")),
                        ),
                        importance = 1,
                        level = "info",
                        header = red(" @@ ")
                    )
                    continue

                # dblock = False selects the download lock file instead
                # of the repository lock file
                if lock:
                    rc_lock = self._do_mirror_lock(
                        repository_id, uri, handler,
                        dblock = False, quiet = quiet)
                else:
                    rc_lock = self._do_mirror_unlock(
                        repository_id, uri,
                        handler, dblock = False,
                        unlock_locally = unlock_locally,
                        quiet = quiet)
                if not rc_lock:
                    done = False

        return done
365
366 - def _do_mirror_lock(self, repository_id, uri, txc_handler, 367 dblock = True, quiet = False):
368 369 repo_relative = \ 370 self._entropy._get_override_remote_repository_relative_path( 371 repository_id) 372 if repo_relative is None: 373 repo_relative = self._entropy._get_remote_repository_relative_path( 374 repository_id) 375 376 my_path = os.path.join(repo_relative, 377 self._settings['repositories']['branch']) 378 379 # create path to lock file if it doesn't exist 380 if not txc_handler.is_dir(my_path): 381 txc_handler.makedirs(my_path) 382 383 crippled_uri = EntropyTransceiver.get_uri_name(uri) 384 lock_string = '' 385 386 if dblock: 387 self._entropy._create_local_repository_lockfile(repository_id) 388 lock_file = self._entropy._get_repository_lockfile(repository_id) 389 else: 390 # locking/unlocking mirror1 for download 391 lock_string = _('for download') 392 self._entropy._create_local_repository_download_lockfile( 393 repository_id) 394 lock_file = self._entropy._get_repository_download_lockfile( 395 repository_id) 396 397 remote_path = os.path.join(my_path, os.path.basename(lock_file)) 398 399 rc_lock = txc_handler.lock(remote_path) 400 if rc_lock: 401 if not quiet: 402 self._entropy.output( 403 "[%s|%s] %s %s" % ( 404 blue(repository_id), 405 red(crippled_uri), 406 blue(_("mirror successfully locked")), 407 blue(lock_string), 408 ), 409 importance = 1, 410 level = "info", 411 header = red(" @@ ") 412 ) 413 else: 414 if not quiet: 415 self._entropy.output( 416 "[%s|%s] %s: %s %s" % ( 417 blue(repository_id), 418 red(crippled_uri), 419 blue("lock error"), 420 blue(_("mirror not locked")), 421 blue(lock_string), 422 ), 423 importance = 1, 424 level = "error", 425 header = darkred(" * ") 426 ) 427 self._entropy._remove_local_repository_lockfile(repository_id) 428 429 return rc_lock
430 431
    def _do_mirror_unlock(self, repository_id, uri, txc_handler,
        dblock = True, unlock_locally = True, quiet = False):
        """
        Remove the remote lock file from the mirror pointed by txc_handler
        and, if unlock_locally is True, the matching local lock file.

        @param repository_id: repository identifier
        @type repository_id: string
        @param uri: mirror uri
        @type uri: string
        @param txc_handler: connected transceiver handler instance
        @keyword dblock: if True, handle the repository lock file,
            otherwise the download lock file
        @type dblock: bool
        @keyword unlock_locally: if True, also remove the local lock file
        @type unlock_locally: bool
        @keyword quiet: if True, avoid printing to stdout
        @type quiet: bool
        @return: True, if the remote lock file is gone
        @rtype: bool
        """
        repo_relative = \
            self._entropy._get_override_remote_repository_relative_path(
                repository_id)
        if repo_relative is None:
            repo_relative = self._entropy._get_remote_repository_relative_path(
                repository_id)

        my_path = os.path.join(repo_relative,
            self._settings['repositories']['branch'])

        crippled_uri = EntropyTransceiver.get_uri_name(uri)

        # pick the lock file name matching dblock
        if dblock:
            dbfile = etpConst['etpdatabaselockfile']
        else:
            dbfile = etpConst['etpdatabasedownloadlockfile']

        # make sure
        remote_path = os.path.join(my_path, os.path.basename(dbfile))

        if not txc_handler.is_file(remote_path):
            # once we locked a mirror, we're in a mutually exclusive
            # region. If we call unlock on a mirror already unlocked
            # that's fine for our semantics.
            rc_delete = True
        else:
            rc_delete = txc_handler.delete(remote_path)
        if rc_delete:
            if not quiet:
                self._entropy.output(
                    "[%s|%s] %s" % (
                        blue(repository_id),
                        red(crippled_uri),
                        blue(_("mirror successfully unlocked")),
                    ),
                    importance = 1,
                    level = "info",
                    header = darkgreen(" * ")
                )
            if unlock_locally:
                # drop the local lock file matching dblock
                if dblock:
                    self._entropy._remove_local_repository_lockfile(
                        repository_id)
                else:
                    self._entropy._remove_local_repository_download_lockfile(
                        repository_id)
        else:
            if not quiet:
                self._entropy.output(
                    "[%s|%s] %s: %s - %s" % (
                        blue(repository_id),
                        red(crippled_uri),
                        blue(_("unlock error")),
                        rc_delete,
                        blue(_("mirror not unlocked")),
                    ),
                    importance = 1,
                    level = "error",
                    header = darkred(" * ")
                )

        return rc_delete
497
    def download_package(self, repository_id, uri, pkg_relative_path):
        """
        Download a package given its mirror uri (uri) and its relative path
        (pkg_relative_path) on behalf of given repository.

        @param repository_id: repository identifier
        @type repository_id: string
        @param uri: mirror uri belonging to given repository identifier
        @type uri: string
        @param pkg_relative_path: relative path to package
        @type pkg_relative_path: string
        @return: download status, True for success, False for failure
        @rtype: bool
        """
        crippled_uri = EntropyTransceiver.get_uri_name(uri)

        # up to 5 download attempts; checksum mismatches trigger a retry,
        # missing/unlisted packages bail out immediately
        tries = 0
        while tries < 5:

            tries += 1
            txc = self._entropy.Transceiver(uri)
            with txc as handler:

                self._entropy.output(
                    "[%s|%s|#%s] %s: %s" % (
                        brown(repository_id),
                        darkgreen(crippled_uri),
                        brown(str(tries)),
                        blue(_("connecting to download package")),
                        darkgreen(pkg_relative_path),
                    ),
                    importance = 1,
                    level = "info",
                    header = darkgreen(" * "),
                    back = True
                )

                remote_path = \
                    self._entropy.complete_remote_package_relative_path(
                        pkg_relative_path, repository_id)
                download_path = self._entropy.complete_local_package_path(
                    pkg_relative_path, repository_id)

                download_dir = os.path.dirname(download_path)

                self._entropy.output(
                    "[%s|%s|#%s] %s: %s" % (
                        brown(repository_id),
                        darkgreen(crippled_uri),
                        brown(str(tries)),
                        blue(_("downloading package")),
                        darkgreen(remote_path),
                    ),
                    importance = 1,
                    level = "info",
                    header = darkgreen(" * ")
                )

                # make sure the destination directory exists
                if not const_dir_readable(download_dir):
                    self._entropy._ensure_dir_path(download_dir)

                rc_download = handler.download(remote_path, download_path)
                if not rc_download:
                    # remote file missing: no point in retrying
                    self._entropy.output(
                        "[%s|%s|#%s] %s: %s %s" % (
                            brown(repository_id),
                            darkgreen(crippled_uri),
                            brown(str(tries)),
                            blue(_("package")),
                            darkgreen(pkg_relative_path),
                            blue(_("does not exist")),
                        ),
                        importance = 1,
                        level = "error",
                        header = darkred(" !!! ")
                    )
                    return False

                # look up the expected digest in the repository
                dbconn = self._entropy.open_server_repository(repository_id,
                    read_only = True, no_upload = True)
                package_id = dbconn.getPackageIdFromDownload(pkg_relative_path)
                if package_id == -1:
                    self._entropy.output(
                        "[%s|%s|#%s] %s: %s %s" % (
                            brown(repository_id),
                            darkgreen(crippled_uri),
                            brown(str(tries)),
                            blue(_("package")),
                            darkgreen(pkg_relative_path),
                            blue(_("is not listed in the repository !")),
                        ),
                        importance = 1,
                        level = "error",
                        header = darkred(" !!! ")
                    )
                    return False

                storedmd5 = dbconn.retrieveDigest(package_id)
                self._entropy.output(
                    "[%s|%s|#%s] %s: %s" % (
                        brown(repository_id),
                        darkgreen(crippled_uri),
                        brown(str(tries)),
                        blue(_("verifying checksum of package")),
                        darkgreen(pkg_relative_path),
                    ),
                    importance = 1,
                    level = "info",
                    header = darkgreen(" * "),
                    back = True
                )

                md5check = entropy.tools.compare_md5(download_path, storedmd5)
                if md5check:
                    self._entropy.output(
                        "[%s|%s|#%s] %s: %s %s" % (
                            brown(repository_id),
                            darkgreen(crippled_uri),
                            brown(str(tries)),
                            blue(_("package")),
                            darkgreen(pkg_relative_path),
                            blue(_("downloaded successfully")),
                        ),
                        importance = 1,
                        level = "info",
                        header = darkgreen(" * ")
                    )
                    return True
                else:
                    # checksum mismatch: drop the corrupt file and retry
                    self._entropy.output(
                        "[%s|%s|#%s] %s: %s %s" % (
                            brown(repository_id),
                            darkgreen(crippled_uri),
                            brown(str(tries)),
                            blue(_("package")),
                            darkgreen(pkg_relative_path),
                            blue(_("checksum does not match. re-downloading...")),
                        ),
                        importance = 1,
                        level = "warning",
                        header = darkred(" * ")
                    )
                    if os.path.isfile(download_path):
                        os.remove(download_path)

        # all attempts exhausted: the file never verified correctly
        self._entropy.output(
            "[%s|%s|#%s] %s: %s %s" % (
                brown(repository_id),
                darkgreen(crippled_uri),
                brown(str(tries)),
                blue(_("package")),
                darkgreen(pkg_relative_path),
                blue(_("seems broken. Consider to re-package it. Giving up!")),
            ),
            importance = 1,
            level = "error",
            header = darkred(" !!! ")
        )
        return False
658
    def _get_remote_db_status(self, uri, repo):
        """
        Fetch the repository revision stored on the given mirror.

        @param uri: mirror uri
        @type uri: string
        @param repo: repository identifier
        @type repo: string
        @return: (uri, revision) tuple; revision is 0 when the remote
            repository or its revision file is missing or invalid
        @rtype: tuple
        @raise AttributeError: if the configured repository compression
            method is unknown
        """
        sys_set = self._settings[Server.SYSTEM_SETTINGS_PLG_ID]['server']
        db_format = sys_set['database_file_format']
        cmethod = etpConst['etpdatabasecompressclasses'].get(db_format)
        if cmethod is None:
            raise AttributeError("Wrong repository compression method passed")

        repo_relative = \
            self._entropy._get_override_remote_repository_relative_path(
                repo)
        if repo_relative is None:
            repo_relative = self._entropy._get_remote_repository_relative_path(
                repo)
        remote_dir = os.path.join(repo_relative,
            self._settings['repositories']['branch'])

        # let raise exception if connection is impossible
        txc = self._entropy.Transceiver(uri)
        with txc as handler:

            # both the compressed repository file and the revision file
            # must exist remotely, otherwise revision is 0
            compressedfile = etpConst[cmethod[2]]
            rc1 = handler.is_file(os.path.join(remote_dir, compressedfile))

            rev_file = self._entropy._get_local_repository_revision_file(repo)
            revfilename = os.path.basename(rev_file)
            rc2 = handler.is_file(os.path.join(remote_dir, revfilename))

            revision = 0
            if not (rc1 and rc2):
                return (uri, revision)

            tmp_fd, rev_tmp_path = const_mkstemp(prefix = "entropy.server")
            try:

                # retry the revision file download up to 5 times
                dlcount = 5
                dled = False
                while dlcount:
                    remote_rev_path = os.path.join(remote_dir, revfilename)
                    dled = handler.download(remote_rev_path, rev_tmp_path)
                    if dled:
                        break
                    dlcount -= 1

                crippled_uri = EntropyTransceiver.get_uri_name(uri)

                if const_file_readable(rev_tmp_path):

                    enc = etpConst['conf_encoding']
                    with codecs.open(rev_tmp_path, "r", encoding=enc) as f_rev:
                        try:
                            revision = int(f_rev.readline().strip())
                        except ValueError:
                            # file exists but does not contain an integer
                            mytxt = _("mirror hasn't valid repository revision file")
                            self._entropy.output(
                                "[%s|%s] %s: %s" % (
                                    brown(repo),
                                    darkgreen(crippled_uri),
                                    blue(mytxt),
                                    bold(revision),
                                ),
                                importance = 1,
                                level = "error",
                                header = darkred(" !!! ")
                            )
                            revision = 0

                elif dlcount == 0:
                    # every download attempt failed
                    self._entropy.output(
                        "[%s|%s] %s: %s" % (
                            brown(repo),
                            darkgreen(crippled_uri),
                            blue(_("unable to download repository revision")),
                            bold(revision),
                        ),
                        importance = 1,
                        level = "error",
                        header = darkred(" !!! ")
                    )
                    revision = 0

                else:
                    # download reported success but the file is unreadable
                    self._entropy.output(
                        "[%s|%s] %s: %s" % (
                            brown(repo),
                            darkgreen(crippled_uri),
                            blue(_("mirror doesn't have valid revision file")),
                            bold(revision),
                        ),
                        importance = 1,
                        level = "error",
                        header = darkred(" !!! ")
                    )
                    revision = 0

            finally:
                # always release the temporary file
                os.close(tmp_fd)
                os.remove(rev_tmp_path)

        return (uri, revision)
759
760 - def remote_repository_status(self, repository_id):
761 """ 762 Return the repository status (revision) for every available mirror. 763 764 @param repository_id: repository identifier 765 @type repository_id: string 766 @return: dictionary, mirror URL (not URI) as key, revision as value 767 (int) 768 @rtype: dict 769 """ 770 return dict(self._get_remote_db_status(uri, repository_id) for uri in \ 771 self._entropy.remote_repository_mirrors(repository_id))
772
773 - def mirrors_status(self, repository_id):
774 """ 775 Return mirrors status for given repository identifier. 776 777 @param repository_id: repository identifier 778 @type repository_id: string 779 @return: list of tuples of length 3 780 [(uri, upload_lock_status_bool, download_lock_status_bool)] 781 @rtype: list 782 """ 783 dbstatus = [] 784 repo_relative = \ 785 self._entropy._get_override_remote_repository_relative_path( 786 repository_id) 787 if repo_relative is None: 788 repo_relative = self._entropy._get_remote_repository_relative_path( 789 repository_id) 790 remote_dir = os.path.join(repo_relative, 791 self._settings['repositories']['branch']) 792 lock_file = os.path.join(remote_dir, etpConst['etpdatabaselockfile']) 793 down_lock_file = os.path.join(remote_dir, 794 etpConst['etpdatabasedownloadlockfile']) 795 796 for uri in self._entropy.remote_repository_mirrors(repository_id): 797 down_status = False 798 up_status = False 799 800 # let raise exception if connection is impossible 801 txc = self._entropy.Transceiver(uri) 802 with txc as handler: 803 if handler.is_file(lock_file): 804 # upload locked 805 up_status = True 806 if handler.is_file(down_lock_file): 807 # download locked 808 down_status = True 809 dbstatus.append((uri, up_status, down_status)) 810 811 return dbstatus
812
    def mirror_locked(self, repository_id, uri):
        """
        Return whether mirror is locked.

        The check only considers locks held by someone else: when the
        remote lock file exists but the local one does not, this waits
        up to 2 minutes for the remote lock to disappear and returns
        True only if it is still there afterwards (we gave up).

        @param repository_id: the repository identifier
        @type repository_id: string
        @param uri: mirror uri, as listed in repository metadata
        @type uri: string
        @return: True, if mirror is locked
        @rtype: bool
        """
        gave_up = False

        lock_file = self._entropy._get_repository_lockfile(repository_id)
        lock_filename = os.path.basename(lock_file)

        repo_relative = \
            self._entropy._get_override_remote_repository_relative_path(
                repository_id)
        if repo_relative is None:
            repo_relative = self._entropy._get_remote_repository_relative_path(
                repository_id)

        remote_dir = os.path.join(repo_relative,
            self._settings['repositories']['branch'])
        remote_lock_file = os.path.join(remote_dir, lock_filename)

        txc = self._entropy.Transceiver(uri)
        with txc as handler:

            # a missing local lock file means the remote lock (if any)
            # belongs to somebody else
            if not os.path.isfile(lock_file) and \
                handler.is_file(remote_lock_file):

                crippled_uri = EntropyTransceiver.get_uri_name(uri)
                self._entropy.output(
                    "[%s|%s|%s] %s, %s" % (
                        brown(str(repository_id)),
                        darkgreen(crippled_uri),
                        red(_("locking")),
                        darkblue(_("mirror already locked")),
                        blue(_("waiting up to 2 minutes before giving up")),
                    ),
                    importance = 1,
                    level = "warning",
                    header = brown(" * "),
                    back = True
                )

                # poll once per second, 120 times at most
                unlocked = False
                count = 0
                while count < 120:
                    count += 1
                    time.sleep(1)
                    if not handler.is_file(remote_lock_file):
                        self._entropy.output(
                            red("[%s|%s|%s] %s !" % (
                                repository_id,
                                crippled_uri,
                                _("locking"),
                                _("mirror unlocked"),
                            )
                            ),
                            importance = 1,
                            level = "info",
                            header = darkgreen(" * ")
                        )
                        unlocked = True
                        break

                if not unlocked:
                    gave_up = True

        return gave_up
886
887 - def _calculate_local_upload_files(self, repository_id):
888 889 upload_dir = self._entropy._get_local_upload_directory(repository_id) 890 891 # check if it exists 892 if not os.path.isdir(upload_dir): 893 return set() 894 895 branch = self._settings['repositories']['branch'] 896 upload_packages = self._entropy._get_basedir_pkg_listing( 897 upload_dir, etpConst['packagesext'], branch = branch) 898 899 return set(upload_packages)
900
901 - def _calculate_local_package_files(self, repository_id, weak_files = False):
902 903 base_dir = self._entropy._get_local_repository_base_directory( 904 repository_id) 905 906 # check if it exists 907 if not os.path.isdir(base_dir): 908 return set() 909 910 branch = self._settings['repositories']['branch'] 911 pkg_ext = etpConst['packagesext'] 912 913 pkg_files = set(self._entropy._get_basedir_pkg_listing( 914 base_dir, pkg_ext, branch = branch)) 915 916 weak_ext = etpConst['packagesweakfileext'] 917 weak_ext_len = len(weak_ext) 918 weak_pkg_ext = pkg_ext + weak_ext 919 920 def _map_weak_ext(path): 921 return path[:-weak_ext_len]
922 923 if weak_files: 924 pkg_files |= set( 925 map( 926 _map_weak_ext, 927 self._entropy._get_basedir_pkg_listing( 928 base_dir, 929 weak_pkg_ext, 930 branch = branch)) 931 ) 932 933 return pkg_files
934
935 - def _show_local_sync_stats(self, upload_files, local_files):
936 self._entropy.output( 937 "%s:" % ( 938 blue(_("Local statistics")), 939 ), 940 importance = 1, 941 level = "info", 942 header = red(" @@ ") 943 ) 944 self._entropy.output( 945 red("%s: %s %s" % ( 946 blue(_("upload directory")), 947 bold(str(upload_files)), 948 red(_("files ready")), 949 ) 950 ), 951 importance = 0, 952 level = "info", 953 header = red(" @@ ") 954 ) 955 self._entropy.output( 956 red("%s: %s %s" % ( 957 blue(_("packages directory")), 958 bold(str(local_files)), 959 red(_("files ready")), 960 ) 961 ), 962 importance = 0, 963 level = "info", 964 header = red(" @@ ") 965 )
966
    def _show_sync_queues(self, upload, download, removal, copy, metainfo):
        """
        Print the computed sync queues (upload, download, copy, removal),
        one line per package, followed by per-queue counters and total
        sizes taken from metainfo.

        @param upload: iterable of (package, rel_pkg, size) tuples
        @param download: iterable of (package, rel_pkg, size) tuples
        @param removal: iterable of (package, rel_pkg, size) tuples
        @param copy: iterable of (package, rel_pkg, size) tuples
        @param metainfo: dict with 'removal', 'upload', 'download' total
            byte counts
        """
        branch = self._settings['repositories']['branch']

        # show stats
        # NOTE(review): unlike the queues below, the upload queue is not
        # sorted here; also, colors are applied twice (on the variable and
        # again in the format call) — confirm both are intentional.
        for package, rel_pkg, size in upload:
            package = darkgreen(rel_pkg)
            size = blue(entropy.tools.bytes_into_human(size))
            self._entropy.output(
                "[%s|%s] %s [%s]" % (
                    brown(branch),
                    blue(_("upload")),
                    darkgreen(package),
                    size,
                ),
                importance = 0,
                level = "info",
                header = red(" # ")
            )
        # sort remaining queues by relative package path
        key_sorter = lambda x: x[1]

        for package, rel_pkg, size in sorted(download, key = key_sorter):
            package = darkred(rel_pkg)
            size = blue(entropy.tools.bytes_into_human(size))
            self._entropy.output(
                "[%s|%s] %s [%s]" % (
                    brown(branch),
                    darkred(_("download")),
                    blue(package),
                    size,
                ),
                importance = 0,
                level = "info",
                header = red(" # ")
            )
        for package, rel_pkg, size in sorted(copy, key = key_sorter):
            package = darkblue(rel_pkg)
            size = blue(entropy.tools.bytes_into_human(size))
            self._entropy.output(
                "[%s|%s] %s [%s]" % (
                    brown(branch),
                    darkgreen(_("copy")),
                    brown(package),
                    size,
                ),
                importance = 0,
                level = "info",
                header = red(" # ")
            )
        for package, rel_pkg, size in sorted(removal, key = key_sorter):
            package = brown(rel_pkg)
            size = blue(entropy.tools.bytes_into_human(size))
            self._entropy.output(
                "[%s|%s] %s [%s]" % (
                    brown(branch),
                    red(_("remove")),
                    red(package),
                    size,
                ),
                importance = 0,
                level = "info",
                header = red(" # ")
            )

        # per-queue counters
        self._entropy.output(
            "%s: %s" % (
                blue(_("Packages to be removed")),
                darkred(str(len(removal))),
            ),
            importance = 0,
            level = "info",
            header = blue(" @@ ")
        )
        self._entropy.output(
            "%s: %s" % (
                darkgreen(_("Packages to be moved locally")),
                darkgreen(str(len(copy))),
            ),
            importance = 0,
            level = "info",
            header = blue(" @@ ")
        )
        self._entropy.output(
            "%s: %s" % (
                brown(_("Packages to be downloaded")),
                brown(str(len(download))),
            ),
            importance = 0,
            level = "info",
            header = blue(" @@ ")
        )
        self._entropy.output(
            "%s: %s" % (
                bold(_("Packages to be uploaded")),
                bold(str(len(upload))),
            ),
            importance = 0,
            level = "info",
            header = blue(" @@ ")
        )

        # total byte counts
        self._entropy.output(
            "%s: %s" % (
                darkred(_("Total removal size")),
                darkred(
                    entropy.tools.bytes_into_human(metainfo['removal'])
                ),
            ),
            importance = 0,
            level = "info",
            header = blue(" @@ ")
        )

        self._entropy.output(
            "%s: %s" % (
                blue(_("Total upload size")),
                blue(entropy.tools.bytes_into_human(metainfo['upload'])),
            ),
            importance = 0,
            level = "info",
            header = blue(" @@ ")
        )
        self._entropy.output(
            "%s: %s" % (
                brown(_("Total download size")),
                brown(entropy.tools.bytes_into_human(metainfo['download'])),
            ),
            importance = 0,
            level = "info",
            header = blue(" @@ ")
        )
1098
1099 - def _calculate_remote_package_files(self, repository_id, uri, txc_handler):
1100 1101 remote_packages_data = {} 1102 remote_packages = [] 1103 branch = self._settings['repositories']['branch'] 1104 fifo_q = Queue() 1105 1106 def get_content(lookup_dir): 1107 only_dir = self._entropy.complete_remote_package_relative_path( 1108 "", repository_id) 1109 db_url_dir = lookup_dir[len(only_dir):] 1110 1111 # create path to lock file if it doesn't exist 1112 if not txc_handler.is_dir(lookup_dir): 1113 txc_handler.makedirs(lookup_dir) 1114 1115 info = txc_handler.list_content_metadata(lookup_dir) 1116 1117 dirs = [] 1118 for path, size, user, group, perms in info: 1119 1120 if perms.startswith("d"): 1121 fifo_q.put(os.path.join(lookup_dir, path)) 1122 else: 1123 rel_path = os.path.join(db_url_dir, path) 1124 remote_packages.append(rel_path) 1125 remote_packages_data[rel_path] = int(size)
1126 1127 # initialize the queue 1128 pkgs_dir_types = self._entropy._get_pkg_dir_names() 1129 for pkg_dir_type in pkgs_dir_types: 1130 1131 remote_dir = self._entropy.complete_remote_package_relative_path( 1132 pkg_dir_type, repository_id) 1133 remote_dir = os.path.join(remote_dir, etpConst['currentarch'], 1134 branch) 1135 1136 fifo_q.put(remote_dir) 1137 1138 while not fifo_q.empty(): 1139 get_content(fifo_q.get()) 1140 1141 return remote_packages, remote_packages_data 1142
1143 - def _calculate_packages_to_sync(self, repository_id, uri):
1144 1145 crippled_uri = EntropyTransceiver.get_uri_name(uri) 1146 upload_packages = self._calculate_local_upload_files( 1147 repository_id) 1148 local_packages = self._calculate_local_package_files( 1149 repository_id, weak_files = True) 1150 self._show_local_sync_stats( 1151 len(upload_packages), len(local_packages)) 1152 1153 self._entropy.output( 1154 "%s: %s" % (blue(_("Remote statistics for")), red(crippled_uri),), 1155 importance = 1, 1156 level = "info", 1157 header = red(" @@ ") 1158 ) 1159 1160 txc = self._entropy.Transceiver(uri) 1161 with txc as handler: 1162 (remote_packages, 1163 remote_packages_data) = self._calculate_remote_package_files( 1164 repository_id, uri, handler) 1165 1166 self._entropy.output( 1167 "%s: %s %s" % ( 1168 blue(_("remote packages")), 1169 bold("%d" % (len(remote_packages),)), 1170 red(_("files stored")), 1171 ), 1172 importance = 0, 1173 level = "info", 1174 header = red(" @@ ") 1175 ) 1176 1177 mytxt = blue("%s ...") % ( 1178 _("Calculating queues"), 1179 ) 1180 self._entropy.output( 1181 mytxt, 1182 importance = 1, 1183 level = "info", 1184 header = red(" @@ ") 1185 ) 1186 1187 upload_queue, download_queue, removal_queue, fine_queue = \ 1188 self._calculate_sync_queues(repository_id, upload_packages, 1189 local_packages, remote_packages, remote_packages_data) 1190 return upload_queue, download_queue, removal_queue, fine_queue, \ 1191 remote_packages_data
1192
    def _calculate_sync_queues(self, repository_id, upload_packages,
        local_packages, remote_packages, remote_packages_data):
        """
        Build the mirror synchronization queues by comparing local and
        remote package file listings (and their sizes).

        @param repository_id: repository identifier
        @type repository_id: string
        @param upload_packages: package relative paths found in the local
            upload directory
        @param local_packages: package relative paths found in the local
            packages directory
        @param remote_packages: package relative paths found on the mirror
        @param remote_packages_data: map of remote relative path -> size
            in bytes, as advertised by the mirror
        @return: tuple of sets: (upload_queue, download_queue,
            removal_queue, fine_queue)
        """
        upload_queue = set()
        extra_upload_queue = set()
        download_queue = set()
        extra_download_queue = set()
        removal_queue = set()
        fine_queue = set()
        branch = self._settings['repositories']['branch']
        pkg_ext = etpConst['packagesext']

        def _account_extra_packages(local_package, queue):
            # also enqueue the "extra download" files attached to the
            # package entry in the repository, if any
            repo = self._entropy.open_repository(repository_id)
            package_id = repo.getPackageIdFromDownload(local_package)
            # NOTE: package_id can be == -1 because there might have been
            # some packages in the queues that have been bumped more than
            # once, thus, not available in repository.
            if package_id != -1:
                extra_downloads = repo.retrieveExtraDownload(package_id)
                for extra_download in extra_downloads:
                    queue.add(extra_download['download'])

        # files in the upload directory are always meant to reach the
        # mirror, unless an identical copy (same size) is already there
        for local_package in upload_packages:

            if not local_package.endswith(pkg_ext):
                continue

            if local_package in remote_packages:

                local_filepath = \
                    self._entropy.complete_local_upload_package_path(
                        local_package, repository_id)

                local_size = entropy.tools.get_file_size(local_filepath)
                remote_size = remote_packages_data.get(local_package)
                if remote_size is None:
                    remote_size = 0
                if local_size != remote_size:
                    # size does not match, adding to the upload queue
                    upload_queue.add(local_package)
                    _account_extra_packages(local_package, extra_upload_queue)
                else:
                    # just move from upload to packages
                    fine_queue.add(local_package)

            else:
                # always force upload of packages in uploaddir
                upload_queue.add(local_package)
                _account_extra_packages(local_package, extra_upload_queue)

        # if a package is in the packages directory but not online
        # (or online with a mismatching size), we have to upload it
        for local_package in local_packages:

            if not local_package.endswith(pkg_ext):
                continue
            # ignore file if its .weak alter-ego exists
            if self._weaken_file_exists(repository_id, local_package):
                continue

            if local_package in remote_packages:
                local_filepath = self._entropy.complete_local_package_path(
                    local_package, repository_id)
                local_size = entropy.tools.get_file_size(local_filepath)
                remote_size = remote_packages_data.get(local_package)
                if remote_size is None:
                    remote_size = 0
                if (local_size != remote_size) and (local_size != 0):
                    # size does not match, adding to the upload queue,
                    # unless the upload directory copy already matched
                    if local_package not in fine_queue:
                        upload_queue.add(local_package)
                        _account_extra_packages(local_package,
                            extra_upload_queue)
            else:
                # the package file is not on the mirror at all,
                # so it must be uploaded
                upload_queue.add(local_package)
                _account_extra_packages(local_package, extra_upload_queue)

        # Fill download_queue and removal_queue
        for remote_package in remote_packages:

            if not remote_package.endswith(pkg_ext):
                continue

            if remote_package in local_packages:

                # ignore file if its .weak alter-ego exists
                if self._weaken_file_exists(repository_id, remote_package):
                    continue

                local_filepath = self._entropy.complete_local_package_path(
                    remote_package, repository_id)
                local_size = entropy.tools.get_file_size(local_filepath)
                remote_size = remote_packages_data.get(remote_package)
                if remote_size is None:
                    remote_size = 0
                if (local_size != remote_size) and (local_size != 0):
                    # size does not match, remove first
                    # do it only if the package has not been
                    # added to the upload_queue
                    if remote_package not in upload_queue:
                        # remotePackage == localPackage
                        # just remove something that differs
                        # from the content of the mirror
                        removal_queue.add(remote_package)
                        # then add to the download queue
                        download_queue.add(remote_package)
                        _account_extra_packages(remote_package,
                            extra_download_queue)
            else:
                # this means that the local package does not exist
                # so, we need to download it
                # ignore .tmp files
                if not remote_package.endswith(
                    EntropyUriHandler.TMP_TXC_FILE_EXT):
                    download_queue.add(remote_package)
                    _account_extra_packages(remote_package,
                        extra_download_queue)

        # Collect packages that don't exist anymore in the database
        # so we can filter them out from the download queue
        dbconn = self._entropy.open_server_repository(repository_id,
            just_reading = True)
        db_files = dbconn.listAllDownloads(do_sort = False,
            full_path = True)
        db_files = set([x for x in db_files if \
            (self._entropy._get_branch_from_download_relative_uri(x) == branch)])

        """
        ### actually do not exclude files not available locally. This makes
        ### possible to repair broken tidy runs, downloading a pkg again
        ### makes it get flagged as expired afterwards
        exclude = set()
        for myfile in download_queue:
            if myfile.endswith(etpConst['packagesext']):
                if myfile not in db_files:
                    exclude.add(myfile)
        download_queue -= exclude
        """

        # filter out packages not in our repository
        upload_queue = set([x for x in upload_queue if x in db_files])
        # filter out weird moves, packages set for upload should not
        # be downloaded
        download_queue = set([x for x in download_queue if x not in \
            upload_queue])
        upload_queue |= extra_upload_queue
        download_queue |= extra_download_queue

        return upload_queue, download_queue, removal_queue, fine_queue
1346 - def _expand_queues(self, upload_queue, download_queue, removal_queue, 1347 remote_packages_data, repo):
1348 1349 metainfo = { 1350 'removal': 0, 1351 'download': 0, 1352 'upload': 0, 1353 } 1354 removal = [] 1355 download = [] 1356 do_copy = [] 1357 upload = [] 1358 1359 for item in removal_queue: 1360 local_filepath = self._entropy.complete_local_package_path( 1361 item, repo) 1362 size = entropy.tools.get_file_size(local_filepath) 1363 metainfo['removal'] += size 1364 removal.append((local_filepath, item, size)) 1365 1366 for item in download_queue: 1367 1368 local_filepath = self._entropy.complete_local_upload_package_path( 1369 item, repo) 1370 if not os.path.isfile(local_filepath): 1371 size = remote_packages_data.get(item) 1372 if size is None: 1373 size = 0 1374 size = int(size) 1375 metainfo['removal'] += size 1376 download.append((local_filepath, item, size)) 1377 else: 1378 size = entropy.tools.get_file_size(local_filepath) 1379 do_copy.append((local_filepath, item, size)) 1380 1381 for item in upload_queue: 1382 1383 local_filepath = self._entropy.complete_local_upload_package_path( 1384 item, repo) 1385 1386 local_filepath_pkgs = self._entropy.complete_local_package_path( 1387 item, repo) 1388 if os.path.isfile(local_filepath): 1389 size = entropy.tools.get_file_size(local_filepath) 1390 upload.append((local_filepath, item, size)) 1391 else: 1392 size = entropy.tools.get_file_size(local_filepath_pkgs) 1393 upload.append((local_filepath_pkgs, item, size)) 1394 metainfo['upload'] += size 1395 1396 return upload, download, removal, do_copy, metainfo
1397
1398 - def _sync_run_removal_queue(self, repository_id, removal_queue):
1399 1400 branch = self._settings['repositories']['branch'] 1401 1402 for remove_filepath, rel_path, size in removal_queue: 1403 1404 remove_filename = os.path.basename(remove_filepath) 1405 remove_filepath_exp = remove_filepath + \ 1406 etpConst['packagesexpirationfileext'] 1407 1408 self._entropy.output( 1409 "[%s|%s|%s] %s: %s [%s]" % ( 1410 brown(repository_id), 1411 red("sync"), 1412 brown(branch), 1413 blue(_("removing package+hash")), 1414 darkgreen(remove_filename), 1415 blue(entropy.tools.bytes_into_human(size)), 1416 ), 1417 importance = 0, 1418 level = "info", 1419 header = darkred(" * ") 1420 ) 1421 1422 if os.path.isfile(remove_filepath): 1423 os.remove(remove_filepath) 1424 if os.path.isfile(remove_filepath_exp): 1425 os.remove(remove_filepath_exp) 1426 1427 self._entropy.output( 1428 "[%s|%s|%s] %s" % ( 1429 brown(repository_id), 1430 red(_("sync")), 1431 brown(branch), 1432 blue(_("removal complete")), 1433 ), 1434 importance = 0, 1435 level = "info", 1436 header = darkred(" * ") 1437 )
1438 1439
1440 - def _sync_run_copy_queue(self, repository_id, copy_queue):
1441 1442 branch = self._settings['repositories']['branch'] 1443 for from_file, rel_file, size in copy_queue: 1444 1445 to_file = self._entropy.complete_local_package_path(rel_file, 1446 repository_id) 1447 expiration_file = to_file+etpConst['packagesexpirationfileext'] 1448 1449 self._entropy.output( 1450 "[%s|%s|%s] %s: %s" % ( 1451 brown(repository_id), 1452 red("sync"), 1453 brown(branch), 1454 blue(_("copying file+hash to repository")), 1455 darkgreen(from_file), 1456 ), 1457 importance = 0, 1458 level = "info", 1459 header = darkred(" * ") 1460 ) 1461 self._entropy._ensure_dir_path(os.path.dirname(to_file)) 1462 1463 shutil.copy2(from_file, to_file) 1464 1465 # clear expiration file 1466 if os.path.isfile(expiration_file): 1467 os.remove(expiration_file)
1468 1469
1470 - def _sync_run_upload_queue(self, repository_id, uri, upload_queue):
1471 1472 branch = self._settings['repositories']['branch'] 1473 crippled_uri = EntropyTransceiver.get_uri_name(uri) 1474 queue_map = {} 1475 1476 for upload_path, rel_path, size in upload_queue: 1477 rel_dir = os.path.dirname(rel_path) 1478 obj = queue_map.setdefault(rel_dir, []) 1479 obj.append(upload_path) 1480 1481 errors = False 1482 m_fine_uris = set() 1483 m_broken_uris = set() 1484 for rel_path, myqueue in queue_map.items(): 1485 1486 remote_dir = self._entropy.complete_remote_package_relative_path( 1487 rel_path, repository_id) 1488 1489 handlers_data = { 1490 'branch': branch, 1491 'download': rel_path, 1492 } 1493 uploader = self.TransceiverServerHandler(self._entropy, [uri], 1494 myqueue, critical_files = myqueue, 1495 txc_basedir = remote_dir, copy_herustic_support = True, 1496 handlers_data = handlers_data, repo = repository_id) 1497 1498 xerrors, xm_fine_uris, xm_broken_uris = uploader.go() 1499 if xerrors: 1500 errors = True 1501 m_fine_uris.update(xm_fine_uris) 1502 m_broken_uris.update(xm_broken_uris) 1503 1504 if errors: 1505 my_broken_uris = [ 1506 (EntropyTransceiver.get_uri_name(x_uri), x_uri_rc) for \ 1507 x_uri, x_uri_rc in m_broken_uris] 1508 reason = my_broken_uris[0][1] 1509 self._entropy.output( 1510 "[%s] %s: %s, %s: %s" % ( 1511 brown(branch), 1512 blue(_("upload errors")), 1513 red(crippled_uri), 1514 blue(_("reason")), 1515 darkgreen(repr(reason)), 1516 ), 1517 importance = 1, 1518 level = "error", 1519 header = darkred(" !!! ") 1520 ) 1521 return errors, m_fine_uris, m_broken_uris 1522 1523 self._entropy.output( 1524 "[%s] %s: %s" % ( 1525 brown(branch), 1526 blue(_("upload completed successfully")), 1527 red(crippled_uri), 1528 ), 1529 importance = 1, 1530 level = "info", 1531 header = blue(" @@ ") 1532 ) 1533 return errors, m_fine_uris, m_broken_uris
1534 1535
1536 - def _sync_run_download_queue(self, repository_id, uri, download_queue):
1537 1538 branch = self._settings['repositories']['branch'] 1539 crippled_uri = EntropyTransceiver.get_uri_name(uri) 1540 queue_map = {} 1541 1542 for download_path, rel_path, size in download_queue: 1543 rel_dir = os.path.dirname(rel_path) 1544 obj = queue_map.setdefault(rel_dir, []) 1545 obj.append(download_path) 1546 1547 errors = False 1548 m_fine_uris = set() 1549 m_broken_uris = set() 1550 for rel_path, myqueue in queue_map.items(): 1551 1552 remote_dir = self._entropy.complete_remote_package_relative_path( 1553 rel_path, repository_id) 1554 1555 local_basedir = self._entropy.complete_local_package_path(rel_path, 1556 repository_id) 1557 if not os.path.isdir(local_basedir): 1558 self._entropy._ensure_dir_path(local_basedir) 1559 1560 handlers_data = { 1561 'branch': branch, 1562 'download': rel_path, 1563 } 1564 downloader = self.TransceiverServerHandler( 1565 self._entropy, [uri], myqueue, 1566 critical_files = myqueue, 1567 txc_basedir = remote_dir, local_basedir = local_basedir, 1568 handlers_data = handlers_data, download = True, 1569 repo = repository_id) 1570 1571 xerrors, xm_fine_uris, xm_broken_uris = downloader.go() 1572 if xerrors: 1573 errors = True 1574 m_fine_uris.update(xm_fine_uris) 1575 m_broken_uris.update(xm_broken_uris) 1576 1577 if errors: 1578 my_broken_uris = [ 1579 (EntropyTransceiver.get_uri_name(x_uri), x_uri_rc,) \ 1580 for x_uri, x_uri_rc in m_broken_uris] 1581 reason = my_broken_uris[0][1] 1582 self._entropy.output( 1583 "[%s|%s|%s] %s: %s, %s: %s" % ( 1584 brown(repository_id), 1585 red(_("sync")), 1586 brown(branch), 1587 blue(_("download errors")), 1588 darkgreen(crippled_uri), 1589 blue(_("reason")), 1590 reason, 1591 ), 1592 importance = 1, 1593 level = "error", 1594 header = darkred(" !!! 
") 1595 ) 1596 return errors, m_fine_uris, m_broken_uris 1597 1598 self._entropy.output( 1599 "[%s|%s|%s] %s: %s" % ( 1600 brown(repository_id), 1601 red(_("sync")), 1602 brown(branch), 1603 blue(_("download completed successfully")), 1604 darkgreen(crippled_uri), 1605 ), 1606 importance = 1, 1607 level = "info", 1608 header = darkgreen(" * ") 1609 ) 1610 return errors, m_fine_uris, m_broken_uris
1611
1612 - def _run_package_files_qa_checks(self, repository_id, packages_list):
1613 1614 my_qa = self._entropy.QA() 1615 qa_total = len(packages_list) 1616 qa_count = 0 1617 qa_some_faulty = [] 1618 1619 for upload_package in packages_list: 1620 qa_count += 1 1621 1622 self._entropy.output( 1623 "%s: %s" % ( 1624 purple(_("QA checking package file")), 1625 darkgreen(os.path.basename(upload_package)), 1626 ), 1627 importance = 0, 1628 level = "info", 1629 header = purple(" @@ "), 1630 back = True, 1631 count = (qa_count, qa_total,) 1632 ) 1633 result = my_qa.entropy_package_checks(upload_package) 1634 if not result: 1635 qa_some_faulty.append(os.path.basename(upload_package)) 1636 1637 if qa_some_faulty: 1638 1639 for qa_faulty_pkg in qa_some_faulty: 1640 self._entropy.output( 1641 "[%s|%s] %s: %s" % ( 1642 brown(repository_id), 1643 self._settings['repositories']['branch'], 1644 red(_("faulty package file, please fix")), 1645 blue(os.path.basename(qa_faulty_pkg)), 1646 ), 1647 importance = 1, 1648 level = "error", 1649 header = darkred(" @@ ") 1650 ) 1651 raise EntropyPackageException( 1652 'EntropyPackageException: cannot continue')
1653
1654 - def sync_repository(self, repository_id, enable_upload = True, 1655 enable_download = True, force = False):
1656 """ 1657 Synchronize the given repository identifier. 1658 1659 @param repository_id: repository identifier 1660 @type repository_id: string 1661 @keyword enable_upload: enable upload in case it's required to push 1662 the repository remotely 1663 @type enable_upload: bool 1664 @keyword enable_download: enable download in case it's required to 1665 pull the repository remotely 1666 @type enable_download: bool 1667 @keyword force: force the repository push in case of QA errors 1668 @type force: bool 1669 @return: status code, 0 means all fine, non zero values mean error 1670 @rtype: int 1671 """ 1672 return ServerPackagesRepository.update(self._entropy, repository_id, 1673 enable_upload, enable_download, force = force)
1674
    def sync_packages(self, repository_id, ask = True, pretend = False,
        packages_check = False):
        """
        Synchronize packages in given repository, uploading, downloading,
        removing them. If changes were made locally, this function will do
        all the duties required to update the remote mirrors.

        @param repository_id: repository identifier
        @type repository_id: string
        @keyword ask: be interactive and ask user for confirmation
        @type ask: bool
        @keyword pretend: just execute without effectively change anything on
            mirrors
        @type pretend: bool
        @keyword packages_check: verify local packages after the sync.
        @type packages_check: bool
        @return: tuple composed by (mirrors_tainted (bool), mirror_errors(bool),
            successfull_mirrors (list), broken_mirrors (list), check_data (dict))
        @rtype: tuple
        @todo: improve return data documentation
        """
        self._entropy.output(
            "[%s|%s] %s" % (
                repository_id,
                red(_("sync")),
                darkgreen(_("starting packages sync")),
            ),
            importance = 1,
            level = "info",
            header = red(" @@ "),
            back = True
        )

        successfull_mirrors = set()
        broken_mirrors = set()
        # NOTE(review): check_data stays an empty tuple unless
        # packages_check is True, despite the docstring saying "dict"
        check_data = ()
        # QA-check each upload file only once across all mirrors
        upload_queue_qa_checked = set()
        mirrors_tainted = False
        # mirror_errors: per-mirror flag (reset inside the loop);
        # mirrors_errors: sticky, any-mirror-failed flag
        mirror_errors = False
        mirrors_errors = False

        for uri in self._entropy.remote_packages_mirrors(repository_id):

            crippled_uri = EntropyTransceiver.get_uri_name(uri)
            mirror_errors = False

            self._entropy.output(
                "[%s|%s|%s] %s: %s" % (
                    repository_id,
                    red(_("sync")),
                    brown(self._settings['repositories']['branch']),
                    blue(_("packages sync")),
                    bold(crippled_uri),
                ),
                importance = 1,
                level = "info",
                header = red(" @@ ")
            )

            try:
                upload_queue, download_queue, removal_queue, fine_queue, \
                    remote_packages_data = self._calculate_packages_to_sync(
                        repository_id, uri)
            except socket.error as err:
                # network hiccup on this mirror: report and try the next one
                self._entropy.output(
                    "[%s|%s|%s] %s: %s, %s %s" % (
                        repository_id,
                        red(_("sync")),
                        self._settings['repositories']['branch'],
                        darkred(_("socket error")),
                        err,
                        darkred(_("on")),
                        crippled_uri,
                    ),
                    importance = 1,
                    level = "error",
                    header = darkgreen(" * ")
                )
                continue

            if (not upload_queue) and (not download_queue) and \
                (not removal_queue):
                # mirror already in sync
                self._entropy.output(
                    "[%s|%s|%s] %s: %s" % (
                        repository_id,
                        red(_("sync")),
                        self._settings['repositories']['branch'],
                        darkgreen(_("nothing to do on")),
                        crippled_uri,
                    ),
                    importance = 1,
                    level = "info",
                    header = darkgreen(" * ")
                )
                successfull_mirrors.add(uri)
                continue

            self._entropy.output(
                "%s:" % (blue(_("Expanding queues")),),
                importance = 1,
                level = "info",
                header = red(" ** ")
            )

            # turn relative paths into (path, rel, size) work lists
            upload, download, removal, copy_q, metainfo = self._expand_queues(
                upload_queue, download_queue, removal_queue,
                remote_packages_data, repository_id)
            del upload_queue, download_queue, removal_queue, \
                remote_packages_data

            self._show_sync_queues(upload, download, removal, copy_q, metainfo)

            if not len(upload)+len(download)+len(removal)+len(copy_q):

                self._entropy.output(
                    "[%s|%s|%s] %s %s" % (
                        repository_id,
                        red(_("sync")),
                        self._settings['repositories']['branch'],
                        blue(_("nothing to sync for")),
                        crippled_uri,
                    ),
                    importance = 1,
                    level = "info",
                    header = darkgreen(" @@ ")
                )

                successfull_mirrors.add(uri)
                continue

            if pretend:
                successfull_mirrors.add(uri)
                continue

            if ask:
                rc_sync = self._entropy.ask_question(
                    _("Would you like to run the steps above ?"))
                if rc_sync == _("No"):
                    continue

            try:

                # QA checks
                pkg_ext = etpConst['packagesext']
                qa_package_files = [x[0] for x in upload if (x[0] \
                    not in upload_queue_qa_checked) and x[0].endswith(pkg_ext)]
                upload_queue_qa_checked |= set(qa_package_files)

                self._run_package_files_qa_checks(repository_id,
                    qa_package_files)

                if removal:
                    self._sync_run_removal_queue(repository_id, removal)

                if copy_q:
                    self._sync_run_copy_queue(repository_id, copy_q)

                if upload:
                    mirrors_tainted = True

                if upload:
                    d_errors, m_fine_uris, \
                        m_broken_uris = self._sync_run_upload_queue(
                            repository_id, uri, upload)

                    if d_errors:
                        mirror_errors = True

                if download:
                    d_errors, m_fine_uris, \
                        m_broken_uris = self._sync_run_download_queue(
                            repository_id, uri, download)

                    if d_errors:
                        mirror_errors = True
                if not mirror_errors:
                    successfull_mirrors.add(uri)
                else:
                    mirrors_errors = True

            except KeyboardInterrupt:
                self._entropy.output(
                    "[%s|%s|%s] %s" % (
                        repository_id,
                        red(_("sync")),
                        self._settings['repositories']['branch'],
                        darkgreen(_("keyboard interrupt !")),
                    ),
                    importance = 1,
                    level = "info",
                    header = darkgreen(" * ")
                )
                continue

            except EntropyPackageException as err:

                # QA failure: abort the whole sync immediately
                mirrors_errors = True
                broken_mirrors.add(uri)
                successfull_mirrors.clear()
                # so that people will realize this is a very bad thing
                self._entropy.output(
                    "[%s|%s|%s] %s: %s, %s: %s" % (
                        repository_id,
                        red(_("sync")),
                        self._settings['repositories']['branch'],
                        darkred(_("you must package them again")),
                        EntropyPackageException,
                        _("error"),
                        err,
                    ),
                    importance = 1,
                    level = "error",
                    header = darkred(" !!! ")
                )
                return mirrors_tainted, mirrors_errors, successfull_mirrors, \
                    broken_mirrors, check_data

            except Exception as err:

                # unexpected error: log it, mark the mirror broken and
                # move on to the next one
                entropy.tools.print_traceback()
                mirrors_errors = True
                broken_mirrors.add(uri)
                self._entropy.output(
                    "[%s|%s|%s] %s: %s, %s: %s" % (
                        repository_id,
                        red(_("sync")),
                        self._settings['repositories']['branch'],
                        darkred(_("exception caught")),
                        Exception,
                        _("error"),
                        err,
                    ),
                    importance = 1,
                    level = "error",
                    header = darkred(" !!! ")
                )

                exc_txt = entropy.tools.print_exception(
                    silent = True)
                for line in exc_txt:
                    self._entropy.output(
                        repr(line),
                        importance = 1,
                        level = "error",
                        header = darkred(": ")
                    )

                if len(successfull_mirrors) > 0:
                    self._entropy.output(
                        "[%s|%s|%s] %s" % (
                            repository_id,
                            red(_("sync")),
                            self._settings['repositories']['branch'],
                            darkred(
                                _("at least one mirror synced properly!")),
                        ),
                        importance = 1,
                        level = "error",
                        header = darkred(" !!! ")
                    )
                continue

        # if at least one server has been synced successfully, move files
        if (len(successfull_mirrors) > 0) and not pretend:
            self._move_files_over_from_upload(repository_id)

        if packages_check:
            check_data = self._entropy._verify_local_packages(repository_id,
                [], ask = ask)

        return mirrors_tainted, mirrors_errors, successfull_mirrors, \
            broken_mirrors, check_data
1947
1948 - def _move_files_over_from_upload(self, repository_id):
1949 1950 upload_dir = self._entropy._get_local_upload_directory(repository_id) 1951 basedir_list = [] 1952 entropy.tools.recursive_directory_relative_listing(basedir_list, 1953 upload_dir) 1954 1955 for pkg_rel in basedir_list: 1956 1957 source_pkg = self._entropy.complete_local_upload_package_path( 1958 pkg_rel, repository_id) 1959 dest_pkg = self._entropy.complete_local_package_path(pkg_rel, 1960 repository_id) 1961 1962 # clear expiration file 1963 dest_expiration = dest_pkg + etpConst['packagesexpirationfileext'] 1964 if os.path.isfile(dest_expiration): 1965 os.remove(dest_expiration) 1966 1967 self._entropy._ensure_dir_path(os.path.dirname(dest_pkg)) 1968 1969 try: 1970 os.rename(source_pkg, dest_pkg) 1971 except OSError as err: # on different hard drives? 1972 if err.errno != errno.EXDEV: 1973 raise 1974 shutil.move(source_pkg, dest_pkg)
1975
1976 - def _is_package_expired(self, repository_id, package_rel, days):
1977 1978 pkg_path = self._entropy.complete_local_package_path(package_rel, 1979 repository_id) 1980 exp_pkg_path = pkg_path + etpConst['packagesexpirationfileext'] 1981 weak_pkg_path = pkg_path + etpConst['packagesweakfileext'] 1982 1983 # it is assumed that weakened package files are always marked 1984 # as expired first. So, if a .expired file exists, a .weak 1985 # does as well. However, we must also be fault tolerant and 1986 # cope with the situation in where .weak files exist but not 1987 # their .expired counterpart. 1988 # So, if a .weak file exists, we won't return straight away. 1989 # At the same time, if a .expired file exists, we will use that. 1990 1991 expired_exists = os.path.lexists(exp_pkg_path) 1992 weak_exists = os.path.lexists(weak_pkg_path) 1993 1994 test_pkg_path = None 1995 if expired_exists: 1996 test_pkg_path = exp_pkg_path 1997 elif weak_exists: 1998 # deal with corruption 1999 test_pkg_path = weak_pkg_path 2000 else: 2001 # package file not expired, return straight away 2002 return False 2003 2004 mtime = os.path.getmtime(test_pkg_path) 2005 delta = days * 24 * 3600 2006 currmtime = time.time() 2007 file_delta = currmtime - mtime 2008 2009 if file_delta > delta: 2010 return True 2011 return False
2012
2013 - def _expiration_file_exists(self, repository_id, package_rel):
2014 """ 2015 Return whether the expiration file exists for the given package. 2016 2017 @param repository_id: repository identifier 2018 @type repository_id: string 2019 @param package_rel: package relative url, as returned by 2020 EntropyRepository.retrieveDownloadURL 2021 @type package_rel: string 2022 """ 2023 pkg_path = self._entropy.complete_local_package_path(package_rel, 2024 repository_id) 2025 2026 pkg_path += etpConst['packagesexpirationfileext'] 2027 return os.path.lexists(pkg_path)
2028
2029 - def _weaken_file_exists(self, repository_id, package_rel):
2030 """ 2031 Return whether the weaken file exists for the given package. 2032 2033 @param repository_id: repository identifier 2034 @type repository_id: string 2035 @param package_rel: package relative url, as returned by 2036 EntropyRepository.retrieveDownloadURL 2037 @type package_rel: string 2038 """ 2039 pkg_path = self._entropy.complete_local_package_path(package_rel, 2040 repository_id) 2041 2042 pkg_path += etpConst['packagesweakfileext'] 2043 return os.path.lexists(pkg_path)
2044
2045 - def _create_expiration_file(self, repository_id, package_rel):
2046 """ 2047 Mark the package file as expired by creating an .expired file 2048 if it does not exist. Please note that the created file mtime 2049 will be used to determine when the real package file will be 2050 removed. 2051 2052 @param repository_id: repository identifier 2053 @type repository_id: string 2054 @param package_rel: package relative url, as returned by 2055 EntropyRepository.retrieveDownloadURL 2056 @type package_rel: string 2057 """ 2058 pkg_path = self._entropy.complete_local_package_path(package_rel, 2059 repository_id) 2060 2061 pkg_path += etpConst['packagesexpirationfileext'] 2062 if os.path.lexists(pkg_path): 2063 # do not touch the file then, or mtime will be updated 2064 return 2065 2066 self._entropy.output( 2067 "[%s] %s" % ( 2068 blue(_("expire")), 2069 darkgreen(pkg_path), 2070 ), 2071 importance = 1, 2072 level = "info", 2073 header = brown(" @@ ") 2074 ) 2075 2076 with open(pkg_path, "w") as f_exp: 2077 f_exp.flush()
2078
2079 - def _collect_expiring_packages(self, repository_id, branch):
2080 2081 repo = self._entropy.open_repository(repository_id) 2082 2083 database_bins = set(repo.listAllDownloads(do_sort = False, 2084 full_path = True)) 2085 extra_database_bins = set(repo.listAllExtraDownloads(do_sort = False)) 2086 2087 repo_basedir = self._entropy._get_local_repository_base_directory( 2088 repository_id) 2089 2090 repo_bins = set(self._entropy._get_basedir_pkg_listing(repo_basedir, 2091 etpConst['packagesext'], branch = branch)) 2092 extra_repo_bins = set(self._entropy._get_basedir_pkg_listing( 2093 repo_basedir, etpConst['packagesextraext'], branch = branch)) 2094 2095 # scan .weak files. This is part of the weak-package-files support. 2096 weak_ext = etpConst['packagesweakfileext'] 2097 weak_ext_len = len(weak_ext) 2098 2099 def _map_weak_ext(path): 2100 return path[:-weak_ext_len]
2101 2102 repo_bins |= set( 2103 map( 2104 _map_weak_ext, 2105 self._entropy._get_basedir_pkg_listing( 2106 repo_basedir, 2107 etpConst['packagesext'] + weak_ext, 2108 branch = branch)) 2109 ) 2110 extra_repo_bins |= set( 2111 map( 2112 _map_weak_ext, 2113 self._entropy._get_basedir_pkg_listing( 2114 repo_basedir, 2115 etpConst['packagesextraext'] + weak_ext, 2116 branch = branch)) 2117 ) 2118 2119 # convert to set, so that we can do fast thingszzsd 2120 repo_bins -= database_bins 2121 extra_repo_bins -= extra_database_bins 2122 return repo_bins, extra_repo_bins 2123
2124 - def _weaken_package_file(self, repository_id, package_rel):
2125 """ 2126 Weaken the package file by creating a .weak file containing 2127 information about the to-be-removed package file. 2128 2129 @param repository_id: repository identifier 2130 @type repository_id: string 2131 @param package_rel: package relative url, as returned by 2132 EntropyRepository.retrieveDownloadURL 2133 @type package_rel: string 2134 """ 2135 pkg_path = self._entropy.complete_local_package_path(package_rel, 2136 repository_id) 2137 2138 pkg_path += etpConst['packagesweakfileext'] 2139 if os.path.lexists(pkg_path): 2140 # do not touch, or mtime will be updated 2141 return 2142 2143 self._entropy.output( 2144 "[%s] %s" % ( 2145 blue(_("weaken")), 2146 darkgreen(pkg_path), 2147 ), 2148 importance = 1, 2149 level = "info", 2150 header = brown(" @@ ") 2151 ) 2152 2153 with open(pkg_path, "w") as f_exp: 2154 f_exp.flush()
2155
2156 - def _remove_local_package(self, repository_id, package_rel, 2157 remove_expired = True, remove_weak = True):
2158 """ 2159 Remove a package file locally. 2160 2161 @param repository_id: repository identifier 2162 @type repository_id: string 2163 @param package_rel: package relative url, as returned by 2164 EntropyRepository.retrieveDownloadURL 2165 @type package_rel: string 2166 @keyword remove_expired: remove the .expired file? 2167 @type remove_expired: bool 2168 @keyword remove_weak: remove the .weak file? 2169 @type remove_weak: bool 2170 """ 2171 package_path = self._entropy.complete_local_package_path( 2172 package_rel, repository_id) 2173 # if package files are stuck in the upload/ directory 2174 # it means that the repository itself has never been pushed 2175 up_package_path = self._entropy.complete_local_upload_package_path( 2176 package_rel, repository_id) 2177 2178 remove_list = [package_path, up_package_path] 2179 if remove_expired: 2180 package_path_expired = package_path + \ 2181 etpConst['packagesexpirationfileext'] 2182 remove_list.append(package_path_expired) 2183 2184 if remove_weak: 2185 package_path_weak = package_path + \ 2186 etpConst['packagesweakfileext'] 2187 remove_list.append(package_path_weak) 2188 2189 for path in remove_list: 2190 try: 2191 os.remove(path) 2192 except OSError as err: 2193 # handle race conditions 2194 if err.errno != errno.ENOENT: 2195 raise 2196 continue 2197 self._entropy.output( 2198 "[%s] %s" % ( 2199 blue(_("remove")), 2200 darkgreen(path), 2201 ), 2202 importance = 1, 2203 level = "info", 2204 header = brown(" @@ ") 2205 )
2206
    def tidy_mirrors(self, repository_id, ask = True, pretend = False,
        expiration_days = None):
        """
        Cleanup package mirrors for given repository from outdated package
        files. A package file is considered outdated if the corresponding
        entry in the repository database has been removed and the removal is
        ETP_EXPIRATION_DAYS (env var) days old (default is given by:
        etpConst['packagesexpirationdays'] and can be changed in server.conf).

        @param repository_id: repository identifier
        @type repository_id: string
        @keyword ask: be interactive and ask user for confirmation
        @type ask: bool
        @keyword pretend: just execute without effectively change anything on
            mirrors
        @type pretend: bool
        @keyword expiration_days: days after a package is considered expired
        @type expiration_days: int
        @return: True, if tidy went successful, False if not
        @rtype: bool
        """
        srv_set = self._settings[Server.SYSTEM_SETTINGS_PLG_ID]['server']
        if expiration_days is None:
            # fall back to the configured (server.conf / env) expiration
            expiration_days = srv_set['packages_expiration_days']
        else:
            # explicit value must be a non-negative integer
            if not isinstance(expiration_days, const_get_int()):
                raise AttributeError("invalid expiration_days")
            if expiration_days < 0:
                raise AttributeError("invalid expiration_days")

        # when enabled, non-expired-yet packages also get a "weak" marker file
        weak_package_files = srv_set['weak_package_files']

        self._entropy.output(
            "[%s|%s|%s] %s" % (
                brown(repository_id),
                red(_("tidy")),
                blue(self._settings['repositories']['branch']),
                blue(_("collecting expired packages")),
            ),
            importance = 1,
            level = "info",
            header = red(" @@ ")
        )

        # NOTE(review): branch_data appears to be unused in this method —
        # confirm before relying on it
        branch_data = {}
        done = True
        branch = self._settings['repositories']['branch']

        self._entropy.output(
            "[%s] %s" % (
                brown(branch),
                blue(_("collecting expired packages in the selected branches")),
            ),
            importance = 1,
            level = "info",
            header = blue(" @@ ")
        )

        # collect removed packages
        expiring_packages, extra_expiring_packages = \
            self._collect_expiring_packages(repository_id, branch)
        if expiring_packages:

            # filter expired packages used by other branches
            # this is done for the sake of consistency
            # --- read packages.db.pkglist, make sure your repository
            # has been ported to latest Entropy

            branch_pkglist_data = self._read_remote_file_in_branches(
                repository_id, etpConst['etpdatabasepkglist'],
                excluded_branches = [branch])
            # format data: one package path per line
            for key, val in list(branch_pkglist_data.items()):
                branch_pkglist_data[key] = val.split("\n")

            # a package still listed in any other branch must not expire
            for other_branch in branch_pkglist_data:
                branch_pkglist = set(branch_pkglist_data[other_branch])
                expiring_packages -= branch_pkglist

        if extra_expiring_packages:

            # filter expired packages used by other branches
            # this is done for the sake of consistency
            # --- read packages.db.extra_pkglist, make sure your repository
            # has been ported to latest Entropy

            branch_extra_pkglist_data = self._read_remote_file_in_branches(
                repository_id, etpConst['etpdatabaseextrapkglist'],
                excluded_branches = [branch])
            # format data: one package path per line
            for key, val in list(branch_extra_pkglist_data.items()):
                branch_extra_pkglist_data[key] = val.split("\n")

            # same cross-branch protection for "extra" package files
            for other_branch in branch_extra_pkglist_data:
                branch_pkglist = set(branch_extra_pkglist_data[other_branch])
                extra_expiring_packages -= branch_pkglist

        # triage: remove (expired), expire (mark now), weaken (mark weak)
        remove = []
        expire = []
        weaken = []

        for package_rel in expiring_packages:
            expired = self._is_package_expired(repository_id, package_rel,
                expiration_days)
            if expired:
                remove.append(package_rel)
            else:
                # not expired yet: create marker files if missing
                if not self._expiration_file_exists(repository_id, package_rel):
                    expire.append(package_rel)
                if weak_package_files and not self._weaken_file_exists(
                    repository_id, package_rel):
                    weaken.append(package_rel)

        # same triage applied to the "extra" package list
        for extra_package_rel in extra_expiring_packages:
            expired = self._is_package_expired(repository_id, extra_package_rel,
                expiration_days)
            if expired:
                remove.append(extra_package_rel)
            else:
                if not self._expiration_file_exists(
                    repository_id, extra_package_rel):
                    expire.append(extra_package_rel)
                if weak_package_files and not self._weaken_file_exists(
                    repository_id, extra_package_rel):
                    weaken.append(extra_package_rel)

        if not (remove or weaken or expire):
            self._entropy.output(
                "[%s] %s" % (
                    brown(branch),
                    blue(_("nothing to clean on this branch")),
                ),
                importance = 1,
                level = "info",
                header = blue(" @@ ")
            )
            return done

        # report the planned actions before touching anything
        if remove:
            self._entropy.output(
                "[%s] %s:" % (
                    brown(branch),
                    blue(_("these will be removed")),
                ),
                importance = 1,
                level = "info",
                header = blue(" @@ ")
            )
            for package in remove:
                self._entropy.output(
                    "[%s] %s: %s" % (
                        brown(branch),
                        blue(_("remove")),
                        darkgreen(package),
                    ),
                    importance = 1,
                    level = "info",
                    header = brown("    # ")
                )

        if expire:
            self._entropy.output(
                "[%s] %s:" % (
                    brown(branch),
                    blue(_("these will be marked as expired")),
                ),
                importance = 1,
                level = "info",
                header = blue(" @@ ")
            )
            for package in expire:
                self._entropy.output(
                    "[%s] %s: %s" % (
                        brown(branch),
                        blue(_("expire")),
                        darkgreen(package),
                    ),
                    importance = 1,
                    level = "info",
                    header = brown("    # ")
                )

        if weaken:
            self._entropy.output(
                "[%s] %s:" % (
                    brown(branch),
                    blue(_("these will be removed and marked as weak")),
                ),
                importance = 1,
                level = "info",
                header = blue(" @@ ")
            )
            for package in weaken:
                self._entropy.output(
                    "[%s] %s: %s" % (
                        brown(branch),
                        blue(_("weaken")),
                        darkgreen(package),
                    ),
                    importance = 1,
                    level = "info",
                    header = brown("    # ")
                )

        if pretend:
            # dry-run: report only, change nothing
            return done

        if ask:
            rc_question = self._entropy.ask_question(
                _("Would you like to continue ?"))
            if rc_question == _("No"):
                return done

        # write expiration marker files for not-yet-expired packages
        for package_rel in expire:
            self._create_expiration_file(repository_id, package_rel)

        # split queue by remote directories to work on
        removal_map = {}
        # NOTE(review): dbconn is opened read-only but never used below —
        # confirm whether the open has a required side effect
        dbconn = self._entropy.open_server_repository(repository_id,
            just_reading = True)
        for package_rel in remove:
            rel_path = self._entropy.complete_remote_package_relative_path(
                package_rel, repository_id)
            rel_dir = os.path.dirname(rel_path)
            obj = removal_map.setdefault(rel_dir, [])
            base_pkg = os.path.basename(package_rel)
            obj.append(base_pkg)

        for uri in self._entropy.remote_packages_mirrors(repository_id):

            ##
            # remove remotely
            ##

            uri_done = True
            m_fine_uris = set()
            m_broken_uris = set()
            for remote_dir, myqueue in removal_map.items():

                self._entropy.output(
                    "[%s] %s..." % (
                        brown(branch),
                        blue(_("removing packages remotely")),
                    ),
                    importance = 1,
                    level = "info",
                    header = blue(" @@ ")
                )

                destroyer = self.TransceiverServerHandler(
                    self._entropy,
                    [uri],
                    myqueue,
                    critical_files = [],
                    txc_basedir = remote_dir,
                    remove = True,
                    repo = repository_id
                )
                xerrors, xm_fine_uris, xm_broken_uris = destroyer.go()
                if xerrors:
                    uri_done = False
                m_fine_uris.update(xm_fine_uris)
                m_broken_uris.update(xm_broken_uris)

            if not uri_done:
                my_broken_uris = [
                    (EntropyTransceiver.get_uri_name(x_uri), x_uri_rc) \
                    for x_uri, x_uri_rc in m_broken_uris]

                # report only the first failure reason for this mirror
                reason = my_broken_uris[0][1]
                crippled_uri = EntropyTransceiver.get_uri_name(uri)
                self._entropy.output(
                    "[%s] %s: %s, %s: %s" % (
                        brown(branch),
                        blue(_("remove errors")),
                        red(crippled_uri),
                        blue(_("reason")),
                        reason,
                    ),
                    importance = 1,
                    level = "warning",
                    header = brown(" !!! ")
                )
                done = False

        self._entropy.output(
            "[%s] %s..." % (
                brown(branch),
                blue(_("removing packages locally")),
            ),
            importance = 1,
            level = "info",
            header = blue(" @@ ")
        )

        ##
        # remove locally
        ##

        for package_rel in remove:
            self._remove_local_package(repository_id, package_rel)

        # weakened packages: drop the local file but keep expiration/weak
        # marker files in place
        for package_rel in weaken:
            self._weaken_package_file(repository_id, package_rel)
            self._remove_local_package(repository_id, package_rel,
                remove_expired = False,
                remove_weak = False)

        return done
2516
2517 - def download_notice_board(self, repository_id):
2518 """ 2519 Download notice board for given repository identifier. 2520 2521 @param repository_id: repository identifier 2522 @type repository_id: string 2523 @return: True if download went successful. 2524 @rtype: bool 2525 """ 2526 mirrors = self._entropy.remote_repository_mirrors(repository_id) 2527 rss_path = self._entropy._get_local_repository_notice_board_file( 2528 repository_id) 2529 mytmpdir = const_mkdtemp(prefix = "entropy.server") 2530 2531 self._entropy.output( 2532 "[%s] %s %s" % ( 2533 brown(repository_id), 2534 blue(_("downloading notice board from mirrors to")), 2535 red(rss_path), 2536 ), 2537 importance = 1, 2538 level = "info", 2539 header = blue(" @@ ") 2540 ) 2541 2542 remote_dir = os.path.join( 2543 self._entropy._get_remote_repository_relative_path(repository_id), 2544 self._settings['repositories']['branch']) 2545 2546 downloaded = False 2547 for uri in mirrors: 2548 crippled_uri = EntropyTransceiver.get_uri_name(uri) 2549 2550 downloader = self.TransceiverServerHandler( 2551 self._entropy, [uri], 2552 [rss_path], download = True, 2553 local_basedir = mytmpdir, critical_files = [rss_path], 2554 txc_basedir = remote_dir, repo = repository_id 2555 ) 2556 errors, m_fine_uris, m_broken_uris = downloader.go() 2557 if not errors: 2558 self._entropy.output( 2559 "[%s] %s: %s" % ( 2560 brown(repository_id), 2561 blue(_("notice board downloaded successfully from")), 2562 red(crippled_uri), 2563 ), 2564 importance = 1, 2565 level = "info", 2566 header = blue(" @@ ") 2567 ) 2568 downloaded = True 2569 break 2570 2571 if downloaded: 2572 shutil.move(os.path.join(mytmpdir, os.path.basename(rss_path)), 2573 rss_path) 2574 2575 return downloaded
2576
2577 - def remove_notice_board(self, repository_id):
2578 """ 2579 Remove notice board for given repository identifier. 2580 2581 @param repository_id: repository identifier 2582 @type repository_id: string 2583 @return: True if removal went successful. 2584 @rtype: bool 2585 """ 2586 mirrors = self._entropy.remote_repository_mirrors(repository_id) 2587 rss_path = self._entropy._get_local_repository_notice_board_file( 2588 repository_id) 2589 rss_file = os.path.basename(rss_path) 2590 2591 self._entropy.output( 2592 "[%s] %s %s" % ( 2593 brown(repository_id), 2594 blue(_("removing notice board from")), 2595 red(rss_file), 2596 ), 2597 importance = 1, 2598 level = "info", 2599 header = blue(" @@ ") 2600 ) 2601 2602 remote_dir = os.path.join( 2603 self._entropy._get_remote_repository_relative_path(repository_id), 2604 self._settings['repositories']['branch']) 2605 2606 destroyer = self.TransceiverServerHandler( 2607 self._entropy, 2608 mirrors, 2609 [rss_file], 2610 critical_files = [rss_file], 2611 remove = True, 2612 txc_basedir = remote_dir, 2613 repo = repository_id 2614 ) 2615 errors, m_fine_uris, m_broken_uris = destroyer.go() 2616 if errors: 2617 m_broken_uris = sorted(m_broken_uris) 2618 m_broken_uris = [EntropyTransceiver.get_uri_name(x_uri) \ 2619 for x_uri, x_uri_rc in m_broken_uris] 2620 self._entropy.output( 2621 "[%s] %s %s" % ( 2622 brown(repository_id), 2623 blue(_("notice board removal failed on")), 2624 red(', '.join(m_broken_uris)), 2625 ), 2626 importance = 1, 2627 level = "info", 2628 header = blue(" @@ ") 2629 ) 2630 return False 2631 self._entropy.output( 2632 "[%s] %s" % ( 2633 brown(repository_id), 2634 blue(_("notice board removal success")), 2635 ), 2636 importance = 1, 2637 level = "info", 2638 header = blue(" @@ ") 2639 ) 2640 return True
2641 2642
2643 - def upload_notice_board(self, repository_id):
2644 """ 2645 Upload notice board for given repository identifier. 2646 2647 @param repository_id: repository identifier 2648 @type repository_id: string 2649 @return: True if upload went successful. 2650 @rtype: bool 2651 """ 2652 mirrors = self._entropy.remote_repository_mirrors(repository_id) 2653 rss_path = self._entropy._get_local_repository_notice_board_file( 2654 repository_id) 2655 2656 self._entropy.output( 2657 "[%s] %s %s" % ( 2658 brown(repository_id), 2659 blue(_("uploading notice board from")), 2660 red(rss_path), 2661 ), 2662 importance = 1, 2663 level = "info", 2664 header = blue(" @@ ") 2665 ) 2666 2667 remote_dir = os.path.join( 2668 self._entropy._get_remote_repository_relative_path(repository_id), 2669 self._settings['repositories']['branch']) 2670 2671 uploader = self.TransceiverServerHandler( 2672 self._entropy, 2673 mirrors, 2674 [rss_path], 2675 critical_files = [rss_path], 2676 txc_basedir = remote_dir, repo = repository_id 2677 ) 2678 errors, m_fine_uris, m_broken_uris = uploader.go() 2679 if errors: 2680 m_broken_uris = sorted(m_broken_uris) 2681 m_broken_uris = [EntropyTransceiver.get_uri_name(x_uri) \ 2682 for x_uri, x_uri_rc in m_broken_uris] 2683 self._entropy.output( 2684 "[%s] %s %s" % ( 2685 brown(repository_id), 2686 blue(_("notice board upload failed on")), 2687 red(', '.join(m_broken_uris)), 2688 ), 2689 importance = 1, 2690 level = "info", 2691 header = blue(" @@ ") 2692 ) 2693 return False 2694 self._entropy.output( 2695 "[%s] %s" % ( 2696 brown(repository_id), 2697 blue(_("notice board upload success")), 2698 ), 2699 importance = 1, 2700 level = "info", 2701 header = blue(" @@ ") 2702 ) 2703 return True
2704 2705
2706 - def update_notice_board(self, repository_id, title, notice_text, 2707 link = None):
2708 """ 2709 Update notice board adding a new entry, provided by a title and a 2710 body message (notice_text). Providing a link is optional. 2711 2712 @param repository_id: repository identifier 2713 @type repository_id: string 2714 @param title: noticeboard new entry title 2715 @type title: string 2716 @param notice_text: noticeboard new entry text 2717 @type notice_text: string 2718 @keyword link: optional link to provide with the noticeboard entry 2719 @type link: string 2720 @return: True if update went successful. 2721 @rtype: bool 2722 """ 2723 rss_title = "%s Notice Board" % (self._settings['system']['name'],) 2724 rss_description = "Inform about important distribution activities." 2725 rss_path = self._entropy._get_local_repository_notice_board_file( 2726 repository_id) 2727 srv_set = self._settings[Server.SYSTEM_SETTINGS_PLG_ID]['server'] 2728 if not link: 2729 link = srv_set['rss']['website_url'] 2730 2731 self.download_notice_board(repository_id) 2732 rss_main = RSS(rss_path, rss_title, rss_description, 2733 maxentries = 20) 2734 rss_main.add_item(title, link, description = notice_text) 2735 rss_main.write_changes() 2736 dict_list, items = rss_main.get_entries() 2737 if items == 0: 2738 status = self.remove_notice_board(repository_id) 2739 else: 2740 status = self.upload_notice_board(repository_id) 2741 return status
2742
2743 - def read_notice_board(self, repository_id, do_download = True):
2744 """ 2745 Read content of noticeboard for given repository. do_download, if True, 2746 fetches the noticeboard directly from the remote repository before 2747 returning its content. If noticeboard cannot be downloaded or 2748 do_download is False and there is any local cache, None will be 2749 returned. 2750 2751 @param repository_id: repository identifier 2752 @type repository_id: string 2753 @return: the output of entropy.misc.RSS.get_entries() or None 2754 @rtype: tuple or None 2755 """ 2756 rss_path = self._entropy._get_local_repository_notice_board_file( 2757 repository_id) 2758 if do_download: 2759 self.download_notice_board(repository_id) 2760 if not const_file_readable(rss_path): 2761 return None 2762 rss_main = RSS(rss_path, '', '') 2763 return rss_main.get_entries()
2764
2765 - def remove_from_notice_board(self, repository_id, identifier):
2766 """ 2767 Remove entry from noticeboard of given repository. read_notice_board() 2768 returns an object containing a list of entries, identifier here 2769 represents the index of that list, if it exists. 2770 2771 @param repository_id: repository identifier 2772 @type repository_id: string 2773 @param identifier: notice board identifier 2774 @type identifier: int 2775 @return: True, if operation is successful, False otherwise 2776 @rtype: bool 2777 """ 2778 rss_path = self._entropy._get_local_repository_notice_board_file( 2779 repository_id) 2780 rss_title = "%s Notice Board" % (self._settings['system']['name'],) 2781 rss_description = "Inform about important distribution activities." 2782 if not const_file_readable(rss_path): 2783 return False 2784 rss_main = RSS(rss_path, rss_title, rss_description) 2785 counter = rss_main.remove_entry(identifier) 2786 rss_main.write_changes() 2787 return True
2788