Bilibili Downloader

依据 API 收集项目 编写

Python Script

  • 版本一
import logging
from base64 import b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from os import path, system, unlink
from pprint import pformat
from time import sleep
from re import compile, findall
from sys import stdout
from threading import Thread
from typing import TYPE_CHECKING, Union

import loguru
from matplotlib import pyplot
from qrcode import make
from requests import get, post, head, Session
from requests.exceptions import SSLError
from rsa import PublicKey, encrypt
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait

if TYPE_CHECKING:
    from loguru import Logger
logger: "Logger" = loguru.logger


class Filter:
    def __init__(self) -> None:
        self.level: Union[int, str] = "DEBUG"

    def __call__(self, record):
        module_name: str = "Bilibili"
        record["name"] = module_name.split(".")[0]
        levelno = (
            logger.level(self.level).no if isinstance(self.level, str) else self.level
        )
        return record["level"].no >= levelno


class LoguruHandler(logging.Handler):
    def emit(self, record):
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )


logger.remove()
default_filter: Filter = Filter()
"""默认日志等级过滤器"""
default_format: str = (
    "<g>{time:MM-DD HH:mm:ss}</g> "
    "[<g><lvl>{level}</lvl></g>] "
    "<c><u>{name}</u></c> | "
    # "<c>{function}:{line}</c>| "
    "{message}"
)
"""默认日志格式"""
logger_id = logger.add(
    stdout,
    level=0,
    colorize=True,
    diagnose=False,
    filter=default_filter,
    format=default_format,
)

__autodoc__ = {"Filter": False, "LoguruHandler": False}
cookie = ''
check = [0, 0]
total = [0, 0]
num = 2


def check_ffmpeg():
    if not path.exists('ffmpeg.exe'):
        logger.error('未在当前环境下发现 ffmpeg 文件...')
        exit(1)


def calc_divisional_range(filesize):
    global num
    step = filesize // num
    arr = list(range(0, filesize, step))
    logger.debug(arr)
    check[0] = len(arr) - 1
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    logger.debug(result)
    return result


def range_download(save_name: str, s_pos: int, e_pos: int, url: str):
    global check, total
    headers = {
        "Range": f"bytes={s_pos}-{e_pos}", "Referer": "https://www.bilibili.com",
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                      ' Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
    }
    res = get(url, headers=headers, stream=True)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        for chunk in res.iter_content(chunk_size=64*1024):
            if chunk:
                f.write(chunk)
                total[1] += 64 * 1024
    check[1] += 1


def progress():
    global total
    while total[0] >= total[1]:
        print('\r', end='')
        print(f'Progress {round(total[1] / total[0] * 100, 1)}%', chr(9619) * round(total[1] / total[0] * 100), end='')
        stdout.flush()
        sleep(0.5)


def download(url: str, filename: str):
    global check, total, num
    logger.info(f'开始下载文件:{filename}')
    headers = {
        'Referer': "https://www.bilibili.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78"
    }
    open(filename, "wb")
    with ThreadPoolExecutor() as p:
        futures = []
        length = int(head(url, headers=headers).headers['Content-Length'])
        total[0] = length
        logger.debug(f'文件大小:{length}')
        for s_pos, e_pos in calc_divisional_range(length):
            logger.debug(f'正在下载块: {s_pos} ~ {e_pos}')
            futures.append(p.submit(range_download, filename, s_pos, e_pos, url))
        futures.append(p.submit(progress))
        as_completed(futures)
    if check[1] != check[0]:
        print()
        check = [0, 0]
        total = [0, 0]
        logger.error('文件下载不完全,正在尝试重新下载...')
        download(url, filename)
    else:
        print()
        check = [0, 0]
        total = [0, 0]
        logger.debug(f'文件 {filename} 下载完毕')


def pass_geetest(gt: str, challenge: str) -> tuple:
    """
    人机验证

    :param gt:
    :param challenge:
    :return:
    """
    geetest_url = 'https://kuresaru.github.io/geetest-validator/'
    driver = webdriver.Chrome()
    driver.get(url=geetest_url)
    driver.find_element(By.ID, 'gt').send_keys(gt)
    driver.find_element(By.ID, 'challenge').send_keys(challenge)
    driver.find_element(By.ID, 'btn-gen').click()
    WebDriverWait(driver, timeout=20).until(
        lambda d: d.find_element(By.CLASS_NAME, "geetest_success_radar_tip_content"))
    driver.find_element(By.CLASS_NAME, 'geetest_radar_tip').click()
    WebDriverWait(driver, timeout=20).until(
        lambda d: d.find_element(By.CLASS_NAME, "geetest_success_radar_tip_content").text == '验证成功')
    driver.find_element(By.ID, 'btn-result').click()
    validate = driver.find_element(By.XPATH, '/html/body/div[6]/input').get_attribute('value')
    seccode = driver.find_element(By.XPATH, '/html/body/div[7]/input').get_attribute('value')
    driver.close()
    return validate, seccode


def gt_challenge() -> tuple:
    """
    获取 geetest 验证码

    :return:
    """
    captcha_url = 'https://passport.bilibili.com/x/passport-login/captcha?source=main_web'
    response = get(captcha_url).json()['data']
    token, gt, challenge = response['token'], response['geetest']['gt'], response['geetest']['challenge']
    validate, seccode = pass_geetest(gt, challenge)
    return token, challenge, validate, seccode


def salt_pubkey(password: str) -> str:
    """
    密码加盐

    :param password:
    :return:
    """
    salt_pubkey_url = 'https://passport.bilibili.com/x/passport-login/web/key'
    response = get(salt_pubkey_url).json()
    salt, key = response['data']['hash'], response['data']['key']
    pubkey = PublicKey.load_pkcs1_openssl_pem(key)
    encrypted_pwd = encrypt((salt + password).encode(), pubkey)
    encrypted_pwd = b64encode(encrypted_pwd).decode()
    return encrypted_pwd


def login_by_pwd(username: str, password: str):
    """
    以账号密码的方式登录

    :param username:
    :param password:
    :return:
    """
    login_by_pwd_url = 'https://passport.bilibili.com/x/passport-login/web/login'
    token, challenge, validate, seccode = gt_challenge()
    data = {
        'username': username,
        'password': salt_pubkey(password),
        'keep': 0,
        'token': token,
        'challenge': challenge,
        'validate': validate,
        'seccode': seccode,
        'go_url': 'https://www.bilibili.com',
        'source': 'main_web'
    }
    logger.debug(pformat(data))
    response = post(login_by_pwd_url, data=data).json()
    return response


def login_by_phone(phone: int, cid=1):
    """
    以手机验证码方式登录

    :param phone:
    :param cid:
    :return:
    """
    send_code_url = 'https://passport.bilibili.com/x/passport-login/web/sms/send'
    login_by_phone_url = 'https://passport.bilibili.com/x/passport-login/web/login/sms'
    token, challenge, validate, seccode = gt_challenge()
    data = {
        'cid': cid,
        'tel': phone,
        'source': 'main_web',
        'token': token,
        'challenge': challenge,
        'validate': validate,
        'seccode': seccode
    }
    response = post(send_code_url, data=data).json()
    logger.debug(pformat(response))
    if int(response['code']) == 0:
        data = {
            'cid': cid,
            'tel': phone,
            'code': int(input("code: ")),
            'source': 'main_web',
            'captcha_key': response['data']['captcha_key'],
            'go_url': 'https://www.bilibili.com',
            'keep': True
        }
        response = post(login_by_phone_url, data=data).json()
        return response
    else:
        return response


def login_by_sacn():
    """
    以 APP 扫描二维码方式登录

    :return:
    """
    get_qrcode_url = 'https://passport.bilibili.com/x/passport-login/web/qrcode/generate'
    response = get(get_qrcode_url).json()['data']
    content, qrcode_key = response['url'], response['qrcode_key']
    img = make(content)
    Thread(target=scan, args=[pyplot, qrcode_key]).start()
    pyplot.imshow(img)
    pyplot.show()


def scan(plt: pyplot, key: str):
    """
    等待扫码,记录 cookie

    :param plt:
    :param key:
    :return:
    """
    scan_url = f'https://passport.bilibili.com/x/passport-login/web/qrcode/poll?qrcode_key={key}'
    times = 0
    logger.info('请使用手机 APP 扫码...')
    while True:
        response = get(scan_url).json()
        status = response['data']['code']
        if status == 86090 and times == 0:
            logger.info('已扫码,请确认登陆...')
            plt.close()
            times += 1
        elif status == 86038:
            logger.error('二维码已过期,请重新申请二维码...')
            exit(0)
        elif status == 0:
            logger.success('登录成功')
            break
    logger.debug(pformat(response))
    session_pattern = compile('SESSDATA=(.*?)&bili_jct')
    open('cookie.txt', 'w').write(findall(session_pattern, response['data']['url'])[0])
    logger.info("已缓存 cookie...")
    test_cookie()


def test_cookie():
    """
    测试缓存 cookie 是否可用

    :return:
    """
    global cookie
    user_space_url = 'https://api.bilibili.com/x/space/myinfo'
    if not path.exists('cookie.txt'):
        logger.error('请先登录以获取 cookie ...')
        login_by_sacn()
    else:
        cache = open('cookie.txt', 'r', encoding='utf-8').read()
        cookies = {
            'SESSDATA': cache
        }
        vip_type = {0: "无", 1: "月度大会员", 2: "年度及以上大会员"}
        response = get(user_space_url, cookies=cookies).json()
        if response['code'] == 0:
            logger.debug('缓存 cookie 可用...')
            vip = vip_type[response["data"]["vip"]["type"]]
            logger.info(f'当前缓存账户:{response["data"]["name"]};VIP:{vip}')
            if vip == "无":
                logger.warning("当前账户非大会员账户,将无法使用更高权限的 API...")
            cookie = cache
            return cache
        else:
            logger.warning('缓存 cookie 失效,请重新登录...')
            login_by_sacn()


def search_by_word(word: str, search_type='video') -> list:
    """
    以关键词搜索,获取 bvid

    :param word:
    :param search_type:
    :return:
    """
    global cookie
    search_url = f'https://api.bilibili.com/x/web-interface/search/type?keyword={word}&search_type={search_type}'
    if not cookie:
        cookie = test_cookie()
    cookies = {"SESSDATA": cookie}
    s = Session()
    response = s.get('https://bilibili.com/')
    while 'Set-cookie' not in response.headers.keys():
        s.get('https://bilibili.com/')
    logger.debug(response.headers['set-cookie'])
    response = s.get(search_url, cookies=cookies).json()
    result: list = response['data']['result']
    format_result = {}
    for i in range(len(result)):
        description = result[i]['description'].replace('\n', '\n\t')
        format_result[i] = f"\nbvid: {result[i]['bvid']}\n"
        format_result[i] += f"title: {result[i]['title']}\n"
        format_result[i] += f"tag: {result[i]['tag']}\n"
        format_result[i] += f"description: \n\t{description}"
        logger.debug(format_result[i])
    return result


def get_title(bvid: str) -> str:
    global cookie
    url = f'https://api.bilibili.com/x/web-interface/view?bvid={bvid}'
    cookies = {
        "SESSDATA": cookie
    }
    status = {
        0: '成功', -400: '请求错误', -403: '权限不足', -404: '无视频', 62002: '稿件不可见', 62004: '稿件审核中'
    }
    response = get(url, cookies=cookies).json()
    if response['code']:
        logger.error(f'当前视频: {bvid} {status[response["code"]]}')
        exit(0)
    return response['data']['title']


def get_parts_info(bvid: str) -> list:
    url = 'https://api.bilibili.com/x/player/pagelist'
    response = get(url, params={'bvid': bvid}).json()
    return response['data']


def get_download_url(bvid: str, part=0) -> dict:
    """
    获取具体的视频音频流 url

    三个测试样例:
        BV1Xe4y1P7xZ
        BV1eV411W7tt
        BV1rR4y1y7fG

    :param part:
    :param bvid:
    :return:
    """
    global cookie
    if not cookie:
        cookie = test_cookie()
    parts = get_parts_info(bvid)
    cookies = {
        "SESSDATA": cookie
    }
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.61',
        'referer': 'https://www.bilibili.com',
        'cookies': f'SESSDATA={cookie}'
    }
    cid = parts[part]['cid']
    player_url = f'https://api.bilibili.com/x/player/playurl?bvid={bvid}&cid={cid}&fnver=0&fnval=2000&fourk=1'
    response = get(player_url, headers=headers, cookies=cookies).json()['data']['dash']
    return extract_url(response)


def extract_url(response: dict) -> dict:
    """
    格式化,提取有用的部分

    :param response:
    :return:
    """

    format_data = {'audio': {}, 'video': {}, 'dolby': {}, 'flac': {}}
    video_quality = {
        6: '12-240P', 16: '11-320P', 32: '10-640P', 64: '09-720P', 74: '08-720P 60', 80: '07-1080P', 112: '06-1080P+',
        116: '05-1080P 60', 120: '04-4K 超清', 125: '03-HDR', 126: '02-杜比视界', 127: '01-8K 超清'
    }
    audio_quality = {
        30216: '05-64K', 30232: '04-132K', 30280: '03-192K', 30250: '02-杜比全景声', 30251: '01-Hi-Res 无损'
    }
    audios, dolby, flac, videos = response['audio'], response['dolby'], response['flac'], response['video']
    for audio in audios:
        if audio_quality[audio['id']] not in format_data['audio'].keys():
            format_data['audio'][audio_quality[audio['id']]] = audio['base_url']
    for video in videos:
        if video_quality[video['id']] not in format_data['video'].keys():
            format_data['video'][video_quality[video['id']]] = video['base_url']
    if dolby['type'] != 0:
        for i in dolby['audio']:
            if audio_quality[i['id']] not in format_data['dolby'].keys():
                format_data['dolby'][audio_quality[i['id']]] = i['base_url']
    if flac:
        format_data['flac'][audio_quality[flac['audio']['id']]] = flac['audio']['base_url']
    return format_data


def download_video(bvid: str, part=0):
    parts = get_parts_info(bvid)
    cid = parts[part]['cid']
    if part > len(parts):
        logger.error('不存在当前分 P 视频...')
        exit(0)
    while True:
        try:
            urls = get_download_url(bvid, part)
            break
        except SSLError:
            logger.warning('访问出错,正在重连...')
    audios, videos, dolby, flac = urls['audio'], urls['video'], urls['audio'], urls['flac']
    video_choice = sorted(list(videos.keys()))
    audio_choice = sorted(list(set(list(audios.keys()) + list(dolby.keys()) + list(flac.keys()))))
    logger.info(f'当前存在视频质量: {video_choice}')
    logger.info(f'当前存在音频质量: {audio_choice}')
    logger.info('默认下载最高质量音视频...')
    logger.info(f'开始下载视频 {bvid} {parts[part]["part"]} {video_choice[0]}')
    download(videos[video_choice[0]], 'video.m4s')
    logger.info(f'开始下载音频 {bvid} {parts[part]["part"]} {audio_choice[0]}')
    if audio_choice[0] in audios:
        download(audios[audio_choice[0]], 'audio.m4s')
    elif audio_choice[0] in dolby:
        download(dolby[audio_choice[0]], 'audio.m4s')
    else:
        download(flac[audio_choice[0]], 'audio.m4s')
    check_ffmpeg()
    system(f'ffmpeg -i video.m4s -i audio.m4s -c:v copy -c:a copy -f mp4 {bvid}-{cid}.mp4')
    unlink('video.m4s')
    unlink('audio.m4s')
    logger.success("Done...")


def main():
    bvid = 'BV1Eb411u7Fw'
    download(bvid)


if __name__ == "__main__":
    main()
  • 修改版:无登录部分,允许命令行传递参数
import logging
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor, as_completed
from os import path, popen, unlink
from time import sleep
from sys import stdout
from typing import TYPE_CHECKING, Union

import loguru
from requests import get, head
from requests.exceptions import SSLError

if TYPE_CHECKING:
    from loguru import Logger
logger: "Logger" = loguru.logger


class Filter:
    def __init__(self) -> None:
        self.level: Union[int, str] = "DEBUG"

    def __call__(self, record):
        module_name: str = "Bilibili"
        record["name"] = module_name.split(".")[0]
        levelno = (
            logger.level(self.level).no if isinstance(self.level, str) else self.level
        )
        return record["level"].no >= levelno


class LoguruHandler(logging.Handler):
    def emit(self, record):
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )


logger.remove()
default_filter: Filter = Filter()
"""默认日志等级过滤器"""
default_format: str = (
    "<g>{time:MM-DD HH:mm:ss}</g> "
    "[<g><lvl>{level}</lvl></g>] "
    "<c><u>{name}</u></c> | "
    # "<c>{function}:{line}</c>| "
    "{message}"
)
"""默认日志格式"""
logger_id = logger.add(
    stdout,
    level=0,
    colorize=True,
    diagnose=False,
    filter=default_filter,
    format=default_format,
)

__autodoc__ = {"Filter": False, "LoguruHandler": False}
cookie = ''
check = [0, 0]
total = [0, 0]
num = 2

video_quality = {
    6: '12-240P', 16: '11-320P', 32: '10-640P', 64: '09-720P', 74: '08-720P 60', 80: '07-1080P', 112: '06-1080P+',
    116: '05-1080P 60', 120: '04-4K 超清', 125: '03-HDR', 126: '02-杜比视界', 127: '01-8K 超清'
}
audio_quality = {
    30216: '05-64K', 30232: '04-132K', 30280: '03-192K', 30250: '02-杜比全景声', 30251: '01-Hi-Res 无损'
}

parser = ArgumentParser()
parser.add_argument("bvid", help="The bvid of the video", type=str)
parser.add_argument("-q", "--quality",
                    help="6: 12-240P, 16: 11-320P, 32: 10-640P, 64: 09-720P, 74: 08-720P 60, 80: 07-1080P, "
                         "112: 06-1080P+, 116: 05-1080P 60, 120: 04-4K 超清, 125: 03-HDR, 126: 02-杜比视界, 127: 01-8K 超清",
                    type=int,
                    choices=[6, 16, 32, 64, 74, 80, 112, 116, 120, 125, 126, 127])
parser.add_argument("-p", "--part", help="The part for the video", type=int, default=0)
args = parser.parse_args()


def check_ffmpeg():
    if not path.exists('ffmpeg.exe'):
        logger.error('未在当前环境下发现 ffmpeg 文件...')
        exit(1)


def calc_divisional_range(filesize):
    global num
    step = filesize // num
    arr = list(range(0, filesize, step))
    logger.debug(arr)
    check[0] = len(arr) - 1
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    logger.debug(result)
    return result


def range_download(save_name: str, s_pos: int, e_pos: int, url: str):
    global check, total
    headers = {
        "Range": f"bytes={s_pos}-{e_pos}", "Referer": "https://www.bilibili.com",
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                      ' Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
    }
    res = get(url, headers=headers, stream=True)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        for chunk in res.iter_content(chunk_size=64*1024):
            if chunk:
                f.write(chunk)
                total[1] += 64 * 1024
    check[1] += 1


def progress():
    global total
    while total[0] >= total[1]:
        print('\r', end='')
        print(f'Progress {round(total[1] / total[0] * 100, 1)}%', chr(9619) * round(total[1] / total[0] * 100), end='')
        stdout.flush()
        sleep(0.5)


def download(url: str, filename: str):
    global check, total, num
    logger.info(f'开始下载文件:{filename}')
    headers = {
        'Referer': "https://www.bilibili.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78"
    }
    open(filename, "wb")
    with ThreadPoolExecutor() as p:
        futures = []
        length = int(head(url, headers=headers).headers['Content-Length'])
        total[0] = length
        logger.debug(f'文件大小:{length}')
        for s_pos, e_pos in calc_divisional_range(length):
            logger.debug(f'正在下载块: {s_pos} ~ {e_pos}')
            futures.append(p.submit(range_download, filename, s_pos, e_pos, url))
        futures.append(p.submit(progress))
        as_completed(futures)
    if check[1] != check[0]:
        print()
        check = [0, 0]
        total = [0, 0]
        logger.error('文件下载不完全,正在尝试重新下载...')
        download(url, filename)
    else:
        print()
        check = [0, 0]
        total = [0, 0]
        logger.debug(f'文件 {filename} 下载完毕')


def check_cookie():
    """
    测试缓存 cookie 是否可用

    :return:
    """
    global cookie
    user_space_url = 'https://api.bilibili.com/x/space/myinfo'
    if not path.exists('cookie.txt'):
        logger.error('请先登录以获取 cookie ...')
        exit(0)
    else:
        cache = open('cookie.txt', 'r', encoding='utf-8').read()
        cookies = {
            'SESSDATA': cache
        }
        vip_type = {0: "无", 1: "月度大会员", 2: "年度及以上大会员"}
        response = get(user_space_url, cookies=cookies).json()
        if response['code'] == 0:
            logger.debug('缓存 cookie 可用...')
            vip = vip_type[response["data"]["vip"]["type"]]
            logger.info(f'当前缓存账户:{response["data"]["name"]};VIP:{vip}')
            if vip == "无":
                logger.warning("当前账户非大会员账户,将无法使用更高权限的 API...")
            cookie = cache
            return cache
        else:
            logger.warning('缓存 cookie 失效,请重新登录...')
            exit(0)


def get_title(bvid: str) -> str:
    global cookie
    url = f'https://api.bilibili.com/x/web-interface/view?bvid={bvid}'
    cookies = {
        "SESSDATA": cookie
    }
    status = {
        0: '成功', -400: '请求错误', -403: '权限不足', -404: '无视频', 62002: '稿件不可见', 62004: '稿件审核中'
    }
    response = get(url, cookies=cookies).json()
    if response['code']:
        logger.error(f'当前视频: {bvid} {status[response["code"]]}')
        exit(0)
    title = response['data']['title']
    black_list = "<>?/\\:\"?*\n|! "
    for i in black_list:
        title = title.replace(i, '')
    return title


def get_parts_info(bvid: str) -> list:
    url = 'https://api.bilibili.com/x/player/pagelist'
    response = get(url, params={'bvid': bvid}).json()
    return response['data']


def get_download_url(bvid: str, part=0) -> dict:
    """
    获取具体的视频音频流 url

    三个测试样例:
        BV1Xe4y1P7xZ
        BV1eV411W7tt
        BV1rR4y1y7fG

    :param part:
    :param bvid:
    :return:
    """
    global cookie
    if not cookie:
        cookie = check_cookie()
    parts = get_parts_info(bvid)
    cookies = {
        "SESSDATA": cookie
    }
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.61',
        'referer': 'https://www.bilibili.com',
        'cookies': f'SESSDATA={cookie}'
    }
    cid = parts[part]['cid']
    player_url = f'https://api.bilibili.com/x/player/playurl?bvid={bvid}&cid={cid}&fnver=0&fnval=2000&fourk=1'
    response = get(player_url, headers=headers, cookies=cookies).json()['data']['dash']
    return extract_url(response)


def extract_url(response: dict) -> dict:
    """
    格式化,提取有用的部分

    :param response:
    :return:
    """
    global video_quality, audio_quality
    format_data = {'audio': {}, 'video': {}, 'dolby': {}, 'flac': {}}
    audios, dolby, flac, videos = response['audio'], response['dolby'], response['flac'], response['video']
    for audio in audios:
        if audio_quality[audio['id']] not in format_data['audio'].keys():
            format_data['audio'][audio_quality[audio['id']]] = audio['base_url']
    for video in videos:
        if video_quality[video['id']] not in format_data['video'].keys():
            format_data['video'][video_quality[video['id']]] = video['base_url']
    if dolby['type'] != 0:
        for i in dolby['audio']:
            if audio_quality[i['id']] not in format_data['dolby'].keys():
                format_data['dolby'][audio_quality[i['id']]] = i['base_url']
    if flac:
        format_data['flac'][audio_quality[flac['audio']['id']]] = flac['audio']['base_url']
    return format_data


def download_video(bvid: str, quality: int = 0, part=0):
    global video_quality, audio_quality
    parts = get_parts_info(bvid)
    if part > len(parts):
        logger.error('不存在当前分 P 视频...')
        exit(0)
    while True:
        try:
            urls = get_download_url(bvid, part)
            break
        except SSLError:
            logger.warning('访问出错,正在重连...')
    audios, videos, dolby, flac = urls['audio'], urls['video'], urls['audio'], urls['flac']
    video_choice = sorted(list(videos.keys()))
    audio_choice = sorted(list(set(list(audios.keys()) + list(dolby.keys()) + list(flac.keys()))))
    logger.info(f'当前存在视频质量: {video_choice}')
    logger.info(f'当前存在音频质量: {audio_choice}')
    if quality:
        if quality in video_quality and video_quality[quality] in video_choice:
            logger.info(f"开始下载视频 {bvid} {parts[part]['part']} {video_quality[quality]}")
            download(videos[video_quality[quality]], f"video-{bvid}.m4s")
        else:
            logger.error("不存在当前画质的视频...")
            exit(1)
    else:
        logger.info('默认下载最高质量音视频...')
        logger.info(f'开始下载视频 {bvid} {parts[part]["part"]} {video_choice[0]}')
        download(videos[video_choice[0]], f'video-{bvid}.m4s')
    logger.info(f'开始下载音频 {bvid} {parts[part]["part"]} {audio_choice[0]}')
    if audio_choice[0] in audios:
        download(audios[audio_choice[0]], f'audio-{bvid}.m4s')
    elif audio_choice[0] in dolby:
        download(dolby[audio_choice[0]], f'audio-{bvid}.m4s')
    else:
        download(flac[audio_choice[0]], f'audio-{bvid}.m4s')
    check_ffmpeg()
    title = get_title(bvid)
    popen(f'ffmpeg -i video-{bvid}.m4s -i audio-{bvid}.m4s -c:v copy -c:a copy -f mp4 {title}.mp4').read()
    unlink(f'video-{bvid}.m4s')
    unlink(f'audio-{bvid}.m4s')
    logger.success("Done...")


def main():
    download_video(args.bvid, quality=args.quality, part=args.part)


if __name__ == "__main__":
    main()
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇