
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from json import dump, loads
from subprocess import call
from sys import exit, argv
from threading import Event, Thread
from time import sleep
from urllib.parse import urljoin

from Crypto.Cipher import AES
from PyQt5.QtCore import pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import (
    QApplication,
    QDesktopWidget,
    QLabel,
    QLineEdit,
    QProgressBar,
    QPushButton,
    QWidget,
)
from requests import get
class App(QWidget):
    """Small window that downloads an M3U8 playlist and merges it into an MP4.

    The segments listed in the playlist are fetched into ``./TS/`` by a thread
    pool, optionally AES-CBC decrypted, and then concatenated with ``ffmpeg``
    into ``./MP4/<timestamp>.mp4``.

    Widgets are only ever touched from the GUI thread: worker threads report
    through the signals declared below instead of calling widget methods.
    """

    # Signals used by worker threads to hand work to the GUI thread.
    _log_message = pyqtSignal(str)
    _progress_changed = pyqtSignal(int)
    _busy_changed = pyqtSignal(bool)

    # Upper bound on validate-and-redownload passes, so a segment that can
    # never be fetched cannot keep the worker looping forever.
    _MAX_RETRY_ROUNDS = 5

    def __init__(self):
        super().__init__()
        self.step = 0
        # Set when a download run has ended (success or failure) so that
        # state() knows when to stop polling.
        self._finished = Event()
        self.XSHOW()

    def center(self):
        """Move the window to the centre of the available desktop area."""
        qr = self.frameGeometry()
        cp = QDesktopWidget().availableGeometry().center()
        qr.moveCenter(cp)
        self.move(qr.topLeft())

    def XSHOW(self):
        """Build the widgets, size and centre the window, then show it."""
        self.IGUI()
        self.resize(600, 90)
        self.center()
        self.setWindowTitle("xiaofang")
        self.show()

    def IGUI(self):
        """Create the URL box, download button, progress bar and log label."""
        self.url_textbox = QLineEdit(self)
        self.url_textbox.move(20, 10)
        self.url_textbox.resize(480, 20)

        self.download_button = QPushButton('下载', self)
        self.download_button.move(520, 10)
        self.download_button.resize(60, 20)
        self.download_button.clicked.connect(self.on_click)

        self.pbar = QProgressBar(self)
        self.pbar.setGeometry(20, 40, 510, 20)

        self.log_label = QLabel(self)
        self.log_label.move(20, 65)
        self.log_label.setText("请输入M3U8链接")
        self.log_label.resize(560, 20)

        # Route worker-thread notifications to the widgets.
        self._log_message.connect(self.log_label.setText)
        self._progress_changed.connect(self.pbar.setValue)
        self._busy_changed.connect(self.download_button.setDisabled)

    def del_dir(self, dir):
        """Delete everything inside ``dir``; the directory itself is kept.

        Does nothing when ``dir`` does not exist.
        """
        if not os.path.exists(dir):
            return
        # Walk bottom-up so every sub-directory is already empty by the time
        # os.rmdir() is called on it.
        for root, subdirs, files in os.walk(dir, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in subdirs:
                os.rmdir(os.path.join(root, name))

    def analysis(self, url):
        """Fetch the playlist at ``url`` and extract its segment and key URLs.

        Parsing stops at the first line containing ``DISCONTINUITY`` or
        ``ENDLIST``.

        Returns:
            ``(urllist, keyurl)``: absolute segment URLs in playlist order and
            the absolute URL of the last ``#EXT-X-KEY`` URI seen (``None``
            when the playlist declares no key URI).

        Raises:
            Whatever ``requests`` raises for a failed or non-2xx request.
        """
        response = get(url, timeout=10)
        response.raise_for_status()
        urllist = []
        keyurl = None
        for raw_line in response.text.splitlines():
            line = raw_line.strip()  # also drops a trailing "\r"
            if not line:
                continue
            if "DISCONTINUITY" in line or "ENDLIST" in line:
                break
            if line.startswith("#"):
                if "#EXT-X-KEY" in line and 'URI="' in line:
                    # Take the quoted value that follows URI= instead of
                    # assuming it is the last quoted attribute on the line.
                    key_uri = line.partition('URI="')[2].partition('"')[0]
                    keyurl = urljoin(url, key_uri)
                continue
            # urljoin() leaves absolute URLs untouched and resolves relative
            # ones against the playlist URL.
            urllist.append(urljoin(url, line))
        return urllist, keyurl

    def downloadTS(self, url, key, filepath, retries=5):
        """Download one segment to ``filepath``, decrypting it when ``key`` is set.

        Args:
            url: Segment URL.
            key: AES key bytes, or a falsy value for unencrypted segments.
            filepath: Destination file path.
            retries: Maximum number of download attempts.

        Returns:
            ``True`` when the file was written, ``False`` when every attempt
            failed (nothing is written in that case).
        """
        self.print(f"正在下载: {url}")
        content = None
        for attempt in range(1, retries + 1):
            try:
                response = get(url, timeout=5)
                response.raise_for_status()
                content = response.content
                break
            except Exception:
                # requests' exception classes are not imported in this file;
                # Exception still lets KeyboardInterrupt/SystemExit through.
                if attempt < retries:
                    sleep(0.5)
        if content is None:
            return False

        if key:
            # NOTE(review): the key is reused as the CBC IV here. HLS normally
            # takes the IV from the EXT-X-KEY tag or the media sequence
            # number - confirm this matches the streams being downloaded.
            content = AES.new(key, AES.MODE_CBC, key).decrypt(content)

        # Decrypt before opening the file so a failure leaves no empty file.
        os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
        with open(filepath, "wb") as f:
            f.write(content)
        return True

    def validationTS(self, urllist, filelist):
        """Return the URLs whose downloaded file is missing or suspiciously small.

        A file under 10 KiB counts as broken unless it is the last entry of
        ``filelist``.
        """
        errorlist = []
        for url, file in zip(urllist, filelist):
            if not os.path.exists(file):
                errorlist.append(url)
                continue
            size = os.path.getsize(file)
            if size / 1024 < 10 and file != filelist[-1]:
                errorlist.append(url)
                self.print("尺寸过小:{}".format(size))
        if errorlist:
            self.print("缺少{}个TS文件".format(len(errorlist)))
        return errorlist

    def mergeTS(self, filelist):
        """Concatenate the segment files into ``./MP4/<timestamp>.mp4`` with ffmpeg.

        Returns:
            The timestamp-based file name (without extension) on success,
            ``1`` when ffmpeg could not be started, ``False`` when ffmpeg
            exited with a non-zero status.
        """
        self.print("正在合并TS文件")
        mp4name = datetime.now().strftime("%Y%m%d%H%M%S")
        listfile = f"./TS/{mp4name}.txt"
        with open(listfile, "w+", encoding="utf-8") as f:
            f.writelines(
                "file '{}'\n".format(path.split("/")[-1]) for path in filelist
            )
        os.makedirs("./MP4/", exist_ok=True)
        # Argument list instead of a command string so the call also works
        # where a string is not split into arguments.
        cmd = [
            "ffmpeg", "-f", "concat", "-safe", "0", "-i", listfile,
            "-c", "copy", f"./MP4/{mp4name}.mp4", "-loglevel", "quiet", "-n",
        ]
        # 0x08000000 is CREATE_NO_WINDOW; creationflags is only accepted on
        # Windows, so it must not be passed elsewhere.
        options = {"creationflags": 0x08000000} if os.name == "nt" else {}
        try:
            r = call(cmd, **options)
        except OSError:
            return 1
        return mp4name if r == 0 else False

    def downloadm3u8(self, m3u8url):
        """Download, validate and merge the stream at ``m3u8url``.

        Intended to run in a worker thread. Any error is reported through
        the log label instead of escaping the thread, and the end of the run
        is always signalled so state() stops and the button is re-enabled.
        """
        try:
            self._run_download(m3u8url)
        except Exception as err:
            print(f"download failed: {err!r}")
            self.print("下载失败")
        finally:
            self._finished.set()
            self._busy_changed.emit(False)

    def _run_download(self, m3u8url):
        """Do the actual work of downloadm3u8()."""
        self.del_dir("./TS/")
        urllist, keyurl = self.analysis(m3u8url)
        if not urllist:
            self.print("下载失败")
            return
        key = get(keyurl, timeout=10).content if keyurl else None

        # Index-based names avoid collisions between segments that share a
        # base name and avoid characters (e.g. "?") that are invalid in file
        # names on some platforms.
        filelist = [f"./TS/{index:05d}.ts" for index in range(len(urllist))]
        os.makedirs("./TS/", exist_ok=True)
        with open("./TS/filelist.json", "w", encoding="utf-8") as f:
            dump(filelist, f, ensure_ascii=False, indent=4)

        with ThreadPoolExecutor(max_workers=80) as pool:
            for url, filepath in zip(urllist, filelist):
                pool.submit(self.downloadTS, url, key, filepath)

        for _ in range(self._MAX_RETRY_ROUNDS):
            errorlist = self.validationTS(urllist, filelist)
            if not errorlist:
                break
            failed = set(errorlist)
            # Re-download every failed URL into ITS OWN file path.
            for url, filepath in zip(urllist, filelist):
                if url in failed:
                    self.downloadTS(url, key, filepath)

        # Segments that are still small after the retries are accepted as
        # genuinely small; only a missing file aborts the run.
        if any(not os.path.exists(filepath) for filepath in filelist):
            self.print("下载失败")
            self.del_dir("./TS/")
            return

        mp4name = self.mergeTS(filelist)
        # Test the "ffmpeg missing" code first: 1 is truthy and would
        # otherwise be mistaken for a file name.
        if mp4name == 1:
            self.print("未识别到ffmpeg")
        elif mp4name:
            self.print("下载完成 {}".format(
                os.path.abspath("./MP4/{}.mp4".format(mp4name))))
        else:
            self.print("下载失败")
        self.del_dir("./TS/")

    def print(self, *args):
        """Print ``args`` to stdout and show them in the log label.

        Safe to call from any thread: the label is updated through a signal.
        """
        message = " ".join(str(arg) for arg in args)
        print(message)
        self._log_message.emit(message)

    def state(self):
        """Poll ``./TS/`` and report download progress until the run ends.

        Waits for ``./TS/filelist.json`` to appear, then repeatedly counts
        how many of the listed files exist and publishes the percentage.
        Returns early if the download run ends before the list is written.
        """
        while True:
            try:
                with open("./TS/filelist.json", "r", encoding="utf-8") as f:
                    filelist = loads(f.read())
                break
            except (OSError, ValueError):
                # Not written yet (or only partially); stop waiting once the
                # download thread has finished.
                if self._finished.wait(1):
                    return

        pending = set(filelist)
        total = len(pending)
        self.step = 0
        self._progress_changed.emit(0)
        while pending:
            pending = {path for path in pending if not os.path.exists(path)}
            self.step = round((total - len(pending)) / total * 100, 0)
            self._progress_changed.emit(int(self.step))
            # Event.wait() doubles as the poll delay and the exit check.
            if self.step == 100 or self._finished.wait(0.2):
                break
        self._progress_changed.emit(100)

    @pyqtSlot()
    def on_click(self):
        """Start a download of the URL typed into the text box."""
        url = self.url_textbox.text()
        if url == "" or "m3u8" not in url:
            self.print("请输入M3U8链接")
            return
        self.del_dir("./TS/")
        self._finished.clear()
        # Only one download at a time: both workers share ./TS/.
        self.download_button.setDisabled(True)
        # Daemon threads so closing the window is not blocked by a download.
        Thread(target=self.downloadm3u8, args=(url,), daemon=True).start()
        Thread(target=self.state, args=(), daemon=True).start()
if __name__ == '__main__':
    # Script entry point: create the Qt application and the main window,
    # run the event loop, and exit with the loop's return code.
    app = QApplication(argv)
    ex = App()
    exit_code = app.exec_()
    exit(exit_code)
|