0. 概述
影视剧的直播,有其特殊性:
- 直播内容是“静态的”。直播源通常是预制的视频文件,而非实时通过摄像头采集的数据。
- 没有主播,所以也就没有主播和观众的互动需求。所以对延时(主播和观众的时间差)要求不高,只要保证所有观众端具有差不多的播放进度即可。
所以相比摄像头开播的直播,影视剧直播的技术方案有可简化的余地:
- 把视频文件预先转换成目标格式,避免推流过程中的编解码(当然这里只考虑为观众提供原始码率,不需要转码的情况)。
- 把视频文件切割成多个时间长度为几秒钟的小视频文件,依次轮播。当新用户进入时,使其从当前序号的视频文件播放即可——同其它观众的时间差不超过此文件的时长。
同时,通过预转制为合适的格式,有机会利用 http 直接作为传输协议。这样就只需搭建一个简单的 web 服务,观众通过浏览器就可以拉流观看直播了。
方案主要包含 5 个功能模块:
- to_ts.py:格式转换
- Broadcaster:遍历文件、remux、推流
- Redis:推流队列、弹幕发布订阅
- Web 服务:为客户端提供接口
- 前端:基于 DPlayer 实现的弹幕播放器
Broadcaster、Redis、Web 三个服务部署在了同一台机器上。1C1G 的机器,推一条流,CPU 只占用 3% 左右。
1. 格式转换
格式转换发生在资源上线之前,只需要进行一次。
利用 ffmpeg,把源文件切分成多个 .ts 文件,每个 ts 片段 10s 左右:
1
| ffmpeg -i in.mkv 省略其它参数 -hls_time 10 out.m3u8
|
生成的 .m3u8 文件可以丢掉,实际只会用到 .ts 文件。推流时会依序遍历 .ts 文件,串流为一个 http 流。.ts 是受到广泛浏览器支持的容器格式,实测只有 iOS 上浏览器不支持它。
2. 推流
2.1 使用 pyAV 库重封装(remux) .ts 文件
轮播(一个源文件反复播、多个源文件串流播)时,要重封装 .ts 文件确保 pts/dts 连贯递增。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| if packet.stream == src_video_stream: if video_dts is None: video_dts = packet.dts packet.dts = video_dts packet.pts = video_dts + delta video_dts += packet.duration packet.stream = dst_video_stream elif packet.stream == src_audio_stream: if audio_dts is None: audio_dts = packet.dts packet.dts = audio_dts packet.pts = audio_dts + delta audio_dts += packet.duration packet.stream = dst_audio_stream dst_container.mux(packet)
|
通过不断累加,保证了 pts/dts 是连续的。
重封装是不需要编解码的,只是一个简单的数值计算,但是 pyAV 这个库好像不支持封装后写入到内存,必须创建一个磁盘文件,写入再读出,算是不算完美的一个点。
2.2 使用 Redis 做推流队列
推流单位是 .ts 文件,而非音视频帧。处理逻辑就很简单了:每次读一个 .ts 文件,push 到 redis 队列中即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| def send_fragment(temp_ts, duration, start_time): global fragment_index with open(temp_ts, mode='rb') as f: fragment_data = f.read() fragment_data = base64.standard_b64encode(fragment_data).decode( encoding='ascii') redis_client = redis.Redis(connection_pool=redis_pool, decode_responses=True) redis_client.rpush(redis_key_fragments, orjson.dumps( {'index': fragment_index, 'duration': duration, 'data': fragment_data})) redis_client.ltrim(redis_key_fragments, -2, -1) playing_fragment = redis_client.lindex(redis_key_fragments, 0) redis_client.close() playing_fragment = orjson.loads(playing_fragment) time_span = time.time() - start_time duration = max(playing_fragment['duration'] - time_span, 0) time.sleep(duration) fragment_index += 1
|
redis 数据类型用到了 list,因为每个直播流我保留了最新的 2 个 .ts 片段,避免网络抖动或其它原因引起的推流跟不上而导致的卡顿。
每次 push 后 sleep 的时长很重要。太短的话,观众端还没看完当前片段就跳跃到了下一个片段;太长,观众端已经播完了推流还没跟上,就会卡顿。
所以要根据观众当前正在观看的片段——也就是 redis 队列里的第一个片段——的时长进行 sleep。
3. Sanic 实现 Web 服务
Web 服务主要两大块功能:
- 提供拉流接口。用户通过访问此接口,获取到直播流,显示到播放器上。
- 弹幕。Web 服务应支持 websocket,记录所有连接,当收到弹幕时,“组播”给同频道的所有连接。
2.1 拉流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @api_bp.get('/live/<channel:str>') async def get_live_stream(request, channel: str): channel = urllib.parse.unquote(channel) response = await request.respond() redis_key = REDIS_KEY_PREFIX_PLAY_FRAGMENT + channel new_client = True while True: sent = False async with redis.Redis(connection_pool=request.app.ctx.redis_pool, auto_close_connection_pool=False, decode_responses=True) as redis_client: fragments = await redis_client.lrange(redis_key, 0, 1 if new_client else 0) for fragment in fragments: fragment = orjson.loads(fragment) fragment_index = fragment['index'] if new_client or request.ctx.fragment_index < fragment_index: request.ctx.fragment_index = fragment_index data = base64.standard_b64decode(fragment['data']) await response.send(data) fragment_duration = fragment['duration'] await asyncio.sleep(0 if new_client else fragment_duration if fragment_duration <= 1 else fragment_duration - 1) sent = True new_client = False if not sent: await asyncio.sleep(1)
|
轮播时流传输应该是永不退出的,所以最外层用死循环包住。
不断地从 redis 队列获取数据(但不 pop,不然别人拿不到了),写入到 response 中,完成串流。
new_client
相关的逻辑,是为了保证一次性塞给播放器足够时长的数据,否则会容易卡顿。
2.2 弹幕
如果是单体应用(且只有一个 worker),一个典型的 websocket 弹幕实现应该是这样的:
1 2 3 4 5 6 7 8
| @api_bp.websocket('/chat/<channel:str>') async def on_websocket(request, ws: Websocket, channel: str): if channel not in app_ctx.clients: app_ctx.clients[channel] = [] app_ctx.clients[channel].append(ws) while True: danmaku = await ws.recv() await broadcast_danmaku(danmaku)
|
为了保证在有多个 Web worker 时弹幕可以互通,就必须使用某种进程间通信方式把弹幕通知给其它 worker。既然前边已经用到了 Redis,
那就继续利用它的 Pub/Sub 做弹幕通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @api_bp.websocket('/chat/<channel:str>') async def on_websocket(request, ws: Websocket, channel: str): app_ctx = request.app.ctx if channel not in app_ctx.clients: app_ctx.clients[channel] = [] await app_ctx.redis_pubsub.subscribe(REDIS_CHANNEL_PREFIX_DANMAKU + channel) if not hasattr(request.app.ctx, 'broadcast_task'): request.app.ctx.broadcast_task = asyncio.create_task(broadcast_danmaku(request.app.ctx)) app_ctx.clients[channel].append(ws) while True: try: message = await ws.recv() except WebsocketClosed: app_ctx.clients[channel].remove(ws) break async with redis.Redis(connection_pool=request.app.ctx.redis_pool, auto_close_connection_pool=False) as redis_client: await redis_client.publish(REDIS_CHANNEL_PREFIX_DANMAKU + channel, message)
async def broadcast_danmaku(ctx): while True: message = await ctx.redis_pubsub.get_message(True, None) if message: live_channel = message['channel'].decode('utf-8')[len(REDIS_CHANNEL_PREFIX_DANMAKU):] clients = ctx.clients[live_channel][:] danmaku = message['data'].decode('utf-8') for client in clients: try: await client.send(danmaku) except WebsocketClosed: ctx.clients[live_channel].remove(client)
|
每个 worker 创建一个异步任务(asyncio.create_task
),在任务中阻塞等待 Redis 订阅频道上的消息到来。
这样 websocket 路由接口还是阻塞等待在 recv
处,两者互不影响。
当接收到新的客户端弹幕,发布到 Redis 频道,所有订阅过的 worker(包括发送 worker 自己)都会被唤醒,分别广播给自己承载的客户端。
注意 broadcast_danmaku
的实现,我是在 for
循环里串行发送的,这里可以用 asyncio.gather
优化为并行版本。
3. Web 播放器
DPlayer 很好很强大,支持直播、支持弹幕,可以配合插件支持多种传输格式,完全满足我的需求。
. ts 格式的视频播放库我用的是 mpegts.js,好像是 flv.js 的替代品。配合 DPlayer,大概的用法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| playerRef.current = new DPlayer({ container: document.getElementById('dplayer'), live: true, autoplay: true, video: { url: `/api/live/${props.channel}`, type: 'customMpegts', customType: { customMpegts: function (video) { const player = mpegts.createPlayer({ type: 'mpegts', isLive: true, url: video.src }); player.attachMediaElement(video); player.load(); } } }, danmaku: true, apiBackend: { read: function (options) { options.success([]); }, send: function (options) { send(JSON.stringify({color: options.data.color, text: options.data.text})); showDanmaku(); options.success(); } } });
|
最新版的 Dplayer(1.27.1)有 bug,
给弹幕元素动态生成 css 类时没有动画信息,导致弹幕不能正常显示。我在使用时简单补偿了一下:
1 2 3 4 5 6
| function showDanmaku() { for (const item of document.getElementsByClassName('dplayer-danmaku-item')) { item.style.animationName = 'danmaku'; item.style.animationPlayState = 'running'; } }
|
4. 总结
1) 这并不是一套适用于 UGC(用户生产内容)平台的方案,因为我既是平台方,又是内容生产者,所以才心甘情愿地做额外的格式转换。
2) 预算决定技术方案。如果足够有钱,何苦在乎我下到的资源是什么格式?ffmpeg 转一切。