一种低成本视频直播(轮播)解决方案

0. 概述

影视剧的直播,有其特殊性:

  1. 直播内容是“静态的”。直播源通常是预制的视频文件,而非实时通过摄像头采集的数据。
  2. 没有主播,所以也就没有主播和观众的互动需求。所以对延时(主播和观众的时间差)要求不高,只要保证所有观众端具有差不多的播放进度即可。

所以相比摄像头开播的直播,影视剧直播的技术方案有可简化的余地:

  1. 把视频文件预先转换成目标格式,避免推流过程中的编解码(当然这里只考虑为观众提供原始码率,不需要转码的情况)。
  2. 把视频文件切割成多个时间长度为几秒钟的小视频文件,依次轮播。当新用户进入时,使其从当前序号的视频文件播放即可——同其它观众的时间差不超过此文件的时长。

同时,通过预转制为合适的格式,有机会利用 http 直接作为传输协议。这样就只需搭建一个简单的 web 服务,观众通过浏览器就可以拉流观看直播了。

方案主要包含 5 个功能模块:

  • to_ts.py:格式转换
  • Broadcaster:遍历文件、remux、推流
  • Redis:推流队列、弹幕发布订阅
  • Web 服务:为客户端提供接口
  • 前端:基于 DPlayer 实现的弹幕播放器

Broadcaster、Redis、Web 三个服务部署在了同一台机器上。1C1G 的机器,推一条流,CPU 只占用 3% 左右。

1. 格式转换

格式转换发生在资源上线之前,只需要进行一次。

利用 ffmpeg,把源文件切分成多个 .ts 文件,每个 ts 片段 10s 左右:

1
ffmpeg -i in.mkv 省略其它参数 -hls_time 10 out.m3u8

生成的 .m3u8 文件可以丢掉,实际只会用到 .ts 文件。推流时会依序遍历 .ts 文件,串流为一个 http 流。.ts 是受到广泛浏览器支持的容器格式,实测只有 iOS 上浏览器不支持它。

2. 推流

2.1 使用 pyAV 库重封装(remux) .ts 文件

轮播(一个源文件反复播、多个源文件串流播)时,要重封装 .ts 文件确保 pts/dts 连贯递增。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if packet.stream == src_video_stream:
if video_dts is None:
video_dts = packet.dts
packet.dts = video_dts
packet.pts = video_dts + delta
video_dts += packet.duration # dts 累加
packet.stream = dst_video_stream
elif packet.stream == src_audio_stream:
if audio_dts is None:
audio_dts = packet.dts
packet.dts = audio_dts
packet.pts = audio_dts + delta
audio_dts += packet.duration # dts 累加
packet.stream = dst_audio_stream
dst_container.mux(packet)

通过不断累加,保证了 pts/dts 是连续的。

重封装是不需要编解码的,只是一个简单的数值计算,但是 pyAV 这个库好像不支持封装后写入到内存,必须创建一个磁盘文件,写入再读出,算是不算完美的一个点。

2.2 使用 Redis 做推流队列

推流单位是 .ts 文件,而非音视频帧。处理逻辑就很简单了:每次读一个 .ts 文件,push 到 redis 队列中即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def send_fragment(temp_ts, duration, start_time):
global fragment_index
with open(temp_ts, mode='rb') as f:
fragment_data = f.read()
fragment_data = base64.standard_b64encode(fragment_data).decode(
encoding='ascii')
redis_client = redis.Redis(connection_pool=redis_pool, decode_responses=True)
redis_client.rpush(redis_key_fragments, orjson.dumps(
{'index': fragment_index, 'duration': duration,
'data': fragment_data}))
redis_client.ltrim(redis_key_fragments, -2, -1) # 保留最新的两个片段
playing_fragment = redis_client.lindex(redis_key_fragments, 0)
redis_client.close()
playing_fragment = orjson.loads(playing_fragment)
time_span = time.time() - start_time
duration = max(playing_fragment['duration'] - time_span, 0)
time.sleep(duration)
fragment_index += 1

redis 数据类型用到了 list,因为每个直播流我保留了最新的 2 个 .ts 片段,避免网络抖动或其它原因引起的推流跟不上而导致的卡顿。

每次 push 后 sleep 的时长很重要。太短的话,观众端还没看完当前片段就跳跃到了下一个片段;太长,观众端已经播完了推流还没跟上,就会卡顿。
所以要根据观众当前正在观看的片段——也就是 redis 队列里的第一个片段——的时长进行 sleep。

3. Sanic 实现 Web 服务

Web 服务主要两大块功能:

  • 提供拉流接口。用户通过访问此接口,获取到直播流,显示到播放器上。
  • 弹幕。Web 服务应支持 websocket,记录所有连接,当收到弹幕时,“组播”给同频道的所有连接。

2.1 拉流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@api_bp.get('/live/<channel:str>')
async def get_live_stream(request, channel: str):
channel = urllib.parse.unquote(channel)
response = await request.respond()
redis_key = REDIS_KEY_PREFIX_PLAY_FRAGMENT + channel
new_client = True
while True:
sent = False
async with redis.Redis(connection_pool=request.app.ctx.redis_pool, auto_close_connection_pool=False,
decode_responses=True) as redis_client:
fragments = await redis_client.lrange(redis_key, 0, 1 if new_client else 0)
for fragment in fragments:
fragment = orjson.loads(fragment)
fragment_index = fragment['index']
if new_client or request.ctx.fragment_index < fragment_index:
request.ctx.fragment_index = fragment_index
data = base64.standard_b64decode(fragment['data'])
await response.send(data)
fragment_duration = fragment['duration']
await asyncio.sleep(0 if new_client else fragment_duration if fragment_duration <= 1 else fragment_duration - 1)
sent = True
new_client = False
if not sent:
await asyncio.sleep(1)

轮播时流传输应该是永不退出的,所以最外层用死循环包住。

不断地从 redis 队列获取数据(但不 pop,不然别人拿不到了),写入到 response 中,完成串流。

new_client 相关的逻辑,是为了保证一次性塞给播放器足够时长的数据,否则会容易卡顿。

2.2 弹幕

如果是单体应用(且只有一个 worker),一个典型的 websocket 弹幕实现应该是这样的:

1
2
3
4
5
6
7
8
@api_bp.websocket('/chat/<channel:str>')
async def on_websocket(request, ws: Websocket, channel: str):
if channel not in app_ctx.clients:
app_ctx.clients[channel] = []
app_ctx.clients[channel].append(ws) # 记录当前频道所有客户端
while True:
danmaku = await ws.recv() # 等待
await broadcast_danmaku(danmaku) # 广播给其它客户端

为了保证在有多个 Web worker 时弹幕可以互通,就必须使用某种进程间通信方式把弹幕通知给其它 worker。既然前边已经用到了 Redis,
那就继续利用它的 Pub/Sub 做弹幕通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@api_bp.websocket('/chat/<channel:str>')
async def on_websocket(request, ws: Websocket, channel: str):
app_ctx = request.app.ctx
if channel not in app_ctx.clients:
app_ctx.clients[channel] = []
await app_ctx.redis_pubsub.subscribe(REDIS_CHANNEL_PREFIX_DANMAKU + channel) # 订阅 Redis 频道
if not hasattr(request.app.ctx, 'broadcast_task'):
request.app.ctx.broadcast_task = asyncio.create_task(broadcast_danmaku(request.app.ctx)) # 创建任务
app_ctx.clients[channel].append(ws)
while True:
try:
message = await ws.recv()
except WebsocketClosed:
app_ctx.clients[channel].remove(ws)
break
async with redis.Redis(connection_pool=request.app.ctx.redis_pool,
auto_close_connection_pool=False) as redis_client:
await redis_client.publish(REDIS_CHANNEL_PREFIX_DANMAKU + channel, message) # 发布到 Redis 频道


async def broadcast_danmaku(ctx):
while True:
message = await ctx.redis_pubsub.get_message(True, None)
if message:
live_channel = message['channel'].decode('utf-8')[len(REDIS_CHANNEL_PREFIX_DANMAKU):]
clients = ctx.clients[live_channel][:]
danmaku = message['data'].decode('utf-8')
for client in clients:
try:
await client.send(danmaku)
except WebsocketClosed:
ctx.clients[live_channel].remove(client)

每个 worker 创建一个异步任务(asyncio.create_task),在任务中阻塞等待 Redis 订阅频道上的消息到来。
这样 websocket 路由接口还是阻塞等待在 recv 处,两者互不影响。

当接收到新的客户端弹幕,发布到 Redis 频道,所有订阅过的 worker(包括发送 worker 自己)都会被唤醒,分别广播给自己承载的客户端。

注意 broadcast_danmaku 的实现,我是在 for 循环里串行发送的,这里可以用 asyncio.gather 优化为并行版本。

3. Web 播放器

DPlayer 很好很强大,支持直播、支持弹幕,可以配合插件支持多种传输格式,完全满足我的需求。

. ts 格式的视频播放库我用的是 mpegts.js,好像是 flv.js 的替代品。配合 DPlayer,大概的用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
playerRef.current = new DPlayer({
container: document.getElementById('dplayer'),
live: true,
autoplay: true,
video: {
url: `/api/live/${props.channel}`,
type: 'customMpegts',
customType: {
customMpegts: function (video) {
const player = mpegts.createPlayer({
type: 'mpegts',
isLive: true,
url: video.src
});
player.attachMediaElement(video);
player.load();
}
}
},
danmaku: true,
apiBackend: {
read: function (options) {
options.success([]);
},
send: function (options) {
send(JSON.stringify({color: options.data.color, text: options.data.text}));
showDanmaku();
options.success();
}
}
});

最新版的 Dplayer(1.27.1)有 bug
给弹幕元素动态生成 css 类时没有动画信息,导致弹幕不能正常显示。我在使用时简单补偿了一下:

1
2
3
4
5
6
function showDanmaku() {
for (const item of document.getElementsByClassName('dplayer-danmaku-item')) {
item.style.animationName = 'danmaku';
item.style.animationPlayState = 'running';
}
}

4. 总结

1) 这并不是一套适用于 UGC(用户生产内容)平台的方案,因为我既是平台方,又是内容生产者,所以才心甘情愿地做额外的格式转换。

2) 预算决定技术方案。如果足够有钱,何苦在乎我下到的资源是什么格式?ffmpeg 转一切。

评论