websocket_server_main.cc

websocket_server_main.cc

可执行文件路径:wenet/runtime/libtorch/build/bin/websocket_server_main

直接执行

在命令行窗口执行: bash websocket_server.sh

websocket_server.sh:

1
2
3
4
5
6
7
8
9
10
export GLOG_logtostderr=1
export GLOG_v=2
model_dir=/mnt/k/wenet/model/20210601_u2++_conformer_libtorch
#model_dir=/mnt/k/wenet/model/20220506_u2pp_conformer_libtorch

./build/bin/websocket_server_main \
--port 10086 \
--chunk_size 16 \
--model_path $model_dir/final.zip \
--unit_path $model_dir/units.txt 2>&1 | tee server.log

在另一个命令行窗口执行:bash websocket_client.sh

websocket_client.sh:

1
2
3
4
5
6
7
export GLOG_logtostderr=1
export GLOG_v=2
wav_path=test.wav

./build/bin/websocket_client_main \
--hostname 127.0.0.1 --port 10086 \
--wav_path $wav_path 2>&1 | tee client.log

调试

1
2
3
4
5
6
7
8
9
10
# cd runtime/libtorch
export GLOG_logtostderr=1
export GLOG_v=2

cgdb ./build/bin/websocket_server_main
# 打断点 比如 b core/websocket/websocket_server.cc:125
run --port 10086 --chunk_size 16 --model_path /mnt/k/wenet/model/20210601_u2++_conformer_libtorch/final.zip --unit_path /mnt/k/wenet/model/20210601_u2++_conformer_libtorch/units.txt

# 在另一个命令行窗口,执行
bash websocket_client.sh

过程解释

server监听来自client的消息,接收到client消息就新开一个线程,在子线程里进行通信。通信的类叫websocket::stream<tcp::socket>,客户端写数据消息 write进这个类对象 ws_里,服务器端read消息。

子线程具体通信过程(下面把“当前这个子线程”叫“主线程”,因为这个子线程里面又会新开线程):

client先发一个text作为开始通信的表示,server接收,server通过连续三个判断(接收的字符串里有三个字段,分别是signal == "start"obj.find("nbest") != obj.end()obj.find("continuous_decoding") != obj.end())确认它是想开始通信。

然后进入OnSpeechStart()函数,server告诉client接收到了,发送给client(发送,也就是ws.write)ok的状态,server准备好了。

新建一个线程类型的共享指针,子线程内执行的函数是 DecodeThreadFunc()函数。于是子线程decode_thread_去执行 DecodeThreadFunc()函数,原来的线程继续执行,新一轮for循环会执行到 OnSpeechData() 函数,读入音频数据,feature_pipeline_->AcceptWaveform(pcm_data, num_samples);,计算成fbank特征,放进特征队列里,等待解码器的解码。音频数据全部读完后,break退出for循环,程序来到 decode_thread_->join();,主线程等待子线程 decode_thread_ 程序执行完毕。

DecodeThreadFunc()函数流程:用了while(true)循环,直到遇到end结束符就break,否则一直循环,执行state = decoder_->Decode()decoder_->Decode() 具体操作是:从特征队列里读取read特征,存放在chunk_feat 里,送入模型 model_->ForwardEncoder 识别,再做CtcPrefixBeamSearch。

根据解码返回状态 state 决定接下去的步骤:

  • 状态为 “解码完所有音频特征 kEndFeats 时”,说明所有音频解码完了,进行rescore,结果保存在json中,写入ws_,也就是传结果这个消息给客户端,然后break出去;
  • 状态为“检测到端点 kEndpoint 时”,说明遇到端点,进行rescore,结果保存在json中,写入ws_,也就是传结果这个消息给客户端,然后根据初始参数设置的是要连续解码还是非连续解码,进行不同的后续步骤。如果设定为要连续解码,则虽然遇到端点了,还是要接着下一轮循环继续解码,把decoder清空历史内存,reset各种状态,重新开始解码;如果设定为非连续解码,则遇到端点,视为结束,传结束符给client,break出去。
  • 状态为“kEndBatch或kWaitFeats时”,输出部分结果,告诉client部分结果,接着下一轮循环。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// DecodeThreadFunc():

while (true) {
DecodeState state = decoder_->Decode();
if (state == DecodeState::kEndFeats) {
decoder_->Rescoring();
std::string result = SerializeResult(true);
OnFinalResult(result);
OnFinish();
stop_recognition_ = true;
break;
} else if (state == DecodeState::kEndpoint) {
decoder_->Rescoring();
std::string result = SerializeResult(true);
OnFinalResult(result);
// If it's not continuous decoding, continue to do next recognition
// otherwise stop the recognition
if (continuous_decoding_) {
decoder_->ResetContinuousDecoding();
} else {
OnFinish();
stop_recognition_ = true;
break;
}
} else {
if (decoder_->DecodedSomething()) {
std::string result = SerializeResult(false);
OnPartialResult(result);
}
}
}