#!/usr/bin/env python # # This file is part of oh-my-tuna # Copyright (c) 2018 oh-my-tuna's authors # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import subprocess import os import errno import argparse import re import platform from contextlib import contextmanager try: input = raw_input except NameError: pass try: import configparser except ImportError: import ConfigParser as configparser mirror_root = "mirrors.tuna.tsinghua.edu.cn" host_name = "tuna.tsinghua.edu.cn" always_yes = False verbose = False is_global = True os_release_regex = re.compile(r"^ID=\"?([^\"\n]+)\"?$", re.M) @contextmanager def cd(path): old_cwd = os.getcwd() os.chdir(path) try: yield finally: os.chdir(old_cwd) def sh(command): try: if verbose: print('$ %s' % command) if isinstance(command, list): command = ' '.join(command) return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT).decode('utf-8').rstrip() except Exception as e: return None def user_prompt(): global always_yes if always_yes: return True ans = input('Do you wish to proceed(y/n/a):') if ans == 'a': always_yes = True return ans != 'n' def ask_if_change(name, expected, command_read, command_set): current = sh(command_read) if current != expected: print('%s Before:' % name) print(current) print('%s After:' % name) print(expected) if user_prompt(): sh(command_set) print('Command %s succeeded' % command_set) return True else: return False else: print('%s is already configured to TUNA mirrors' % name) return True def get_linux_distro(): os_release = sh('cat /etc/os-release') if not os_release: return None match = re.findall(os_release_regex, os_release) if len(match) != 1: return None return match[0] def set_env(key, value): shell = os.environ.get('SHELL').split('/')[-1] if shell == 'bash' or shell == 'sh': with open(os.path.expanduser('~/.profile'), 'a') as f: f.write('export %s=%s\n' % (key, value)) elif shell == 'zsh': with open(os.path.expanduser('~/.zprofile'), 'a') as f: f.write('export %s=%s\n' % (key, value)) else: print('Please set %s=%s' % (key, value)) def remove_env(key): shell = os.environ.get('SHELL').split('/')[-1] if shell == 'bash' or shell == 'sh': pattern = "^export %s=" % key profile = "~/.profile" elif shell == 'zsh': pattern = "^export %s=" % key profile = "~/.zprofile" if pattern: profile = os.path.expanduser(profile) if platform.system() == 'Darwin': # TODO: More BSD systems sed = ['sed', '-i', "", "/%s/d" % pattern, profile] else: sed = ['sed', '-i', "/%s/d" % pattern, profile] sh(sed) return True else: print('Please remove environment variable %s' % key) return False def mkdir_p(path): try: os.makedirs(path) except OSError as exc: if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise class Base(object): """ Name of this mirror/module """ @staticmethod def name(): raise NotImplementedError """ Returns whether this mirror is applicable """ @staticmethod def is_applicable(): return False """ Returns whether this mirror is already up """ @staticmethod def is_online(): raise NotImplementedError """ Activate this mirror Returns True if this operation is completed, False otherwise Caller should never invoke this method when is_online returns True """ @staticmethod def up(): raise NotImplementedError """ Deactivate this mirror Returns True if this operation is completed, False otherwise Caller should never invoke this method when is_online returns False """ @staticmethod def down(): raise NotImplementedError """ Print a log entry with the name of this mirror/module """ @classmethod def log(cls, msg, level='i'): levels = "viodwe" # verbose, info, ok, debug, warning, error assert level in levels global verbose if level == 'v' and verbose: return color_prefix = { 'v': '', 'i': '', 'o': '\033[32m', 'd': '\033[34m', 'w': '\033[33m', 'e': '\033[31m' } if color_prefix[level]: color_suffix = '\033[0m' else: color_suffix = '' print('%s[%s]: %s%s' % (color_prefix[level], cls.name(), msg, color_suffix)) class Pypi(Base): mirror_url = 'https://pypi.%s/simple' % host_name """ Reference: https://pip.pypa.io/en/stable/user_guide/#configuration """ @staticmethod def config_files(): system = platform.system() if system == 'Darwin': return ('$HOME/Library/Application Support/pip/pip.conf', '$HOME/.pip/pip.conf') elif system == 'Windows': return (r'%APPDATA%\pip\pip.ini', r'~\pip\pip.ini') elif system == 'Linux': return ('$HOME/.config/pip/pip.conf', '$HOME/.pip/pip.conf') @staticmethod def name(): return "pypi" @staticmethod def is_applicable(): global is_global if is_global: # Skip if in global mode return False return sh('pip') is not None or sh('pip3') is not None @staticmethod def is_online(): pattern = re.compile(r' *index-url *= *%s' % Pypi.mirror_url) config_files = Pypi.config_files() for conf_file in config_files: if not os.path.exists(os.path.expandvars(conf_file)): continue with open(os.path.expandvars(conf_file)) as f: for line in f: if pattern.match(line): return True return False @staticmethod def up(): config_file = os.path.expandvars(Pypi.config_files()[0]) config = configparser.ConfigParser() if os.path.exists(config_file): config.read(config_file) if not config.has_section('global'): config.add_section('global') if not os.path.isdir(os.path.dirname(config_file)): mkdir_p(os.path.dirname(config_file)) config.set('global', 'index-url', Pypi.mirror_url) with open(config_file, 'w') as f: config.write(f) return True @staticmethod def down(): config_files = map(os.path.expandvars, Pypi.config_files()) config = configparser.ConfigParser() for path in config_files: if not os.path.exists(path): continue config.read(path) try: if config.get('global', 'index-url') == Pypi.mirror_url: config.remove_option('global', 'index-url') with open(path, 'w') as f: config.write(f) except (configparser.NoOptionError, configparser.NoSectionError): pass return True class ArchLinux(Base): @staticmethod def name(): return 'Arch Linux' @staticmethod def is_applicable(): global is_global if not is_global: return False return os.path.isfile( '/etc/pacman.d/mirrorlist') and get_linux_distro() == 'arch' @staticmethod def is_online(): mirror_re = re.compile( r" *Server *= *(http|https)://%s/archlinux/\$repo/os/\$arch\n" % mirror_root, re.M) ml = open('/etc/pacman.d/mirrorlist', 'r') lines = ml.readlines() result = map(lambda l: re.match(mirror_re, l), lines) result = any(result) ml.close() return result @staticmethod def up(): # Match commented or not mirror_re = re.compile( r" *(# *)?Server *= *(http|https)://%s/archlinux/\$repo/os/\$arch\n" % mirror_root, re.M) banner = '# Generated and managed by the awesome oh-my-tuna\n' target = "Server = https://%s/archlinux/$repo/os/$arch\n\n" % mirror_root print( 'This operation will insert the following line into the beginning of your pacman mirrorlist:\n%s' % target[:-2]) if not user_prompt(): return False ml = open('/etc/pacman.d/mirrorlist', 'r') lines = ml.readlines() # Remove all lines = filter(lambda l: re.match(mirror_re, l) is None, lines) # Remove banner lines = filter(lambda l: l != banner, lines) # Finish reading lines = list(lines) # Remove padding newlines k = 0 while k < len(lines) and lines[k] == '\n': k += 1 ml.close() ml = open('/etc/pacman.d/mirrorlist', 'w') # Add target ml.write(banner) ml.write(target) ml.writelines(lines[k:]) ml.close() return True @staticmethod def down(): print( 'This action will comment out TUNA mirrors from your pacman mirrorlist, if there is any.' ) if not user_prompt(): return False # Simply remove all matched lines mirror_re = re.compile( r" *Server *= *(http|https)://%s/archlinux/\$repo/os/\$arch\n" % mirror_root, re.M) ml = open('/etc/pacman.d/mirrorlist', 'r') lines = ml.readlines() lines = list( map(lambda l: l if re.match(mirror_re, l) is None else '# ' + l, lines)) ml.close() ml = open('/etc/pacman.d/mirrorlist', 'w') ml.writelines(lines) ml.close() return True class Homebrew(Base): @staticmethod def name(): return 'Homebrew' @staticmethod def is_applicable(): global is_global if not is_global: return False return sh('brew --repo') is not None @staticmethod def is_online(): repo = sh('brew --repo') with cd(repo): repo_online = sh('git remote get-url origin' ) == 'https://%s/git/homebrew/brew.git' % mirror_root if repo_online: return os.environ.get('HOMEBREW_BOTTLE_DOMAIN') == 'https://%s/homebrew-bottles' % mirror_root return False @staticmethod def up(): repo = sh('brew --repo') with cd(repo): ask_if_change( 'Homebrew repo', 'https://%s/git/homebrew/brew.git' % mirror_root, 'git remote get-url origin', 'git remote set-url origin https://%s/git/homebrew/brew.git' % mirror_root) for tap in ('homebrew-core', 'homebrew-python', 'homebrew-science'): tap_path = '%s/Library/Taps/homebrew/%s' % (repo, tap) if os.path.isdir(tap_path): with cd(tap_path): ask_if_change( 'Homebrew tap %s' % tap, 'https://%s/git/homebrew/%s.git' % (mirror_root, tap), 'git remote get-url origin', 'git remote set-url origin https://%s/git/homebrew/%s.git' % (mirror_root, tap)) set_env('HOMEBREW_BOTTLE_DOMAIN', 'https://%s/homebrew-bottles' % mirror_root) return True @staticmethod def down(): repo = sh('brew --repo') with cd(repo): sh('git remote set-url origin https://github.com/homebrew/brew.git' ) for tap in ('homebrew-core', 'homebrew-python', 'homebrew-science'): tap_path = '%s/Library/Taps/homebrew/%s' % (repo, tap) if os.path.isdir(tap_path): with cd(tap_path): sh('git remote set-url origin https://github.com/homebrew/%s.git' % tap) sh('git remote get-url origin' ) == 'https://github.com/homebrew/brew.git' return remove_env('HOMEBREW_BOTTLE_DOMAIN') class CTAN(Base): @staticmethod def name(): return 'CTAN' @staticmethod def is_applicable(): # Works both in global mode or local mode return sh('tlmgr --version') is not None @staticmethod def is_online(): global is_global base = "tlmgr" if not is_global: # Setup usertree first sh("tlmgr init-usertree") base += " --usermode" return sh( '%s option repository' % base ) == 'Default package repository (repository): https://mirrors.tuna.tsinghua.edu.cn/CTAN/systems/texlive/tlnet' @staticmethod def up(): global is_global base = "tlmgr" if not is_global: base += " --usermode" return ask_if_change( 'CTAN mirror', 'Default package repository (repository): https://mirrors.tuna.tsinghua.edu.cn/CTAN/systems/texlive/tlnet', '%s option repository' % base, '%s option repository https://mirrors.tuna.tsinghua.edu.cn/CTAN/systems/texlive/tlnet' % base ) class Anaconda(Base): url_free = 'https://%s/anaconda/pkgs/free/' % mirror_root url_main = 'https://%s/anaconda/pkgs/main/' % mirror_root @staticmethod def name(): return "Anaconda" @staticmethod def is_applicable(): # Works both in global mode and local mode return sh('conda -V') is not None @staticmethod def is_online(): cmd = 'conda config --get channels' global is_global if is_global: cmd += ' --system' channels = sh(cmd).split('\n') in_channels = 0 for line in channels: if Anaconda.url_free in line: in_channels += 1 elif Anaconda.url_main in line: in_channels += 1 return in_channels == 2 @staticmethod def up(): basecmd = 'conda config' global is_global if is_global: basecmd += ' --system' sh ("%s --add channels %s" % (basecmd, Anaconda.url_free)) sh ("%s --add channels %s" % (basecmd, Anaconda.url_main)) return True @staticmethod def down(): basecmd = 'conda config' global is_global if is_global: basecmd += ' --system' sh ("%s --remove channels %s" % (basecmd, Anaconda.url_free)) sh ("%s --remove channels %s" % (basecmd, Anaconda.url_main)) return True class Debian(Base): pools = "main contrib non-free" default_sources = { 'http://deb.debian.org/debian': ['', '-updates'], 'http://security.debian.org/debian-security': ['main', 'contrib', 'non-free'], } @staticmethod def build_mirrorspec(): return { 'https://' + mirror_root + '/debian': ['', '-updates'], 'https://' + mirror_root + '/debian-security': ['/updates'], } @classmethod def build_template(cls, mirrorspecs): release = sh('lsb_release -sc') lines = ['%s %s %s%s %s\n' % (repoType, mirror, release, repo, cls.pools) for mirror in mirrorspecs for repo in mirrorspecs[mirror] for repoType in ['deb', 'deb-src']] tmpl = ''.join(lines) return tmpl @staticmethod def name(): return 'Debian' @staticmethod def is_applicable(): global is_global if not is_global: return False return os.path.isfile( '/etc/apt/sources.list') and get_linux_distro() == 'debian' @classmethod def is_online(cls): with open('/etc/apt/sources.list', 'r') as sl: content = sl.read(); return content == cls.build_template(cls.build_mirrorspec()) @classmethod def up(cls): print('This operation will move your current sources.list to sources.oh-my-tuna.bak.list,\n' + \ 'and use TUNA apt source instead.') if not user_prompt(): return False if os.path.isfile('/etc/apt/sources.list'): sh('cp /etc/apt/sources.list /etc/apt/sources.oh-my-tuna.bak.list') with open('/etc/apt/sources.list', 'w') as sl: sl.write(cls.build_template(cls.build_mirrorspec())) return True @classmethod def down(cls): print('This operation will copy sources.oh-my-tuna.bak.list to sources.list if there is one,\n' + \ 'otherwise build a new sources.list with archive.ubuntu.com as its mirror root.') if not user_prompt(): return False if os.path.isfile('/etc/apt/sources.oh-my-tuna.bak.list'): if sh('cp /etc/apt/sources.oh-my-tuna.bak.list /etc/apt/sources.list') is not None: return True with open('/etc/apt/sources.list', 'w') as sl: sl.write(cls.build_template(cls.default_sources)) return True def _get_mirror_suffix(): uname = sh('uname -m') no_suffix_list = ['i386', 'i586', 'i686', 'x86_64', 'amd64'] if any(map(lambda x: x in uname, no_suffix_list)): return '' else: return '-ports' class Ubuntu(Debian): default_sources = { 'http://archive.ubuntu.com/ubuntu' + _get_mirror_suffix(): ['', '-updates', '-security', '-backports'] } pools = "main multiverse universe restricted" @staticmethod def build_mirrorspec(): return { 'https://' + mirror_root + '/ubuntu' + _get_mirror_suffix(): ['', '-updates', '-security', '-backports'], } @staticmethod def name(): return 'Ubuntu' @staticmethod def is_applicable(): global is_global if not is_global: return False return os.path.isfile( '/etc/apt/sources.list') and get_linux_distro() == 'ubuntu' class CentOS(Base): @staticmethod def name(): return 'CentOS' @staticmethod def is_applicable(): global is_global if not is_global: return False return os.path.isfile( '/etc/yum.repos.d/CentOS-Base.repo') and get_linux_distro() == 'centos' @staticmethod def is_online(): mirror_re = re.compile( r"baseurl=https://%s/centos/\$releasever/os/\$basearch/\n" % mirror_root, re.M) ml = open('/etc/yum.repos.d/CentOS-Base.repo', 'r') lines = ml.readlines() result = map(lambda l: re.match(mirror_re, l), lines) result = any(result) ml.close() return result @staticmethod def up(): sh('cp /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.bak') sh(r'sed -i -E s/^#?baseurl=https?:\/\/[^\/]+\/(.*)$/baseurl=https:\/\/%s\/\1/g /etc/yum.repos.d/CentOS-Base.repo' % mirror_root.replace('/', r'\/')) sh(r'sed -i -E s/^(mirrorlist=.*)$/#\1/g /etc/yum.repos.d/CentOS-Base.repo') return True @staticmethod def down(): if os.path.isfile('/etc/yum.repos.d/CentOS-Base.repo.bak'): sh('cp /etc/yum.repos.d/CentOS-Base.repo.bak /etc/yum.repos.d/CentOS-Base.repo') return True sh(r'sed -i -E s/^#(mirrorlist=.*)$/\1/g /etc/yum.repos.d/CentOS-Base.repo') sh(r'sed -i -E s/^(baseurl=.*)$/#\1/g /etc/yum.repos.d/CentOS-Base.repo') return True class AOSCOS(Base): @staticmethod def name(): return 'AOSC OS' @staticmethod def is_applicable(): global is_global if not is_global: return False return os.path.isfile( '/var/lib/apt/gen/status.json') and get_linux_distro() == 'aosc' @staticmethod def is_online(): agl_result = sh('env LC_ALL=C apt-gen-list now') if not agl_result: return None out_re = re.compile(r"mirrors:.* tuna.*", re.M) match = re.findall(out_re, agl_result) return len(match) > 0 @staticmethod def up(): agl_result = sh('env LC_ALL=C apt-gen-list now') if not agl_result: return False out_re = re.compile(r"mirrors: origin$", re.M) match = re.findall(out_re, agl_result) if len(match) > 0: if not sh('env LC_ALL=C apt-gen-list m tuna'): return False else: if not sh('env LC_ALL=C apt-gen-list m +tuna'): return False return True @staticmethod def down(): if not sh('env LC_ALL=C apt-gen-list m -tuna'): return False return True MODULES = [ArchLinux, Homebrew, CTAN, Pypi, Anaconda, Debian, Ubuntu, CentOS, AOSCOS] def main(): parser = argparse.ArgumentParser( description='Use TUNA mirrors everywhere when applicable') parser.add_argument( 'subcommand', nargs='?', metavar='SUBCOMMAND', choices=['up', 'down', 'status'], default='up') parser.add_argument( '-v', '--verbose', help='verbose output', action='store_true') parser.add_argument( '-y', '--yes', help='always answer yes to questions', action='store_true') parser.add_argument( '-g', '--global', dest='is_global', help='apply system-wide changes. This option may affect applicability of some modules.', action='store_true') args = parser.parse_args() global verbose verbose = args.verbose global always_yes always_yes = args.yes global is_global is_global = args.is_global if args.subcommand == 'up': for m in MODULES: if m.is_applicable(): if not m.is_online(): m.log('Activating...') try: result = m.up() if not result: m.log('Operation canceled', 'w') else: m.log('Mirror has been activated', 'o') except NotImplementedError: m.log( 'Mirror doesn\'t support activation. Please activate manually' , 'e') if args.subcommand == 'down': for m in MODULES: if m.is_applicable(): if m.is_online(): m.log('Deactivating...') try: result = m.down() if not result: m.log('Operation canceled', 'w') else: m.log('Mirror has been deactivated', 'o') except NotImplementedError: m.log( 'Mirror doesn\'t support deactivation. Please deactivate manually' , 'e') if args.subcommand == 'status': for m in MODULES: if m.is_applicable(): if m.is_online(): m.log('Online', 'o') else: m.log('Offline') if __name__ == "__main__": main()