Bilibili Downloader

Update: 2024/04/08
突然又想下个视频,发现旧的用不了了,稍微修了修

依据 API 收集项目编写。使用前需要手动复制 cookie 中的 SESSDATA 字段,并保存到同目录下的 cookie.txt;同目录下还需要存在 ffmpeg(ffmpeg.exe)。

script

from argparse import ArgumentParser
from os import path, popen, unlink
from threading import Thread, Lock

from loguru import logger
from requests import get
from requests.exceptions import SSLError

# SESSDATA value of the logged-in account; filled in by check_cookie().
cookie = ''
lock = Lock()

# Quality id -> display label for video streams. The numeric prefix of each
# label makes a plain string sort put the best quality first.
video_quality = {
    6: '12-240P',
    16: '11-320P',
    32: '10-640P',
    64: '09-720P',
    74: '08-720P 60',
    80: '07-1080P',
    112: '06-1080P+',
    116: '05-1080P 60',
    120: '04-4K 超清',
    125: '03-HDR',
    126: '02-杜比视界',
    127: '01-8K 超清',
}

# Quality id -> display label for audio streams, same prefix convention.
audio_quality = {
    30216: '05-64K',
    30232: '04-132K',
    30280: '03-192K',
    30250: '02-杜比全景声',
    30251: '01-Hi-Res 无损',
}


def check_ffmpeg():
    """
    Abort the program unless an ffmpeg executable is available.

    ffmpeg is accepted when it can be resolved through ``PATH`` or when an
    ``ffmpeg.exe`` file sits in the current working directory.

    :return: None when ffmpeg was found
    :raises SystemExit: with exit status 1 when ffmpeg cannot be found
    """
    from shutil import which

    if which('ffmpeg') or path.exists('ffmpeg.exe'):
        return
    logger.error('未在当前环境下发现 ffmpeg 文件...')
    # SystemExit instead of the site-provided ``exit`` helper, which is not
    # guaranteed to exist (e.g. under ``python -S`` or in frozen builds).
    raise SystemExit(1)


def download_range(save_name: str, s_pos: int, e_pos: int, url: str):
    """
    Download one byte range of ``url`` into the matching region of a file.

    :param save_name: path of the target file; it is opened in ``r+b`` mode,
        so it must already exist
    :param s_pos: first byte of the requested range, also the file offset
        where writing starts
    :param e_pos: last byte of the requested range (HTTP ranges are inclusive)
    :param url: url of the stream to fetch
    :return: None
    :raises requests.exceptions.HTTPError: when the server answers 4xx/5xx
    """
    headers = {
        "Range": f"bytes={s_pos}-{e_pos}", "Referer": "https://www.bilibili.com",
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                      ' Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
    }
    with get(url, headers=headers, stream=True) as r:
        # Fail loudly instead of writing an error page into the media file.
        r.raise_for_status()
        with open(save_name, 'r+b') as f:
            # Position the file pointer at the start of this range.
            f.seek(s_pos)
            # 64 KiB chunks: same bytes written, far fewer Python-level iterations.
            for chunk in r.iter_content(chunk_size=64 * 1024):
                if chunk:
                    f.write(chunk)


def _split_ranges(file_size: int, thread: int) -> list:
    """
    Split ``file_size`` bytes into contiguous, non-overlapping byte ranges.

    :param file_size: total number of bytes, must be positive
    :param thread: requested number of ranges; clamped to ``1..file_size``
    :return: list of inclusive ``(start, end)`` tuples covering every byte once
    """
    workers = max(1, min(thread, file_size))
    part = file_size // workers
    # The last range absorbs the remainder left by the integer division.
    return [
        (part * i, part * (i + 1) - 1 if i < workers - 1 else file_size - 1)
        for i in range(workers)
    ]


def download(url: str, filename: str, thread=3):
    """
    Download ``url`` to ``filename`` using several range requests in parallel.

    The file is pre-allocated to its final size, then each worker thread
    fills in its own byte range through ``download_range``.

    :param url: url of the stream to fetch
    :param filename: path of the file to create (overwritten if present)
    :param thread: number of parallel range downloads
    :return: None
    :raises SystemExit: with exit status 1 when the server reports no size
    :raises requests.exceptions.HTTPError: when the size probe answers 4xx/5xx
    """
    logger.debug(f"Download url: {url}")
    logger.info(f'开始下载文件:{filename}')
    headers = {
        'Referer': "https://www.bilibili.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78"
    }
    # stream=True: only the headers are read here. Without it the whole body
    # would be downloaded once just to learn its length.
    with get(url, headers=headers, stream=True) as r:
        r.raise_for_status()
        file_size = int(r.headers.get('content-length', 0))
    if file_size <= 0:
        # Range splitting is impossible without a size (and seek(-1) would crash).
        logger.error(f'无法获取文件大小,下载失败:{filename}')
        raise SystemExit(1)
    # Pre-allocate the file so every worker can write at its own offset.
    with open(filename, 'wb') as f:
        f.truncate(file_size)
    threads = []
    for start, end in _split_ranges(file_size, thread):
        t = Thread(target=download_range, args=(filename, start, end, url))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()


def check_cookie():
    """
    Check whether the cached cookie is still usable.

    Reads the SESSDATA value from ``cookie.txt`` in the current directory,
    queries the account info endpoint with it and, on success, stores the
    value in the module-level ``cookie``.

    :return: the SESSDATA value read from ``cookie.txt``
    :raises SystemExit: with exit status 1 when ``cookie.txt`` is missing or
        the API rejects the cookie
    """
    global cookie
    user_space_url = 'https://api.bilibili.com/x/space/myinfo'
    headers = {
        'Referer': "https://www.bilibili.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78"
    }
    if not path.exists('cookie.txt'):
        logger.error('请先登录以获取 cookie ...')
        raise SystemExit(1)
    with open('cookie.txt', 'r', encoding='utf-8') as f:
        # Editors usually append a newline; it must not end up in the cookie.
        cache = f.read().strip()
    vip_type = {0: "无", 1: "月度大会员", 2: "年度及以上大会员"}
    response = get(user_space_url, cookies={'SESSDATA': cache}, headers=headers).json()
    if response['code'] != 0:
        logger.warning('缓存 cookie 失效,请重新登录...')
        raise SystemExit(1)
    logger.debug('缓存 cookie 可用...')
    vip_code = response["data"]["vip"]["type"]
    # An unlisted vip type should not crash the login check.
    vip = vip_type.get(vip_code, f"未知({vip_code})")
    logger.info(f'当前缓存账户:{response["data"]["name"]};VIP:{vip}')
    if vip == "无":
        logger.warning("当前账户非大会员账户,将无法使用更高权限的 API...")
    cookie = cache
    return cache


def get_title(bvid: str) -> str:
    """
    Fetch the title of a video and strip characters unsafe in file names.

    The request carries the module-level ``cookie`` as SESSDATA.

    :param bvid: BV id of the video
    :return: the title with ``< > ? / \\ : " * | !``, newlines and spaces
        removed (may be empty if the title consisted only of those)
    :raises SystemExit: with exit status 1 when the API reports an error
    """
    url = f'https://api.bilibili.com/x/web-interface/view?bvid={bvid}'
    cookies = {
        "SESSDATA": cookie
    }
    headers = {
        'Referer': "https://www.bilibili.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78"
    }
    status = {
        0: '成功', -400: '请求错误', -403: '权限不足', -404: '无视频', 62002: '稿件不可见', 62004: '稿件审核中'
    }
    response = get(url, cookies=cookies, headers=headers).json()
    code = response['code']
    if code:
        # Fall back to the API's own message for codes missing from the table.
        reason = status.get(code, response.get('message', code))
        logger.error(f'当前视频: {bvid} {reason}')
        raise SystemExit(1)
    black_list = "<>?/\\:\"*\n|! "
    # One C-level pass instead of one str.replace() per forbidden character.
    return response['data']['title'].translate(str.maketrans('', '', black_list))


def get_parts_info(bvid: str) -> list:
    """
    Query the page-list endpoint for a video.

    :param bvid: BV id of the video
    :return: the ``data`` field of the JSON reply
    """
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
        " Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78"
    )
    reply = get(
        'https://api.bilibili.com/x/player/pagelist',
        params={'bvid': bvid},
        headers={'Referer': "https://www.bilibili.com", "User-Agent": user_agent},
    )
    payload = reply.json()
    return payload['data']


def get_download_url(bvid: str, part=0) -> dict:
    """
    Fetch the video and audio stream urls of one part of a video.

    Three sample ids used for testing:
        BV1Xe4y1P7xZ
        BV1eV411W7tt
        BV1rR4y1y7fG

    If the module-level ``cookie`` is still empty it is loaded through
    ``check_cookie`` first.

    :param bvid: BV id of the video
    :param part: index into the list returned by ``get_parts_info``
    :return: the DASH section of the reply, reshaped by ``extract_url``
    :raises SystemExit: with exit status 1 when the reply has no DASH data
    """
    global cookie
    if not cookie:
        cookie = check_cookie()
    parts = get_parts_info(bvid)
    cookies = {
        "SESSDATA": cookie
    }
    # The session travels via ``cookies=`` only. The former extra ``cookies``
    # header was not a real HTTP header and merely duplicated the secret.
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.61',
        'referer': 'https://www.bilibili.com',
    }
    cid = parts[part]['cid']
    player_url = f'https://api.bilibili.com/x/player/playurl?bvid={bvid}&cid={cid}&fnver=0&fnval=2000&fourk=1'
    payload = get(player_url, headers=headers, cookies=cookies).json()
    dash = (payload.get('data') or {}).get('dash')
    if not dash:
        # Report the API's message instead of failing with a bare TypeError/KeyError.
        logger.error(f'获取视频流失败: {bvid} {payload.get("message")}')
        raise SystemExit(1)
    return extract_url(dash)


def extract_url(response: dict) -> dict:
    """
    Reduce a DASH reply to ``{quality label: base_url}`` mappings.

    Only the first stream seen for each quality label is kept. Quality ids
    missing from ``video_quality`` / ``audio_quality`` get the label
    ``99-<id>`` so they sort after every known quality instead of raising.

    :param response: DASH dict; the keys ``audio``, ``video``, ``dolby`` and
        ``flac`` are read, and each may be absent or null
    :return: dict with the keys ``audio``, ``video``, ``dolby`` and ``flac``,
        each mapping quality labels to stream urls
    """
    format_data = {'audio': {}, 'video': {}, 'dolby': {}, 'flac': {}}

    def label(table: dict, quality_id) -> str:
        return table.get(quality_id, f'99-{quality_id}')

    # setdefault keeps the first url seen for a label, like the old "not in" test.
    for audio in response.get('audio') or []:
        format_data['audio'].setdefault(label(audio_quality, audio['id']), audio['base_url'])
    for video in response.get('video') or []:
        format_data['video'].setdefault(label(video_quality, video['id']), video['base_url'])
    dolby = response.get('dolby') or {}
    if dolby.get('type'):
        for item in dolby.get('audio') or []:
            format_data['dolby'].setdefault(label(audio_quality, item['id']), item['base_url'])
    # Both the flac object and its inner audio entry may be null.
    flac_audio = (response.get('flac') or {}).get('audio')
    if flac_audio:
        format_data['flac'][label(audio_quality, flac_audio['id'])] = flac_audio['base_url']
    return format_data


def download_video(bvid: str, quality: int = 0, part=0, thread=3):
    """
    Download one part of a video and merge its streams into ``<title>.mp4``.

    The video and audio streams are saved as ``video-<bvid>.m4s`` and
    ``audio-<bvid>.m4s`` in the current directory, merged with ffmpeg
    (stream copy, no re-encoding) and removed once the merge succeeded.

    :param bvid: BV id of the video
    :param quality: key of ``video_quality`` to download; a falsy value picks
        the best available video quality
    :param part: index of the part to download (negative values count from
        the end, as with list indexing)
    :param thread: number of parallel range downloads per file
    :return: None
    :raises SystemExit: with exit status 1 when ffmpeg is missing, the part
        or quality does not exist, no stream is offered, or the merge fails
    """
    from subprocess import run

    # Fail fast: merging needs ffmpeg, so check before spending time downloading.
    check_ffmpeg()
    parts = get_parts_info(bvid)
    if part >= len(parts) or part < -len(parts):
        logger.error('不存在当前分 P 视频...')
        raise SystemExit(1)
    while True:
        try:
            urls = get_download_url(bvid, part)
            break
        except SSLError:
            logger.warning('访问出错,正在重连...')
    audios, videos, dolby, flac = urls['audio'], urls['video'], urls['dolby'], urls['flac']
    # Labels carry a rank prefix, so index 0 after sorting is the best quality.
    video_choice = sorted(videos)
    audio_choice = sorted({*audios, *dolby, *flac})
    logger.info(f'当前存在视频质量: {video_choice}')
    logger.info(f'当前存在音频质量: {audio_choice}')
    if not video_choice or not audio_choice:
        logger.error('未获取到可用的音视频流...')
        raise SystemExit(1)
    # Fetched after get_download_url() so the cookie is loaded, and before
    # the downloads so a failing title lookup does not waste them.
    title = get_title(bvid) or bvid
    video_file, audio_file = f'video-{bvid}.m4s', f'audio-{bvid}.m4s'
    if quality:
        if quality in video_quality and video_quality[quality] in video_choice:
            logger.info(f"开始下载视频 {bvid} {parts[part]['part']} {video_quality[quality]}")
            download(videos[video_quality[quality]], video_file, thread)
        else:
            logger.error("不存在当前画质的视频...")
            raise SystemExit(1)
    else:
        logger.info('默认下载最高质量音视频...')
        logger.info(f'开始下载视频 {bvid} {parts[part]["part"]} {video_choice[0]}')
        download(videos[video_choice[0]], video_file, thread)
    best_audio = audio_choice[0]
    logger.info(f'开始下载音频 {bvid} {parts[part]["part"]} {best_audio}')
    if best_audio in audios:
        download(audios[best_audio], audio_file, thread)
    elif best_audio in dolby:
        download(dolby[best_audio], audio_file, thread)
    else:
        download(flac[best_audio], audio_file, thread)
    # Argument list, no shell: the title can never be interpreted as shell syntax.
    merged = run([
        'ffmpeg', '-i', video_file, '-i', audio_file,
        '-c:v', 'copy', '-c:a', 'copy', '-f', 'mp4', f'{title}.mp4',
    ])
    if merged.returncode != 0:
        # Keep the .m4s files so the merge can be retried by hand.
        logger.error(f'ffmpeg 合并失败,已保留 {video_file} 与 {audio_file}')
        raise SystemExit(1)
    unlink(video_file)
    unlink(audio_file)
    logger.success("Done...")


def main():
    """
    Command-line entry point: parse the arguments and start the download.

    :return: None
    """
    quality_help = (
        "6: 12-240P, 16: 11-320P, 32: 10-640P, 64: 09-720P, 74: 08-720P 60, 80: 07-1080P, "
        "112: 06-1080P+, 116: 05-1080P 60, 120: 04-4K 超清, 125: 03-HDR, 126: 02-杜比视界, "
        "127: 01-8K 超清"
    )
    quality_ids = [6, 16, 32, 64, 74, 80, 112, 116, 120, 125, 126, 127]

    cli = ArgumentParser()
    cli.add_argument("bvid", type=str, help="The bvid of the video")
    cli.add_argument("-q", "--quality", type=int, choices=quality_ids, help=quality_help)
    cli.add_argument("-p", "--part", type=int, default=0, help="The part for the video")
    cli.add_argument("-t", "--thread", type=int, default=3, help="The thread for download")
    options = cli.parse_args()

    download_video(
        options.bvid,
        quality=options.quality,
        part=options.part,
        thread=options.thread,
    )


# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
    # Manual run without the CLI: download_video("BV1cD421L73M")
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇