Package entropy :: Package server :: Module transceivers

Source Code for Module entropy.server.transceivers

  1  # -*- coding: utf-8 -*- 
  2  """ 
  3   
  4      @author: Fabio Erculiani <[email protected]> 
  5      @contact: [email protected] 
  6      @copyright: Fabio Erculiani 
  7      @license: GPL-2 
  8   
  9      B{Entropy Server transceivers module}. 
 10   
 11  """ 
 12  import os 
 13   
 14  from entropy.const import const_isstring, const_isnumber, etpConst 
 15  from entropy.output import darkred, blue, brown, darkgreen, red, bold 
 16  from entropy.transceivers.exceptions import TransceiverConnectionError 
 17  from entropy.i18n import _ 
 18  from entropy.client.interfaces.db import InstalledPackagesRepository 
 19  from entropy.core.settings.base import SystemSettings 
 20  from entropy.transceivers import EntropyTransceiver 
 21  from entropy.tools import print_traceback, is_valid_md5, compare_md5, md5sum 
 22   
class TransceiverServerHandler:
    """
    Push, pull or remove a list of files to/from a list of mirror URIs
    through L{EntropyTransceiver}, retrying each file a bounded number of
    times and reporting progress through the Entropy output interface.
    """

    # Maximum number of attempts made for every single file.
    _MAX_TRIES = 5

    def __init__(self, entropy_interface, uris, files_to_upload,
        download = False, remove = False, txc_basedir = None,
        local_basedir = None, critical_files = None,
        handlers_data = None, repo = None, copy_herustic_support = False):
        """
        Object constructor.

        @param entropy_interface: object used for output (C{output()}),
            repository lookup (C{repository()}) and path resolution
        @param uris: list of mirror URIs to work on
        @param files_to_upload: list of paths (or of C{(remote_dir, path)}
            tuples), or a dict whose sorted keys are used as that list
        @keyword download: pull files instead of pushing them
        @keyword remove: remove remote files (overrides C{download})
        @keyword txc_basedir: default remote directory (mandatory)
        @keyword local_basedir: local directory used when pulling; defaults
            to the directory of the local repository file of C{repo}
        @keyword critical_files: paths whose failure aborts the current
            mirror; this list is stored by reference, not copied
        @keyword handlers_data: extra metadata (shallow-copied); the
            'download' key is read by the remote copy herustic
        @keyword repo: repository identifier, defaults to
            C{entropy_interface.repository()}
        @keyword copy_herustic_support: try a remote-side copy before
            uploading (valid only for uploads)
        @raise AttributeError: if an argument is invalid
        """
        if critical_files is None:
            critical_files = []
        if handlers_data is None:
            handlers_data = {}

        self._entropy = entropy_interface
        if not isinstance(uris, list):
            raise AttributeError("uris must be a list instance")
        if not isinstance(files_to_upload, (list, dict)):
            raise AttributeError(
                "files_to_upload must be a list or dict instance")
        self.uris = uris
        if isinstance(files_to_upload, list):
            self.myfiles = files_to_upload[:]
        else:
            # dict: only its keys are used, in sorted order
            self.myfiles = sorted(files_to_upload)

        self._settings = SystemSettings()
        self.sys_settings_plugin_id = \
            etpConst['system_settings_plugins_ids']['server_plugin']
        srv_set = self._settings[self.sys_settings_plugin_id]['server']

        # server-side speed limit
        self.speed_limit = srv_set['sync_speed_limit']
        self.download = download
        self.remove = remove
        self.repo = repo
        if self.repo is None:
            self.repo = self._entropy.repository()
        if self.remove:
            # removal wins over download
            self.download = False
        self._copy_herustic = copy_herustic_support
        if self._copy_herustic and (self.download or self.remove):
            raise AttributeError(
                "copy_herustic_support can be enabled only for uploads")

        if not txc_basedir:
            raise AttributeError("invalid txc_basedir passed")
        self.txc_basedir = txc_basedir

        if not local_basedir:
            # default to database directory
            self.local_basedir = os.path.dirname(
                self._entropy._get_local_repository_file(self.repo))
        else:
            self.local_basedir = local_basedir

        self.critical_files = critical_files
        self.handlers_data = handlers_data.copy()

    def _base_action(self):
        """
        Return the label of the configured operation: 'pull', 'remove'
        or 'push'.
        """
        if self.download:
            return 'pull'
        if self.remove:
            return 'remove'
        return 'push'

    def _progress_tag(self, crippled_uri, counter, maxcount, tries = None):
        """
        Build the colored "[uri|#try|(current/total)]" prefix shared by the
        progress messages. The "#try" field is omitted when tries is None.
        """
        if tries is None:
            return "[%s|(%s/%s)]" % (
                blue(crippled_uri),
                blue(str(counter)),
                bold(str(maxcount)),
            )
        return "[%s|#%s|(%s/%s)]" % (
            blue(crippled_uri),
            darkgreen(str(tries)),
            blue(str(counter)),
            bold(str(maxcount)),
        )

    def handler_verify_upload(self, local_filepath, uri, counter, maxcount,
        tries, remote_md5 = None):
        """
        Verify an uploaded file against the md5 reported by the remote end.

        @param local_filepath: path of the local file that was uploaded
        @param uri: mirror URI, used for output only
        @param counter: index of the current file, used for output only
        @param maxcount: total number of files, used for output only
        @param tries: current attempt number, used for output only
        @keyword remote_md5: md5 returned by the remote end, if any
        @return: False only when remote_md5 is a well-formed md5 that does
            not match the local file. True when it matches, when it is
            malformed, or when it is not a string at all (verification is
            then skipped).
        """
        crippled_uri = EntropyTransceiver.get_uri_name(uri)
        tag = self._progress_tag(crippled_uri, counter, maxcount, tries)
        filename = os.path.basename(local_filepath)

        self._entropy.output(
            "%s %s: %s" % (
                tag,
                darkgreen(_("verifying upload (if supported)")),
                blue(filename),
            ),
            importance = 0,
            level = "info",
            header = red(" @@ "),
            back = True
        )

        # if remote server supports MD5 commands, remote_md5 is filled
        if not const_isstring(remote_md5):
            return True

        if not is_valid_md5(remote_md5):
            # malformed md5: warn, but do not consider the upload broken
            self._entropy.output(
                "%s %s: %s: %s" % (
                    tag,
                    blue(_("digest verification")),
                    filename,
                    bold(_("malformed md5 provided to function")),
                ),
                importance = 0,
                level = "warning",
                header = brown(" @@ ")
            )
            return True

        if compare_md5(local_filepath, remote_md5):
            self._entropy.output(
                "%s %s: %s: %s" % (
                    tag,
                    blue(_("digest verification")),
                    filename,
                    darkgreen(_("so far, so good!")),
                ),
                importance = 0,
                level = "info",
                header = red(" @@ ")
            )
            return True

        # well-formed md5, but it does not match the local file
        self._entropy.output(
            "%s %s: %s: %s" % (
                tag,
                blue(_("digest verification")),
                filename,
                bold(_("remote md5 is invalid")),
            ),
            importance = 0,
            level = "warning",
            header = brown(" @@ ")
        )
        return False

    def _transceive(self, uri):
        """
        Process every file in self.myfiles against a single mirror.

        @param uri: mirror URI
        @return: tuple composed by (fail, fine, broken): fail is True if
            the connection could not be set up or a critical file could
            not be processed; fine is a set containing uri if at least one
            file succeeded; broken is a set of (uri, last return value)
            tuples for critical failures.
        """
        fine = set()
        broken = set()
        fail = False
        crippled_uri = EntropyTransceiver.get_uri_name(uri)
        base_action = self._base_action()
        is_upload = not (self.download or self.remove)

        try:
            txc = EntropyTransceiver(uri)
            if const_isnumber(self.speed_limit):
                txc.set_speed_limit(self.speed_limit)
            txc.set_output_interface(self._entropy)
        except TransceiverConnectionError:
            print_traceback()
            return True, fine, broken # issues

        maxcount = len(self.myfiles)
        counter = 0

        with txc as handler:

            for mypath in self.myfiles:

                base_dir = self.txc_basedir

                # entries can be (remote directory, path) tuples
                if isinstance(mypath, tuple):
                    if len(mypath) < 2:
                        continue
                    base_dir, mypath = mypath

                if not handler.is_dir(base_dir):
                    handler.makedirs(base_dir)

                mypath_fn = os.path.basename(mypath)
                remote_path = os.path.join(base_dir, mypath_fn)

                # the label is reset for every file, otherwise a "copy"
                # label would stick to the files uploaded afterwards
                action = base_action
                if self.download:
                    syncer = handler.download
                    local_path = os.path.join(self.local_basedir, mypath_fn)
                    myargs = (remote_path, local_path)
                elif self.remove:
                    syncer = handler.delete
                    myargs = (remote_path,)
                else:
                    syncer = handler.upload
                    myargs = (mypath, remote_path)

                fallback_syncer, fallback_args = None, None
                # upload -> remote copy herustic support
                # if a package file might have been already uploaded
                # to remote mirror, try to look in other repositories'
                # package directories if a file, with the same md5 and name
                # is already available. In this case, use remote copy
                # instead of upload to save bandwidth.
                if self._copy_herustic and is_upload:
                    new_syncer, new_args = self._copy_herustic_support(
                        handler, mypath, base_dir, remote_path)
                    if new_syncer is not None:
                        fallback_syncer, fallback_args = syncer, myargs
                        syncer, myargs = new_syncer, new_args
                        action = "copy"

                counter += 1
                done = False
                lastrc = None
                file_tag = self._progress_tag(
                    crippled_uri, counter, maxcount)

                for tries in range(1, self._MAX_TRIES + 1):
                    try_tag = self._progress_tag(
                        crippled_uri, counter, maxcount, tries)
                    self._entropy.output(
                        "%s %s: %s" % (
                            try_tag,
                            blue(action),
                            red(mypath_fn),
                        ),
                        importance = 0,
                        level = "info",
                        header = red(" @@ ")
                    )
                    rc = syncer(*myargs)
                    if (not rc) and (fallback_syncer is not None):
                        # the remote copy failed: fall back to the plain
                        # upload, using the arguments saved for it
                        rc = fallback_syncer(*fallback_args)

                    if rc and is_upload:
                        remote_md5 = handler.get_md5(remote_path)
                        rc = self.handler_verify_upload(mypath, uri,
                            counter, maxcount, tries,
                            remote_md5 = remote_md5)

                    if rc:
                        self._entropy.output(
                            "%s %s %s: %s" % (
                                try_tag,
                                blue(action),
                                _("successful"),
                                red(mypath_fn),
                            ),
                            importance = 0,
                            level = "info",
                            header = darkgreen(" @@ ")
                        )
                        done = True
                        fine.add(uri)
                        break

                    self._entropy.output(
                        "%s %s %s: %s" % (
                            try_tag,
                            blue(action),
                            brown(_("failed, retrying")),
                            red(mypath_fn),
                        ),
                        importance = 0,
                        level = "warning",
                        header = brown(" @@ ")
                    )
                    lastrc = rc

                if done:
                    continue

                self._entropy.output(
                    "%s %s %s: %s - %s: %s" % (
                        file_tag,
                        blue(action),
                        darkred("failed, giving up"),
                        red(mypath_fn),
                        _("error"),
                        lastrc,
                    ),
                    importance = 1,
                    level = "error",
                    header = darkred(" !!! ")
                )

                if mypath not in self.critical_files:
                    self._entropy.output(
                        "%s %s: %s, %s..." % (
                            file_tag,
                            blue(_("not critical")),
                            mypath_fn,
                            blue(_("continuing")),
                        ),
                        importance = 1,
                        level = "warning",
                        header = brown(" @@ ")
                    )
                    continue

                fail = True
                broken.add((uri, lastrc))
                # next mirror
                break

        return fail, fine, broken

    def _copy_herustic_support(self, handler, local_path,
        txc_basedir, remote_path):
        """
        Determine if it's possible to remote copy the package from other
        configured repositories to save bandwidth.
        This herustic only works with package files, not repository db
        files. Thus, it should be only enabled for these kind of uploads.

        @param handler: connected transceiver handler
        @param local_path: path of the local file to be uploaded
        @param txc_basedir: remote directory of the upload (unused here)
        @param remote_path: remote destination path of the upload
        @return: (handler.copy, (source remote path, remote_path)) when a
            file with the same name and md5 is found on this mirror in
            another repository, otherwise (None, None)
        """
        pkg_download = self.handlers_data.get('download')
        if pkg_download is None:
            # unsupported, we need at least package "download" metadatum
            # to be able to reconstruct a valid remote URI
            return None, None

        current_repository_id = self.repo
        available_repositories = self._entropy.available_repositories()
        # Candidates are the other repositories (excluding the current one
        # and the fake installed packages repository) that also list the
        # current working uri among their packages mirrors.
        test_repositories = sorted(
            repository_id
            for repository_id, repo_meta in available_repositories.items()
            if repository_id != current_repository_id
            and repository_id != InstalledPackagesRepository.NAME
            and handler.get_uri() in repo_meta['pkg_mirrors']
        )

        if not test_repositories:
            # sorry!
            return None, None

        local_path_filename = os.path.basename(local_path)
        local_md5 = None
        for repository_id in test_repositories:
            repo_txc_basedir = \
                self._entropy.complete_remote_package_relative_path(
                    pkg_download, repository_id)
            test_remote_path = repo_txc_basedir + "/" + local_path_filename
            if not handler.is_file(test_remote_path):
                # not found on this packages mirror
                continue
            # then check md5 and compare
            remote_md5 = handler.get_md5(test_remote_path)
            if not const_isstring(remote_md5):
                # transceiver or remote server doesn't support md5sum()
                # so cannot verify the integrity
                continue
            if local_md5 is None:
                # computed lazily, at most once
                local_md5 = md5sum(local_path)
            if local_md5 == remote_md5:
                # yay! we can copy over!
                return handler.copy, (test_remote_path, remote_path)

        return None, None

    def go(self):
        """
        Run the configured operation against every mirror in self.uris.

        @return: tuple composed by (errors, fine_uris, broken_uris):
            errors is True if any mirror reported a failure, the two sets
            are the union of what _transceive() returned for each mirror.
        """
        broken_uris = set()
        fine_uris = set()
        errors = False
        action = self._base_action()

        for uri in self.uris:

            crippled_uri = EntropyTransceiver.get_uri_name(uri)
            self._entropy.output(
                "[%s|%s] %s..." % (
                    blue(crippled_uri),
                    brown(action),
                    blue(_("connecting to mirror")),
                ),
                importance = 0,
                level = "info",
                header = blue(" @@ ")
            )

            self._entropy.output(
                "[%s|%s] %s %s..." % (
                    blue(crippled_uri),
                    brown(action),
                    blue(_("setting directory to")),
                    darkgreen(self.txc_basedir),
                ),
                importance = 0,
                level = "info",
                header = blue(" @@ ")
            )

            fail, fine, broken = self._transceive(uri)
            fine_uris |= fine
            broken_uris |= broken
            if fail:
                errors = True

        return errors, fine_uris, broken_uris
441