From c3e39c1f43f7026a0432e36f57d97891c22fafd8 Mon Sep 17 00:00:00 2001 From: simka Date: Sun, 30 May 2010 18:27:00 +0300 Subject: 1) Add hash checks. 2) Exclude duplicates from the distfile list. --- twrapper/twrapper.py | 302 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 287 insertions(+), 15 deletions(-) (limited to 'twrapper') diff --git a/twrapper/twrapper.py b/twrapper/twrapper.py index 3df41a4..9cd3d2a 100755 --- a/twrapper/twrapper.py +++ b/twrapper/twrapper.py @@ -13,6 +13,7 @@ import re import time import sys import signal +from portage.checksum import perform_md5, verify_all from portage.process import spawn import idfetch_settings import pickle @@ -22,7 +23,7 @@ import subprocess import shlex from threading import Thread CLEAN_LINE=" " -MAX_ACTIVE_DOWNLOADS=5 +MAX_ACTIVE_DOWNLOADS=8 TASK_SPACE=3 USE_CURSES_FLAG=1 DEBUG_ON=0 @@ -99,7 +100,6 @@ def open_task_list(): error_msg("open_task_list(): Can't delete file: "+idfetch_pkg_list_file_name) time.sleep(3) - #unlock file os.unlink(idfetch_pkg_list_file_lock_name) @@ -125,11 +125,7 @@ class fetchit(Thread): def start(self,place_in_the_list): self.place_in_the_list=place_in_the_list Thread.start(self) - def run(self): - global exit_flag -# self.download_file_url_list_file=open(idfetch_settings.TASK_DIR+'/urls/'+self.download_file+".urllist") -# self.download_file_url_list=pickle.load(self.download_file_url_list_file) -# self.download_file_url_list_file.close() + def fetch(self): self.url_list=list(self.distfile['url_list']) self.trials_log_file= open (self.trials_log_file_name,"w") self.tries_counter=0 @@ -155,7 +151,13 @@ class fetchit(Thread): progress_msg(self.place_in_the_list,self.line.rstrip()) self.igot = re.findall('saved',self.line) if self.igot: - self.status = 0 + self.verified_ok, self.reason = verify_all( + self.myfile_path, self.distfile) + if not self.verified_ok: + sms("ERROR: while checking hash for "+self.myfile_path+" Refetching...") + os.unlink(self.myfile_path) + else: + self.status=0 self.wget_subprocess.wait() break else: @@ -180,8 +182,265 @@ class fetchit(Thread): self.msg_text="[ERROR+LIST_IS_EMPTY "+str(self.index)+" size:"+str(self.distfile['size'])+"b ]: "+self.current_url self.trials_log_file.write(self.msg_text+"\n") download_msg(self.place_in_the_list,self.msg_text) - self.trials_log_file.close() + + def run(self): + sms("started "+str(self.index)) + global exit_flag +# self.download_file_url_list_file=open(idfetch_settings.TASK_DIR+'/urls/'+self.download_file+".urllist") +# self.download_file_url_list=pickle.load(self.download_file_url_list_file) +# self.download_file_url_list_file.close() + self.myfile_path=idfetch_settings.DIST_DIR+"/"+self.distfile['name'] +# for myfile in filedict: +# """ +# fetched status +# 0 nonexistent +# 1 partially downloaded +# 2 completely downloaded +# """ +# fetched = 0 + +# orig_digests = mydigests.get(myfile, {}) +# size = orig_digests.get("size") + +# if size == 0: + # Zero-byte distfiles are always invalid, so discard their digests. +# del mydigests[myfile] +# orig_digests.clear() +# size = None +# pruned_digests = orig_digests +# if parallel_fetchonly: +# pruned_digests = {} +# if size is not None: +# pruned_digests["size"] = size + +# myfile_path = os.path.join(mysettings["DISTDIR"], myfile) +# has_space = True +# has_space_superuser = True +# file_lock = None +# if listonly: +# writemsg_stdout("\n", noiselevel=-1) +# else: +# # check if there is enough space in DISTDIR to completely store myfile + # overestimate the filesize so we aren't bitten by FS overhead +# if size is not None and hasattr(os, "statvfs"): +# vfs_stat = os.statvfs(mysettings["DISTDIR"]) +# try: +# mysize = os.stat(myfile_path).st_size +# except OSError as e: +# if e.errno not in (errno.ENOENT, errno.ESTALE): +# raise +# del e +# mysize = 0 +# if (size - mysize + vfs_stat.f_bsize) >= \ +# (vfs_stat.f_bsize * vfs_stat.f_bavail): + +# if (size - mysize + vfs_stat.f_bsize) >= \ +# (vfs_stat.f_bsize * vfs_stat.f_bfree): +# has_space_superuser = False + +# if not has_space_superuser: +# has_space = False +# elif secpass < 2: +# has_space = False +# elif userfetch: +# has_space = False + +# if not has_space: +# writemsg(_("!!! Insufficient space to store %s in %s\n") % \ +# (myfile, mysettings["DISTDIR"]), noiselevel=-1) + +# if has_space_superuser: +# writemsg(_("!!! Insufficient privileges to use " +# "remaining space.\n"), noiselevel=-1) +# if userfetch: +# writemsg(_("!!! You may set FEATURES=\"-userfetch\"" +# " in /etc/make.conf in order to fetch with\n" +# "!!! superuser privileges.\n"), noiselevel=-1) + +# if distdir_writable and use_locks: + +# lock_kwargs = {} +# if fetchonly: +# lock_kwargs["flags"] = os.O_NONBLOCK + +# try: +# file_lock = lockfile(myfile_path, +# wantnewlockfile=1, **lock_kwargs) +# except TryAgain: +# writemsg(_(">>> File '%s' is already locked by " +# "another fetcher. Continuing...\n") % myfile, +# noiselevel=-1) +# continue +# try: +# if not listonly: +# +# eout = EOutput() +# eout.quiet = mysettings.get("PORTAGE_QUIET") == "1" +# match, mystat = _check_distfile( +# myfile_path, pruned_digests, eout) +# if match: +# if distdir_writable: +# try: +# apply_secpass_permissions(myfile_path, +# gid=portage_gid, mode=0o664, mask=0o2, +# stat_cached=mystat) +# except PortageException as e: +# if not os.access(myfile_path, os.R_OK): +# writemsg(_("!!! Failed to adjust permissions:" +# " %s\n") % str(e), noiselevel=-1) +# del e +# continue + +# if distdir_writable and mystat is None: +# # Remove broken symlinks if necessary. +# try: +# os.unlink(myfile_path) +# except OSError: +# pass + +# if mystat is not None: +# if stat.S_ISDIR(mystat.st_mode): +# writemsg_level( +# _("!!! Unable to fetch file since " +# "a directory is in the way: \n" +# "!!! %s\n") % myfile_path, +# level=logging.ERROR, noiselevel=-1) +# return 0 + +# if mystat.st_size == 0: +# if distdir_writable: +# try: +# os.unlink(myfile_path) +# except OSError: +# pass +# elif distdir_writable: +# if mystat.st_size < fetch_resume_size and \ +# mystat.st_size < size: +# # If the file already exists and the size does not +# # match the existing digests, it may be that the +# # user is attempting to update the digest. In this +# # case, the digestgen() function will advise the +# # user to use `ebuild --force foo.ebuild manifest` +# # in order to force the old digests to be replaced. +# # Since the user may want to keep this file, rename +# # it instead of deleting it. +# writemsg(_(">>> Renaming distfile with size " +# "%d (smaller than " "PORTAGE_FETCH_RESU" +# "ME_MIN_SIZE)\n") % mystat.st_size) +# temp_filename = \ +# _checksum_failure_temp_file( +# mysettings["DISTDIR"], myfile) +# writemsg_stdout(_("Refetching... " +# "File renamed to '%s'\n\n") % \ +# temp_filename, noiselevel=-1) +# elif mystat.st_size >= size: +# temp_filename = \ +# _checksum_failure_temp_file( +# mysettings["DISTDIR"], myfile) +# writemsg_stdout(_("Refetching... " +# "File renamed to '%s'\n\n") % \ +# temp_filename, noiselevel=-1) + +# if distdir_writable and ro_distdirs: +# readonly_file = None +# for x in ro_distdirs: +# filename = os.path.join(x, myfile) +# match, mystat = _check_distfile( +# filename, pruned_digests, eout) +# if match: +# readonly_file = filename +# break +# if readonly_file is not None: +# try: +# os.unlink(myfile_path) +# except OSError as e: +# if e.errno not in (errno.ENOENT, errno.ESTALE): +# raise +# del e +# os.symlink(readonly_file, myfile_path) +# continue + +# if fsmirrors and not os.path.exists(myfile_path) and has_space: +# for mydir in fsmirrors: +# mirror_file = os.path.join(mydir, myfile) +# try: +# shutil.copyfile(mirror_file, myfile_path) +# writemsg(_("Local mirror has file: %s\n") % myfile) +# break +# except (IOError, OSError) as e: +# if e.errno not in (errno.ENOENT, errno.ESTALE): +# raise +# del e + if 1==1: + try: + self.mystat = os.stat(self.myfile_path) + except OSError as e: +# if e.errno not in (errno.ENOENT, errno.ESTALE): +# raise + debug("cant get mystat for file: "+self.myfile_path) + del e + self.fetch() + else: +# try: +# apply_secpass_permissions( +# self.myfile_path, gid=portage_gid, mode=0o664, mask=0o2, +# stat_cached=self.mystat) +# except PortageException as e: +# if not os.access(self.myfile_path, os.R_OK): +# writemsg(_("!!! Failed to adjust permissions:" +# " %s\n") % str(e), noiselevel=-1) + if 1==1: + # If the file is empty then it's obviously invalid. Remove + # the empty file and try to download if possible. +# if mystat.st_size == 0: +# if distdir_writable: +# try: +# os.unlink(myfile_path) +# except EnvironmentError: +# pass +# elif myfile not in mydigests: + # We don't have a digest, but the file exists. We must + # assume that it is fully downloaded. +# continue +# else: + if self.mystat.st_size < self.distfile["size"]: +# and \ +# not restrict_fetch: + self.fetched = 1 # Try to resume this download. +# elif parallel_fetchonly and \ +# mystat.st_size == mydigests[myfile]["size"]: +# eout = EOutput() +# eout.quiet = \ +# mysettings.get("PORTAGE_QUIET") == "1" +# eout.ebegin( +# "%s size ;-)" % (myfile, )) +# eout.eend(0) +# continue + debug("size too small, continue fetch") + self.fetch() + else: + self.verified_ok, self.reason = verify_all( + self.myfile_path, self.distfile) + if not self.verified_ok: + sms("ERROR: while checking hash for "+self.myfile_path+" Refetching...") + os.unlink(self.myfile_path) + self.fetch() + else: +# eout = EOutput() +# eout.quiet = \ +# mysettings.get("PORTAGE_QUIET", None) == "1" +# digests = mydigests.get(myfile) +# if digests: +# digests = list(digests) +# digests.sort() +# eout.ebegin( +# "%s %s ;-)" % (myfile, " ".join(digests))) +# eout.eend(0) +# continue # fetch any remaining files + self.status=0 + + def get_place_in_the_list(self): return self.place_in_the_list def do_tasks(idfetch_pkg_list): @@ -219,10 +478,23 @@ def do_tasks(idfetch_pkg_list): # distfiles_list_file.close() for current_distfile in current_pkg['distfile_list']: # print("distfile",current_distfile['name']) - index+=1 - total_size+=current_distfile['size'] - current_fetch_distfile_thread = fetchit(current_distfile, index) - fetch_distfile_thread_list.append(current_fetch_distfile_thread) + + #some files may have duplicates, let's skip them + no_duplicate=1 + for cur_fetch_distfile_thread in fetch_distfile_thread_list: + if current_distfile['name']==cur_fetch_distfile_thread.distfile['name']: + no_duplicate=0 + break + if no_duplicate: + #create new thread because there is no duplicate + index+=1 + debug("============================================") + debug("pkg: "+current_pkg['pkg_name']) + debug("name: "+current_distfile['name']) + debug("size: "+str(current_distfile['size']/1000)) + total_size+=current_distfile['size'] + current_fetch_distfile_thread = fetchit(current_distfile, index) + fetch_distfile_thread_list.append(current_fetch_distfile_thread) finished_downloads=0 place_in_the_list=0 @@ -307,8 +579,8 @@ def cleanup(): if __name__ == '__main__': exit_flag=0 task_list=[] - # mypids will hold the pids of all processes created. - mypids = [] + # mypids will hold the pids of all processes created. + mypids = [] if USE_CURSES_FLAG: stdscr = curses.initscr() max_y,max_x=stdscr.getmaxyx() -- cgit v1.2.3-65-gdbad