Package entropy :: Package spm :: Package plugins :: Package interfaces :: Package portage_plugin :: Module xpaktools

Source Code for Module entropy.spm.plugins.interfaces.portage_plugin.xpaktools

  1  # -*- coding: utf-8 -*- 
  2  """ 
  3   
  4      @author: Fabio Erculiani <[email protected]> 
  5      @author: Slawomir Nizio <[email protected]> 
  6      @contact: [email protected] 
  7      @copyright: Fabio Erculiani 
  8      @license: GPL-2 
  9   
 10      B{Entropy Source Package Manager "Portage" Plugin XPAK tools}. 
 11   
 12  """ 
 13  import sys 
 14  import os 
 15  import errno 
 16  import shutil 
 17  from entropy.const import etpConst, const_is_python3, const_mkdtemp, \ 
 18      const_mkstemp 
 19  from entropy.output import TextInterface 
 20  from entropy.spm.plugins.factory import get_default_instance 
 21  from entropy.spm.plugins.interfaces.portage_plugin import xpak 
 22   
def extract_xpak(tbz2file, tmpdir=None):
    """
    Extract the XPAK metadata embedded in tbz2file and unpack it.

    The XPAK segment is first copied into a temporary file by suck_xpak(),
    which is then handed to unpack_xpak().

    @param tbz2file: path of the package file carrying the XPAK segment
    @type tbz2file: string
    @keyword tmpdir: directory passed through to unpack_xpak() as the
        unpack destination; None lets unpack_xpak() pick one
    @type tmpdir: string or None
    @return: the value returned by unpack_xpak(), or None when suck_xpak()
        reports that no XPAK segment could be extracted
    @rtype: string or None
    """
    handle, xpak_path = const_mkstemp(
        prefix="entropy.spm.portage.extract_xpak")
    # only the path is needed, suck_xpak() reopens the file on its own
    os.close(handle)
    try:
        if suck_xpak(tbz2file, xpak_path):
            return unpack_xpak(xpak_path, tmpdir=tmpdir)
        return None
    finally:
        try:
            # unpack_xpak() normally removes the file already, so a
            # missing file is expected and not an error
            os.remove(xpak_path)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
50
def read_xpak(tbz2file):
    """
    Return the raw XPAK metadata embedded in tbz2file.

    The XPAK segment is copied into a temporary file by suck_xpak(), read
    back fully into memory and the temporary file is then removed.

    @param tbz2file: path of the package file carrying the XPAK segment
    @type tbz2file: string
    @return: the XPAK segment as read from a file opened in binary mode,
        or None when suck_xpak() reports that nothing could be extracted
    @rtype: string or None
    """
    handle, xpak_path = const_mkstemp(
        prefix="entropy.spm.portage.read_xpak")
    # only the path is needed, suck_xpak() reopens the file on its own
    os.close(handle)
    try:
        if not suck_xpak(tbz2file, xpak_path):
            return None
        with open(xpak_path, "rb") as xpak_f:
            return xpak_f.read()
    finally:
        os.remove(xpak_path)
72
def unpack_xpak(xpakfile, tmpdir = None):
    """
    Unpack the content of an XPAK file into a directory.

    xpakfile is always removed afterwards (best effort), whether or not
    unpacking succeeded.

    @param xpakfile: path of the XPAK file to unpack; passed to
        xpak.getboth() and deleted before returning
    @type xpakfile: string
    @keyword tmpdir: existing directory to unpack into; when None a new
        temporary directory is created with const_mkdtemp()
    @type tmpdir: string or None
    @return: the directory the content was unpacked into, or None if
        xpak.getboth()/xpak.xpand() raised TypeError
    @rtype: string or None
    @raise AttributeError: if tmpdir is given but is not an existing
        directory
    """
    created_here = tmpdir is None
    if created_here:
        tmpdir = const_mkdtemp(prefix="unpack_xpak")
    elif not os.path.isdir(tmpdir):
        raise AttributeError("tmpdir %s does not exist" % (tmpdir,))

    unpacked = False
    try:
        xpakdata = xpak.getboth(xpakfile)
        xpak.xpand(xpakdata, tmpdir)
        unpacked = True
        return tmpdir
    except TypeError:
        # NOTE(review): presumably raised when xpakfile holds no valid
        # XPAK data -- confirm against the xpak module
        return None
    finally:
        if created_here and not unpacked:
            # The caller never learns the path of a directory created
            # here when unpacking fails, so drop it instead of leaking it.
            # A caller-provided directory is never touched.
            shutil.rmtree(tmpdir, True)
        try:
            os.remove(xpakfile)
        except OSError:
            # best effort: the file may already be gone
            pass
99
def suck_xpak(tbz2file, xpakpath):
    """
    Locate the XPAK segment inside tbz2file and copy it to xpakpath.

    The file is scanned backwards from its end, in overlapping chunks, for
    the last "XPAKSTOP" marker and then for the closest "XPAKPACK" marker
    that precedes it. Everything from the first byte of "XPAKPACK" to the
    last byte of "XPAKSTOP" (both markers included) is written to xpakpath.

    xpakpath is opened for writing (created or truncated) before the scan
    starts, so it exists and is empty when the markers are not found.

    @param tbz2file: path of the file to scan
    @type tbz2file: string
    @param xpakpath: path of the file receiving the XPAK segment
    @type xpakpath: string
    @return: True if both markers were found and the segment was copied,
        False otherwise
    @rtype: bool
    """
    # Bytes literals are plain str on Python 2 and bytes on Python 3, which
    # matches what a file opened in binary mode yields on either one.
    xpak_end = b"XPAKSTOP"
    xpak_start = b"XPAKPACK"

    chunk_size = 2048
    copy_size = 65536

    # Sanity check: makes the position calculations easier (seek_length below).
    assert len(xpak_end) == len(xpak_start)
    marker_len = len(xpak_start)

    with open(tbz2file, "rb") as old:
        with open(xpakpath, "wb") as db:
            data_start_position = None
            data_end_position = None

            old.seek(0, os.SEEK_END)
            n_bytes = old.tell()

            chunk_size = min(chunk_size, n_bytes)

            # position one chunk from the end, then walk backwards
            seek_pos = n_bytes - chunk_size

            # Consecutive chunks overlap by marker_len - 1 bytes, so that a
            # marker straddling a chunk boundary is fully contained in one
            # of the two chunks. Only used while seek_pos > 0, which implies
            # chunk_size == 2048.
            seek_length = chunk_size - (marker_len - 1)

            while True:
                old.seek(seek_pos, os.SEEK_SET)
                read_bytes = old.read(chunk_size)

                if data_end_position is None:
                    # Only the last XPAKSTOP of the file delimits the
                    # segment; the same byte sequence showing up earlier in
                    # the file must not move the end position.
                    end_idx = read_bytes.rfind(xpak_end)
                    if end_idx != -1:
                        data_end_position = seek_pos + end_idx + marker_len

                if data_end_position is not None:
                    # avoid START after END: only consider the bytes of this
                    # chunk that precede the END marker
                    searchable = max(
                        0, data_end_position - marker_len - seek_pos)
                    start_idx = read_bytes.rfind(xpak_start, 0, searchable)
                    if start_idx != -1:
                        data_start_position = seek_pos + start_idx
                        break

                if seek_pos == 0:
                    break
                seek_pos = max(0, seek_pos - seek_length)

            # START is only accepted once END is known, so a missing END
            # is covered by this check as well
            if data_start_position is None:
                return False

            # now write the found metadata to file, from
            # data_start_position up to data_end_position
            old.seek(data_start_position)
            to_read = data_end_position - data_start_position
            while to_read > 0:
                data = old.read(min(to_read, copy_size))
                if not data:
                    # the file got shorter while being read; bail out
                    # instead of looping forever
                    return False
                db.write(data)
                to_read -= len(data)
            return True
185
def aggregate_xpak(tbz2file, xpakfile):
    """
    Aggregate xpakfile content to tbz2file.

    The whole content of xpakfile is read into memory and handed to the
    recompose_mem() method of an xpak.tbz2 object built from tbz2file.

    @param tbz2file: path passed to xpak.tbz2()
    @type tbz2file: string
    @param xpakfile: path of the file holding the XPAK data to aggregate
    @type xpakfile: string
    @return: None
    @rtype: None
    """
    package = xpak.tbz2(tbz2file)
    with open(xpakfile, "rb") as source:
        # the metadata is loaded in one go, nothing is streamed
        metadata = source.read()
        package.recompose_mem(metadata)
203