1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
import os
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from json import dump, loads
from subprocess import call
from sys import exit, argv
from threading import Thread
from time import sleep
from urllib.parse import urljoin

from Crypto.Cipher import AES
from PyQt5.QtCore import pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import (
    QApplication,
    QWidget,
    QPushButton,
    QLineEdit,
    QLabel,
    QDesktopWidget,
    QProgressBar,
)
from requests import get
class App(QWidget):
    """Small PyQt5 window that downloads an M3U8 (HLS) playlist into an MP4.

    Segments are downloaded into ``./TS/`` by a worker thread, optionally
    decrypted with AES-CBC, and merged with ``ffmpeg`` into ``./MP4/``.
    A second thread polls the segment files to drive the progress bar.
    """

    # Worker threads must not touch widgets directly. They emit these signals
    # and the connected widget slots run in the thread that owns the widgets.
    log_message = pyqtSignal(str)
    progress_changed = pyqtSignal(int)
    download_finished = pyqtSignal()

    # Number of HTTP attempts made for a segment per downloadTS() call.
    _SEGMENT_ATTEMPTS = 5
    # Timeout in seconds for the playlist and key requests.
    _PLAYLIST_TIMEOUT = 10

    def __init__(self):
        super().__init__()
        # True while downloadm3u8() is running; read by state() and on_click().
        self._downloading = False
        # True once every segment file passed validationTS().
        self._segments_ready = False
        self.XSHOW()

    def center(self):
        """Move the window to the centre of the available desktop area."""
        frame = self.frameGeometry()
        frame.moveCenter(QDesktopWidget().availableGeometry().center())
        self.move(frame.topLeft())

    def XSHOW(self):
        """Build the widgets, size/centre/title the window and show it."""
        self.IGUI()
        self.resize(600, 90)
        self.center()
        self.setWindowTitle("xiaofang")
        self.show()

    def IGUI(self):
        """Create the URL box, download button, progress bar and log label."""
        self.url_textbox = QLineEdit(self)
        self.url_textbox.move(20, 10)
        self.url_textbox.resize(480, 20)

        self.download_button = QPushButton('下载', self)
        self.download_button.move(520, 10)
        self.download_button.resize(60, 20)
        self.download_button.clicked.connect(self.on_click)

        self.pbar = QProgressBar(self)
        self.pbar.setGeometry(20, 40, 510, 20)

        self.log_label = QLabel(self)
        self.log_label.move(20, 65)
        self.log_label.setText("请输入M3U8链接")
        self.log_label.resize(560, 20)

        # Route cross-thread updates through signals instead of direct calls.
        self.log_message.connect(self.log_label.setText)
        self.progress_changed.connect(self.pbar.setValue)
        self.download_finished.connect(self._on_download_finished)

    def del_dir(self, dir):
        """Delete everything inside ``dir`` (the directory itself is kept).

        Args:
            dir: Path of the directory to empty. A missing path is ignored.
        """
        if not os.path.exists(dir):
            return
        # Bottom-up walk: a sub-directory is only removed after its own
        # contents are gone, otherwise os.rmdir() fails on a non-empty dir.
        for root, subdirs, files in os.walk(dir, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in subdirs:
                os.rmdir(os.path.join(root, name))

    def analysis(self, url):
        """Fetch the playlist at ``url`` and extract its segment URLs.

        Args:
            url: URL of the M3U8 playlist.

        Returns:
            ``(urllist, keyurl)``: absolute segment URLs in playlist order and
            the absolute URL of the decryption key (``None`` if there is none).

        Raises:
            Exception: whatever ``requests`` raises for a network failure,
                a timeout or an HTTP error status.
        """
        response = get(url, timeout=self._PLAYLIST_TIMEOUT)
        # Do not parse an HTTP error page as if it were a playlist.
        response.raise_for_status()
        return self._parse_playlist(response.text, url)

    @staticmethod
    def _parse_playlist(text, base_url):
        """Parse playlist ``text``; relative URIs are resolved on ``base_url``."""
        urllist = []
        keyurl = None
        for raw_line in text.splitlines():
            line = raw_line.strip()  # also drops the "\r" of CRLF playlists
            if not line:
                continue
            if not line.startswith("#"):
                # urljoin() returns absolute URLs unchanged.
                urllist.append(urljoin(base_url, line))
                continue
            if line.startswith("#EXT-X-ENDLIST"):
                break
            # Parsing stops at the first discontinuity marker, as before.
            # The header tag EXT-X-DISCONTINUITY-SEQUENCE is not a marker and
            # must not end parsing before any segment was read.
            if (line.startswith("#EXT-X-DISCONTINUITY")
                    and not line.startswith("#EXT-X-DISCONTINUITY-SEQUENCE")):
                break
            if line.startswith("#EXT-X-KEY"):
                uri = re.search(r'URI="([^"]*)"', line)
                if uri:
                    keyurl = urljoin(base_url, uri.group(1))
        return urllist, keyurl

    def downloadTS(self, url, key, filepath):
        """Download one segment to ``filepath``, decrypting it if ``key`` is set.

        The request is attempted a bounded number of times. If every attempt
        fails nothing is written, so validationTS() reports the segment as
        missing and the caller can retry it.

        Args:
            url: Absolute URL of the segment.
            key: AES key bytes, or ``None`` for an unencrypted stream.
            filepath: Destination path of the segment file.
        """
        self.print(f"正在下载: {url}")
        content = None
        for _ in range(self._SEGMENT_ATTEMPTS):
            try:
                response = get(url, timeout=5)
                # An HTTP error body must not be stored as segment data.
                response.raise_for_status()
            except Exception:
                # Best-effort download: any request failure means "try again".
                continue
            content = response.content
            break
        if content is None:
            return
        if key:
            # NOTE(review): the key bytes are also used as the CBC IV here,
            # as in the original code. Confirm against the playlist's
            # EXT-X-KEY IV attribute if decrypted output looks corrupted.
            content = AES.new(key, AES.MODE_CBC, key).decrypt(content)
        os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
        with open(filepath, "wb") as f:
            f.write(content)

    def validationTS(self, urllist, filelist):
        """Return the URLs whose segment file is missing or suspiciously small.

        A file under 10 KiB counts as broken unless it is the last entry of
        ``filelist``.

        Args:
            urllist: Segment URLs.
            filelist: Segment file paths, parallel to ``urllist``.

        Returns:
            List of URLs that need to be downloaded again.
        """
        errorlist = []
        for url, file in zip(urllist, filelist):
            if not os.path.exists(file):
                errorlist.append(url)
                continue
            size = os.path.getsize(file)
            if size / 1024 < 10 and file != filelist[-1]:
                errorlist.append(url)
                self.print("尺寸过小:{}".format(size))
        if errorlist:
            self.print("缺少{}个TS文件".format(len(errorlist)))
        return errorlist

    def mergeTS(self, filelist):
        """Concatenate the segment files into ``./MP4/<timestamp>.mp4``.

        Args:
            filelist: Segment file paths in playback order, all in ``./TS/``.

        Returns:
            The MP4 base name (timestamp string) on success, ``1`` when
            ``ffmpeg`` could not be started, ``False`` when ``ffmpeg`` exited
            with a non-zero status.
        """
        self.print("正在合并TS文件")
        mp4name = datetime.now().strftime("%Y%m%d%H%M%S")
        listing = f"./TS/{mp4name}.txt"
        # The list sits next to the segments, so entries are bare file names.
        with open(listing, "w", encoding="utf-8") as f:
            f.writelines(
                "file '{}'\n".format(os.path.basename(path)) for path in filelist
            )
        os.makedirs("./MP4/", exist_ok=True)
        cmd = [
            "ffmpeg", "-f", "concat", "-safe", "0", "-i", listing,
            "-c", "copy", f"./MP4/{mp4name}.mp4", "-loglevel", "quiet", "-n",
        ]
        # 0x08000000 is CREATE_NO_WINDOW. subprocess rejects creationflags on
        # non-Windows platforms, so it is only passed on Windows.
        options = {"creationflags": 0x08000000} if os.name == "nt" else {}
        try:
            returncode = call(cmd, **options)
        except OSError:
            # ffmpeg is not installed or cannot be executed.
            return 1
        return mp4name if returncode == 0 else False

    def downloadm3u8(self, m3u8url):
        """Download, validate and merge the stream described by ``m3u8url``.

        Intended to run in a worker thread. Progress and results are reported
        through print(), and ``download_finished`` is emitted when it returns.

        Args:
            m3u8url: URL of the M3U8 playlist.
        """
        self._downloading = True
        self._segments_ready = False
        try:
            self._run_download(m3u8url)
        finally:
            # Lets state() stop polling and re-enables the download button.
            self._downloading = False
            self.download_finished.emit()

    def _run_download(self, m3u8url):
        """Run the download pipeline for downloadm3u8()."""
        self.del_dir("./TS/")
        try:
            urllist, keyurl = self.analysis(m3u8url)
            key = None
            if keyurl:
                key_response = get(keyurl, timeout=self._PLAYLIST_TIMEOUT)
                key_response.raise_for_status()
                key = key_response.content
        except Exception as error:
            # Thread boundary: report the failure instead of dying silently.
            print(repr(error))
            self.print("下载失败")
            return
        if not urllist:
            self.print("下载失败")
            return

        # Index-based names keep playback order, cannot collide and contain
        # no URL characters (e.g. "?") that are invalid in file names.
        filelist = [f"./TS/{index:06d}.ts" for index in range(len(urllist))]
        os.makedirs("./TS/", exist_ok=True)
        with open("./TS/filelist.json", "w", encoding="utf-8") as manifest:
            dump(filelist, manifest, ensure_ascii=False, indent=4)

        with ThreadPoolExecutor(max_workers=80) as pool:
            for url, filepath in zip(urllist, filelist):
                pool.submit(self.downloadTS, url, key, filepath)

        # Retry until every segment validates (unbounded, as before).
        while True:
            errorlist = self.validationTS(urllist, filelist)
            if not errorlist:
                break
            failed = set(errorlist)
            # Pair each failed URL with its own file path.
            for url, filepath in zip(urllist, filelist):
                if url in failed:
                    self.downloadTS(url, key, filepath)
        self._segments_ready = True

        mp4name = self.mergeTS(filelist)
        # Test the "ffmpeg missing" code first: 1 is truthy and would
        # otherwise be reported as a successful download.
        if mp4name == 1:
            self.print("未识别到ffmpeg")
        elif mp4name:
            self.print("下载完成 {}".format(
                os.path.abspath("./MP4/{}.mp4".format(mp4name))))
        else:
            self.print("下载失败")
        self.del_dir("./TS/")

    def print(self, *args):
        """Print ``args`` to stdout and show them in the log label.

        May be called from any thread; the label is updated through the
        ``log_message`` signal.
        """
        print(*args)
        self.log_message.emit(" ".join(str(arg) for arg in args))

    def state(self):
        """Poll the segment files and update the progress bar.

        Intended to run in its own thread next to downloadm3u8(). It returns
        when all segments are present or when the download ends.
        """
        filelist = None
        while filelist is None:
            try:
                with open("./TS/filelist.json", "r", encoding="utf-8") as manifest:
                    filelist = loads(manifest.read())
            except (OSError, ValueError):
                # Manifest not written yet, or only partially written.
                if not self._downloading:
                    # The download ended before the manifest could be read.
                    if self._segments_ready:
                        self.step = 100
                        self.progress_changed.emit(100)
                    return
                sleep(1)

        self.step = 0
        self.progress_changed.emit(0)
        count = len(filelist)
        pending = list(filelist)
        while pending and self._downloading and not self._segments_ready:
            pending = [path for path in pending if not os.path.exists(path)]
            self.step = round((count - len(pending)) / count * 100, 0)
            self.progress_changed.emit(int(self.step))
            sleep(0.2)  # avoid spinning a CPU core while waiting
        if self._segments_ready or not pending:
            self.step = 100
            self.progress_changed.emit(100)

    @pyqtSlot()
    def _on_download_finished(self):
        """Re-enable the download button once the worker has finished."""
        self.download_button.setEnabled(True)

    @pyqtSlot()
    def on_click(self):
        """Validate the entered URL and start the download and progress threads."""
        if self._downloading:
            # A second run would wipe ./TS/ under the active download.
            return
        self.del_dir("./TS/")
        url = self.url_textbox.text()
        if "m3u8" not in url:
            self.print("请输入M3U8链接")
            return
        # Set before the threads start so state() never sees a stale False.
        self._downloading = True
        self._segments_ready = False
        self.download_button.setEnabled(False)
        Thread(target=self.downloadm3u8, args=(url,)).start()
        Thread(target=self.state, args=()).start()
if __name__ == '__main__':
    # Script entry point: create the Qt application, build the main window
    # (its constructor also shows it), then run the event loop and pass its
    # result to exit().
    application = QApplication(argv)
    main_window = App()
    exit_status = application.exec_()
    exit(exit_status)
|