Yin的笔记本

vuePress-theme-reco Howard Yin    2021 - 2025
Yin的笔记本 Yin的笔记本

Choose mode

  • dark
  • auto
  • light
Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
author-avatar

Howard Yin

303

Article

153

Tag

Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
  • interceptor寻踪:从TrackRemote开始深入挖掘pion/interceptor的用法

    • 在TrackRemote里
      • RTPReceiver
        • 最后一点
          • 总结

          interceptor寻踪:从`TrackRemote`开始深入挖掘`pion/interceptor`的用法

          vuePress-theme-reco Howard Yin    2021 - 2025

          interceptor寻踪:从TrackRemote开始深入挖掘pion/interceptor的用法


          Howard Yin 2021-10-09 11:34:00 WebRTC编程框架pion概念

          上接《interceptor寻踪:pion/interceptor在pion/webrtc里的用法解析》,来深入挖掘一下interceptor在TrackRemote里的用法

          # 在TrackRemote里

          从《pion中的TrackRemote》里的调用链可以看到,最核心的函数就只有一个Read:

          // Read reads data from the track.
          func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, err error) {
          	t.mu.RLock()
          	r := t.receiver
          	peeked := t.peeked != nil
          	t.mu.RUnlock()
          
          	......
          
          	n, attributes, err = r.readRTP(b, t)
          	if err != nil {
          		return
          	}
          
          	err = t.checkAndUpdateTrack(b)
          	return
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17

          以及一个调用Read的ReadRTP:

          // ReadRTP is a convenience method that wraps Read and unmarshals for you.
          func (t *TrackRemote) ReadRTP() (*rtp.Packet, interceptor.Attributes, error) {
          	b := make([]byte, t.receiver.api.settingEngine.getReceiveMTU())
          	i, attributes, err := t.Read(b)
          	if err != nil {
          		return nil, nil, err
          	}
          
          	r := &rtp.Packet{}
          	if err := r.Unmarshal(b[:i]); err != nil {
          		return nil, nil, err
          	}
          	return r, attributes, nil
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14

          Read里最核心的明显就是这句r.readRTP(b, t),而这个r就是来自于上面那句r := t.receiver。再去看看TrackRemote的定义,可以发现这个t.receiver是个RTPReceiver:

          type TrackRemote struct {
          	......
          
          	receiver         *RTPReceiver
          
          	......
          }
          
          1
          2
          3
          4
          5
          6
          7

          所以这个TrackRemote里的interceptor相关操作是在外面定义好了封进RTPReceiver传进来的,TrackRemote是在调用它。

          那看看这个RTPReceiver何许人也?这个RTPReceiver和TrackRemote一样,也早在《pion学习总结:等待传入track的一般流程》就了解过了,《pion学习总结:等待传入track的一般流程》里可以看到它们是OnTrack里面输入Track处理函数的输入值。而从示例《用实例学习pion - gocv-receive》和《用实例学习pion - rtp-forwarder》中可以看到,处理传入流的操作就是调用TrackRemote的ReadRTP读取传进来的数据进行自己想要的处理。但pion的几个官方案例里面只示范了调用TrackRemote的过程,这个传入的RTPReceiver怎么用还没找到有案例。

          # RTPReceiver

          言归正传,现在来看看RTPReceiver的具体情况。首先从那句r.readRTP(b, t)入手看看。这个readRTP是个非导出函数:

          // readRTP should only be called by a track, this only exists so we can keep state in one place
          func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) {
          	<-r.received
          	if t := r.streamsForTrack(reader); t != nil {
          		return t.rtpInterceptor.Read(b, a)
          	}
          
          	return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9

          这个开头注释说这个函数是给TrackRemote专用的,难怪是个非导出。想想也是,用户只要调用TrackRemote.ReadRTP就能读取RTP了,也不需要RTPReceiver里再搞个功能一样的函数,给做成非导出防止出Bug。

          很明显,就是用一个r.streamsForTrack读出一个包含了RTPReader的变量然后调用Read。这个streamsForTrack就是个查找函数:

          func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams {
          	for i := range r.tracks {
          		if r.tracks[i].track == t {
          			return &r.tracks[i]
          		}
          	}
          	return nil
          }
          
          1
          2
          3
          4
          5
          6
          7
          8

          这个函数从RTPReceiver的trackStreams列表里面一个个对比TrackRemote,找出对应的trackStreams。居然用的是顺序查找,哈哈哈。再看看这个trackStreams又是什么:

          // trackStreams maintains a mapping of RTP/RTCP streams to a specific track
          // a RTPReceiver may contain multiple streams if we are dealing with Simulcast
          type trackStreams struct {
          	track *TrackRemote
          
          	streamInfo, repairStreamInfo *interceptor.StreamInfo
          
          	rtpReadStream  *srtp.ReadStreamSRTP
          	rtpInterceptor interceptor.RTPReader
          
          	rtcpReadStream  *srtp.ReadStreamSRTCP
          	rtcpInterceptor interceptor.RTCPReader
          
          	repairReadStream  *srtp.ReadStreamSRTP
          	repairInterceptor interceptor.RTPReader
          
          	repairRtcpReadStream  *srtp.ReadStreamSRTCP
          	repairRtcpInterceptor interceptor.RTCPReader
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19

          哦!RTPReader和RTCPReader都在这里,再看看它们都是从哪来的:

          哇居然都在一起生成的,在Receive函数里面,一股子《interceptor寻踪:从TrackLocal开始深入挖掘pion/interceptor的用法》取名为动词Send的初始化函数的既视感来了,这应该也是个取名为Receive实际上只是接收过程前的初始化函数吧。去看看,果然如此:

          // Receive initialize the track and starts all the transports
          func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
          	r.mu.Lock()
          	defer r.mu.Unlock()
          	select {
          	case <-r.received:
          		return errRTPReceiverReceiveAlreadyCalled
          	default:
          	}
          	defer close(r.received)
          
          	globalParams := r.getParameters()
          	codec := RTPCodecCapability{}
          	if len(globalParams.Codecs) != 0 {
          		codec = globalParams.Codecs[0].RTPCodecCapability
          	}
          
          	for i := range parameters.Encodings { // 对每种编码方式都进行初始化
          		t := trackStreams{ // 创建trackStreams
          			track: newTrackRemote( // 这就是系统内生成TrackRemote的地方
          				r.kind,
          				parameters.Encodings[i].SSRC,
          				parameters.Encodings[i].RID,
          				r,
          			),
          		}
          
          		if parameters.Encodings[i].SSRC != 0 {
          			t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions) // 生成trackStreams.streamInfo
          			var err error
          			if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
          				return err
          			} // 创建RTPReader和RTCPReader
          		}
          
          		r.tracks = append(r.tracks, t)
          
          		if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
          			streamInfo := createStreamInfo("", rtxSsrc, 0, codec, globalParams.HeaderExtensions) // 生成trackStreams.streamInfo
          			rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(rtxSsrc, *streamInfo)
          			if err != nil {
          				return err
          			} // 创建RTPReader和RTCPReader
          
          			if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {
          				return err
          			} // 这个操作和repair stream以及传输层拥塞控制TWCC有关,暂时还不了解
          		}
          	}
          
          	return nil
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52

          可以看到,主要是对传入的RTPReceiveParameters构造trackStreams并填入里面的各种东西,很好理解。

          但这里仍然不是最根本的,我们看到interceptor还是在r.transport.streamsForSSRC里生成的。进一步找这个r.transport.streamsForSSRC,发现在一个DTLSTransport的类里:

          // DTLSTransport allows an application access to information about the DTLS
          // transport over which RTP and RTCP packets are sent and received by
          // RTPSender and RTPReceiver, as well other data such as SCTP packets sent
          // and received by data channels.
          type DTLSTransport struct {
          	......
          }
          
          ......
          
          func (t *DTLSTransport) streamsForSSRC(ssrc SSRC, streamInfo interceptor.StreamInfo) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
          	srtpSession, err := t.getSRTPSession()
          	if err != nil {
          		return nil, nil, nil, nil, err
          	}
          
          	rtpReadStream, err := srtpSession.OpenReadStream(uint32(ssrc))
          	if err != nil {
          		return nil, nil, nil, nil, err
          	}
          
          	rtpInterceptor := t.api.interceptor.BindRemoteStream(&streamInfo, interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
          		n, err = rtpReadStream.Read(in)
          		return n, a, err
          	}))
          
          	srtcpSession, err := t.getSRTCPSession()
          	if err != nil {
          		return nil, nil, nil, nil, err
          	}
          
          	rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(ssrc))
          	if err != nil {
          		return nil, nil, nil, nil, err
          	}
          
          	rtcpInterceptor := t.api.interceptor.BindRTCPReader(interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
          		n, err = rtcpReadStream.Read(in)
          		return n, a, err
          	}))
          
          	return rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, nil
          }
          
          ......
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45

          从注释里可以看到,这是一个和SRTP以及SRTCP有关的类。所以这个类的存在也好理解,就是为了透明地实现SRTP和SRTCP的功能。在《interceptor寻踪:从TrackLocal开始深入挖掘pion/interceptor的用法》里我们也见到过发送端实现的SRTP和SRTCP功能,就是在NewRTPSender里给RTPSender所用的interceptor绑一个SRTCP发送RTCP包的操作,以及在Send里给RTPSender所用的interceptor绑一个SRTP发送RTP包的操作,和这里绑SRTP以及SRTCP接收操作的思想如出一辙。在用户那边看来好像是SRTP和STCP是透明的一样。

          # 最后一点

          截至目前,我们以及找到了interceptor初始化的位置和初始化的方式,但还不知道初始化是在哪里进行的。于是顺着Receive开始往上找:

          啊哈!果不其然,和《interceptor寻踪:从TrackLocal开始深入挖掘pion/interceptor的用法》里一样,这些初始化过程在最上层也是在SetLocalDescription和SetRemoteDescription里调用的。

          也好理解,SetLocalDescription和SetRemoteDescription里进行的就是根据SDP创建连接的过程,在这之后就能直接开始传输了,这些创建interceptor的初始化过程放在这个里面很合理。

          # 总结

          总结一下,TrackRemote接收流的相关操作其实还挺简单的:

          • 读取RTP包:OnTrack里用户获取到TrackRemote,调用TrackRemote里的Read,Read调用RTPReceiver里的非导出类执行发RTP包的操作
          • 读取RTCP包:OnTrack里用户获取到RTPReceiver,调用RTPReceiver里的Read就是实际读取RTCP包的操作
          • 初始化:在SetLocalDescription和SetRemoteDescription里,interceptor相关类被初始化(BindRemoteStream和BindRTCPReader)后放入TrackRemote和RTPReceiver里,在OnTrack里里用户获取到的就是这些初始化好的类
          • 接收方不负责发送,没有BindLocalStream和BindRTCPWriter,很合理

          接下来是最终总结:《interceptor寻踪:总结》

          帮助我们改善此页面!
          创建于: 2021-10-08 08:44:32

          更新于: 2021-10-09 11:34:07