Python —— m3u8 下载和合并,代码如下:
import contextlib
import os
import tempfile
import warnings
from urllib import request
from urllib.parse import urljoin, urlsplit

# Paths of temporary files created by Myurlretrieve when no filename is given.
_url_tempfiles = []


def Myurlretrieve(url, filename=None, reporthook=None, data=None, mode="ab"):
    """
    Retrieve a URL into a file on disk.

    This is a variant of ``urllib.request.urlretrieve`` that lets the caller
    choose the mode used to open the target file (append by default), so that
    several downloads can be concatenated into one file.

    :param url: URL to fetch.
    :param filename: target file; when omitted a named temporary file is
        created and its path is recorded in ``_url_tempfiles``.
    :param reporthook: optional callable ``(block number, block size,
        total size)``; the total size is -1 when the response carries no
        Content-Length header.
    :param data: optional URL-encoded request body passed to ``urlopen``.
    :param mode: mode used to open ``filename`` (default ``"ab"``).
    :return: tuple ``(path to the data file, response headers)``.
    """
    parts = urlsplit(url)

    with contextlib.closing(request.urlopen(url, data)) as fp:
        headers = fp.info()

        # Just return the local path and the "headers" for file:// URLs.
        # No sense in performing a copy unless a target file was requested.
        if parts.scheme == "file" and not filename:
            return os.path.normpath(request.url2pathname(parts.path)), headers

        # Handle target / temporary file setup.
        if filename:
            tfp = open(filename, mode)
        else:
            tfp = tempfile.NamedTemporaryFile(delete=False)
            filename = tfp.name
            _url_tempfiles.append(filename)

        with tfp:
            result = filename, headers
            bs = 1024 * 8
            size = -1
            read = 0
            blocknum = 0
            if "content-length" in headers:
                size = int(headers["Content-Length"])

            if reporthook:
                reporthook(blocknum, bs, size)

            while True:
                block = fp.read(bs)
                if not block:
                    break
                read += len(block)
                tfp.write(block)
                blocknum += 1
                if reporthook:
                    reporthook(blocknum, bs, size)

    if size >= 0 and read < size:
        # A short read is tolerated (the download stays best-effort), but it
        # is reported instead of being silently ignored.
        warnings.warn(
            "retrieval incomplete: got only %i out of %i bytes" % (read, size),
            RuntimeWarning,
            stacklevel=2,
        )

    return result


class Url_ts():
    """
    Fetch an m3u8 playlist from a URL, download the .ts segments it lists and
    concatenate them into a single output file (e.g. an .mp4).
    """

    # Seconds to wait for the playlist request before giving up.
    REQUEST_TIMEOUT = 30

    def __init__(self, url, filename="defualt.mp4"):
        """
        :param url: address of the m3u8 playlist to download
        :param filename: name of the output file, e.g. "defualt.mp4"; it is
            created inside a "download" directory under the current working
            directory
        """
        self.url = url
        # Kept for backward compatibility; segment URLs are resolved against
        # the playlist URL with urljoin instead of being appended to this.
        self.server = self.url.rsplit("/", 2)[0] + "/"
        download_path = os.path.join(os.getcwd(), "download")
        os.makedirs(download_path, exist_ok=True)
        self.filename = os.path.join(download_path, filename)

    def _parse_playlist(self, text):
        """
        Yield the absolute URL of every segment listed in playlist ``text``.

        :param text: content of the m3u8 playlist
        :raises ValueError: if the text does not start with "#EXTM3U" or
            lists no segment at all
        """
        # splitlines + strip copes with CRLF line endings; a leading BOM
        # would otherwise hide the "#EXTM3U" header.
        lines = [line.strip() for line in text.lstrip("\ufeff").splitlines()]
        if not lines or lines[0] != "#EXTM3U":
            raise ValueError(u"非M3U8的链接")

        found = False
        for line in lines[1:]:
            # Blank lines carry nothing; lines starting with "#" are tags or
            # comments. Everything else is a URI.
            if not line or line.startswith("#"):
                continue
            found = True
            # URIs may be absolute, root-relative or relative to the playlist.
            yield urljoin(self.url, line)

        if not found:
            raise ValueError("未找到对应的下载链接")

    def getVideoTsUrl(self):
        """
        Download the m3u8 playlist at ``self.url`` and yield the URL of each
        .ts segment it lists.

        :return: generator of segment URLs
        :raises ValueError: if the response is not an m3u8 playlist or lists
            no segment
        """
        # Imported here so the stdlib-only parts of this module stay usable
        # when the third-party "requests" package is not installed.
        import requests

        response = requests.get(url=self.url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        yield from self._parse_playlist(response.text)

    def download_ts(self):
        """
        Download every segment in playlist order into ``self.filename``.

        :return: None
        """
        # The first segment truncates any output left over from an earlier
        # run; the following segments are appended to it.
        mode = "wb"
        for url in self.getVideoTsUrl():
            print(url)
            Myurlretrieve(url=url, filename=self.filename,
                          reporthook=self.Schedule, mode=mode)
            mode = "ab"

    def Schedule(self, a, b, c):
        """
        Progress report hook: print the download progress of one segment.

        :param a: number of blocks transferred so far
        :param b: size of one block in bytes
        :param c: total size in bytes, or -1 when it is unknown
        :return: None
        """
        downloaded = a * b
        if c > 0:
            # The last block is usually partial, so a * b can overshoot.
            downloaded = min(downloaded, c)
            per = 100.0 * downloaded / c
        else:
            # Unknown or empty size: no meaningful percentage to report.
            per = 0.0
        print(" " + "%.2f%% 已经下载的大小:%ld 文件大小:%ld" % (per, downloaded, c) + '\r')

    def run(self):
        """Download the playlist's segments into the output file."""
        self.download_ts()


if __name__ == '__main__':
    ts = Url_ts(url="https://vip.yingshidaqian.com/ppvod/209D602D37968D0C6A792E46491B9672.m3u8")
    ts.run()

# The output of a run is shown below:
用以下方式写入会有卡顿:

    with open(download_path + "\\" + "1.mp4", 'ab') as f:
        f.write(res.content)
        f.flush()