Yin的笔记本

vuePress-theme-reco Howard Yin    2021 - 2025
Yin的笔记本 Yin的笔记本

Choose mode

  • dark
  • auto
  • light
Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
author-avatar

Howard Yin

303

Article

153

Tag

Home
Category
  • CNCF
  • Docker
  • namespaces
  • Kubernetes
  • Kubernetes对象
  • Linux
  • MyIdeas
  • Revolution
  • WebRTC
  • 云计算
  • 人工智能
  • 分布式
  • 图像处理
  • 图形学
  • 微服务
  • 数学
  • OJ笔记
  • 博弈论
  • 形式语言与自动机
  • 数据库
  • 服务器运维
  • 编程语言
  • C
  • Git
  • Go
  • Java
  • JavaScript
  • Python
  • Nvidia
  • Rust
  • Tex
  • Shell
  • Vue
  • 视频编解码
  • 计算机网络
  • SDN
  • 论文笔记
  • 讨论
  • 边缘计算
  • 量子信息技术
Tag
TimeLine
About
查看源码
  • interceptor寻踪:从TrackLocal开始深入挖掘pion/interceptor的用法

    • 在TrackLocal里
      • 进一步深入
        • 中场休息
          • 在RTPSender里
            • NewRTPSender
            • Read
          • 结束

          interceptor寻踪:从`TrackLocal`开始深入挖掘`pion/interceptor`的用法

          vuePress-theme-reco Howard Yin    2021 - 2025

          interceptor寻踪:从TrackLocal开始深入挖掘pion/interceptor的用法


          Howard Yin 2021-10-08 11:57:05 WebRTC编程框架pion概念

          上接《interceptor寻踪:pion/interceptor在pion/webrtc里的用法解析》,来深入挖掘一下interceptor在TrackLocal里的用法

          《pion中的TrackLocal》里面已经解析过,TrackLocal是用于发送媒体流的类,所以这里面的intrceptor也应该主要是为发送服务的。

          # 在TrackLocal里

          从《pion中的TrackLocal》里可以看到TrackLocal只是一个接口,interceptor应该是隐藏在Bind函数所输入的TrackLocalContext的writeStream里的:

          // TrackLocalWriter is the Writer for outbound RTP Packets
          type TrackLocalWriter interface {
          	// WriteRTP encrypts a RTP packet and writes to the connection
          	WriteRTP(header *rtp.Header, payload []byte) (int, error)
          
          	// Write encrypts and writes a full RTP packet
          	Write(b []byte) (int, error)
          }
          
          // TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used
          // in Interceptors.
          type TrackLocalContext struct {
          	id          string
          	params      RTPParameters
          	ssrc        SSRC
          	writeStream TrackLocalWriter
          }
          
          ......
          
          // WriteStream returns the WriteStream for this TrackLocal. The implementer writes the outbound
          // media packets to it
          func (t *TrackLocalContext) WriteStream() TrackLocalWriter {
          	return t.writeStream
          }
          
          ......
          
          // TrackLocal is an interface that controls how the user can send media
          // The user can provide their own TrackLocal implementations, or use
          // the implementations in pkg/media
          type TrackLocal interface {
          	// Bind should implement the way how the media data flows from the Track to the PeerConnection
          	// This will be called internally after signaling is complete and the list of available
          	// codecs has been determined
          	Bind(TrackLocalContext) (RTPCodecParameters, error)
          	
          	......
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39

          从《pion中的TrackLocal》里最后面介绍的TrackLocalStaticRTP案例可以看到,TrackLocalContext只有四个变量,前面三个一看就是静态的数据变量,显然TrackLocal发送数据用的东西应该就是这个TrackLocalWriter。从注释上看,这个TrackLocalWriter是由框架构造好了再传进去的,所以与interceptor相关的操作都是在外面定义好了封装为TrackLocalWriter再传进来的,TrackLocal里面本身不涉及interceptor相关的操作。

          # 进一步深入

          那么顺藤摸瓜,只要顺着TrackLocalWriter的实现这条路,也就能找到interceptor是怎么传进来的了。pion/webrtc里的TrackLocalWriter实现只有一个非导出类:

          type interceptorToTrackLocalWriter struct{ interceptor atomic.Value } // interceptor.RTPWriter }
          
          func (i *interceptorToTrackLocalWriter) WriteRTP(header *rtp.Header, payload []byte) (int, error) {
          	if writer, ok := i.interceptor.Load().(interceptor.RTPWriter); ok && writer != nil {
          		return writer.Write(header, payload, interceptor.Attributes{})
          	}
          
          	return 0, nil
          }
          
          func (i *interceptorToTrackLocalWriter) Write(b []byte) (int, error) {
          	packet := &rtp.Packet{}
          	if err := packet.Unmarshal(b); err != nil {
          		return 0, err
          	}
          
          	return i.WriteRTP(&packet.Header, packet.Payload)
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18

          很明显,这个类就是在调用interceptor.RTPWriter,因为是个非导出类,所以肯定只会在框架里实例化。再顺藤摸瓜,可以找到这个类只在RTPSender实例化了一次:

          // Send Attempts to set the parameters controlling the sending of media.
          func (r *RTPSender) Send(parameters RTPSendParameters) error {
          	r.mu.Lock()
          	defer r.mu.Unlock()
          
          	if r.hasSent() {
          		return errRTPSenderSendAlreadyCalled
          	}
          
          	writeStream := &interceptorToTrackLocalWriter{} // 实例化interceptorToTrackLocalWriter
          	r.context = TrackLocalContext{
          		id:          r.id,
          		params:      r.api.mediaEngine.getRTPParametersByKind(r.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
          		ssrc:        parameters.Encodings[0].SSRC,
          		writeStream: writeStream,
          	} // 用interceptorToTrackLocalWriter构造TrackLocalContext
          
          	codec, err := r.track.Bind(r.context) // 把TrackLocalContext绑给RTPSender里的TrackLocal(这个r.track是个TrackLocal)
          	if err != nil {
          		return err
          	}
          	r.context.params.Codecs = []RTPCodecParameters{codec}
          
          	r.streamInfo = createStreamInfo(r.id, parameters.Encodings[0].SSRC, codec.PayloadType, codec.RTPCodecCapability, parameters.HeaderExtensions)
          	rtpInterceptor := r.api.interceptor.BindLocalStream(&r.streamInfo, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
          		return r.srtpStream.WriteRTP(header, payload)
          	})) // 给RTPSender所用的interceptor绑一个发送RTP包的操作
          	writeStream.interceptor.Store(rtpInterceptor) // 将返回的RTPWriter放进interceptorToTrackLocalWriter里
          
          	close(r.sendCalled)
          	return nil
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32

          明显得不能再明显了,直接从无到有构造了一个TrackLocalContext给一个RTPSender中的TrackLocal(即r.track)去绑定,还给API里的interceptor输出流绑定了RTPWriter,然后把返回的RTPWriter赋值给了TrackLocalContext里的writeStream.interceptor。这样,当用户在其定义的TrackLocal里调用绑定的TrackLocalContext的WriteStream()就会得到一个包含发送操作的TrackLocalWriter(内部实际上是这个非导出的interceptorToTrackLocalWriter),调用其WriteRTP或是Write能执行发送操作。

          PS:如果顾名思义,Send函数里应该包含实际发送包的操作,然而并没有。这里面的各种东西构造好了之后直接就被存起来了,都没有返回给外面,没有什么发送的操作。所以这个函数的作用应该是“绑定”而不是“生成”,主要是要把各种东西绑到传入的TrackLocal里,实际收发RTP包的过程由用户在自己定义的TrackLocal里完成,就像《pion学习总结:等待传出track的一般流程》里介绍的那样。

          再看看这个Send函数是哪里调用的,发现唯一的调用在PeerConnection的一个私有方法里:

          // startRTPSenders starts all outbound RTP streams
          func (pc *PeerConnection) startRTPSenders(currentTransceivers []*RTPTransceiver) error {
          	for _, transceiver := range currentTransceivers {
          		if sender := transceiver.Sender(); sender != nil && sender.isNegotiated() && !sender.hasSent() {
          			err := sender.Send(sender.GetParameters())
          			if err != nil {
          				return err
          			}
          		}
          	}
          
          	return nil
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13

          从函数名就可以看出来,这里只是一个启动RTP发送器的操作。这也更加印证了上面的想法:Send函数的作用是“绑定”。私以为,这个Send函数本应有个更好的名称,比如Start之类,这样就不会像现在这样令人迷惑了。

          PS:这里还有个操作比较迷惑。对每个currentTransceivers执行sender := transceiver.Sender()好理解,就是从RTP收发器里把发送器取出来;后面的这个sender.Send(sender.GetParameters())把自己的参数取出来又放回自己的函数里就不太好理解了,既然都是自己的参数,为什么不直接把取参数的操作放在Send里?可能是什么历史遗留问题吧。

          然后再看看startRTPSenders又是在哪调用的:

          哦!居然是在webrtc标准接口方法SetLocalDescription和SetRemoteDescription里,结合这个标准接口的功能,这就好理解了:

          • P2P连接开始时通信双方需要先交换自己的连接信息,让对方能通过这些信息找到要往哪个端口发视频。连接信息生成前,连接必须得先准备好,这个Send函数就是连接准备过程的一部分。

          # 中场休息

          截至目前,我们已经找到了传输数据的操作,我们看到:

          • BindRTCPWriter在NewPeerConnection里被调用,返回的RTCPWriter.Write在PeerConnection的WriteRTCP里调用,供用户发送一些自定义的RTCP包
          • BindLocalStream在RTPSender.Send里被调用,并且在最顶层上都是在SetLocalDescription和SetRemoteDescription里调用的,实际的包发送操作由用户在自己实现的TrackLocal里调用
          • 负责处理传入链接的两个操作BindRemoteStream和BindRTCPReader还没找到

          # 在RTPSender里

          现在,Send函数将我们引导到了这个RTPSender,从名字就能看出这应该是个发送RTP包的类,但从上面Send函数里的调用关系看到,Send函数只执行了一些绑定操作,实际的RTP和RTCP包的发送过程是用户在TrackLocal里调用的。

          # NewRTPSender

          定义RTPSender的文件开头就是这个NewRTPSender函数,但是它并不是RTPSender的类方法,而是API的:

          func (api *API) NewRTPSender(track TrackLocal, transport *DTLSTransport) (*RTPSender, error) {
          	......
          
          	r := &RTPSender{
          		track:      track,
          		transport:  transport,
          		api:        api,
          		sendCalled: make(chan struct{}),
          		stopCalled: make(chan struct{}),
          		ssrc:       SSRC(randutil.NewMathRandomGenerator().Uint32()),
          		id:         id,
          		srtpStream: &srtpWriterFuture{},
          	}
          
          	r.srtpStream.rtpSender = r
          
          	r.rtcpInterceptor = r.api.interceptor.BindRTCPReader(interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
          		n, err = r.srtpStream.Read(in)
          		return n, a, err
          	}))
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20

          一看就能懂,这个函数里面构造了一个RTPSender,然后给它绑了一个和SRTP相关的RTCP包读取操作,获取到RTCPReader赋值给r.rtcpInterceptor。

          # RTPSender在系统中的地位

          查找一下NewRTPSender的调用位置,发现它就是在AddTrack里调用的:

          // AddTrack adds a Track to the PeerConnection
          func (pc *PeerConnection) AddTrack(track TrackLocal) (*RTPSender, error) {
          	if pc.isClosed.get() {
          		return nil, &rtcerr.InvalidStateError{Err: ErrConnectionClosed}
          	}
          
          	pc.mu.Lock()
          	defer pc.mu.Unlock()
          	for _, t := range pc.rtpTransceivers {
          		if !t.stopped && t.kind == track.Kind() && t.Sender() == nil {
          			sender, err := pc.api.NewRTPSender(track, pc.dtlsTransport)
          			if err == nil {
          				err = t.SetSender(sender, track)
          				if err != nil {
          					_ = sender.Stop()
          					t.setSender(nil)
          				}
          			}
          			if err != nil {
          				return nil, err
          			}
          			pc.onNegotiationNeeded()
          			return sender, nil
          		}
          	}
          
          	transceiver, err := pc.newTransceiverFromTrack(RTPTransceiverDirectionSendrecv, track)
          	if err != nil {
          		return nil, err
          	}
          	pc.addRTPTransceiver(transceiver)
          	return transceiver.Sender(), nil
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33

          并且可以看到,这个AddTrack的核心功能其实就是:

          • 用输入的track构造RTPSender
          • 将RTPSender绑定到所有收发器(因为需要向每个传出流发RTP包)
          • 返回RTPSender

          所以很明显了,PeerConnection的AddTrack本质上就是在用TrackLocal构造RTPSender,然后把构造出来的RTPSender设置给各种Transceiver

          # Read

          再看看r.rtcpInterceptor是在哪调用的,发现了两个简单的函数:

          // Read reads incoming RTCP for this RTPReceiver
          func (r *RTPSender) Read(b []byte) (n int, a interceptor.Attributes, err error) {
          	select {
          	case <-r.sendCalled:
          		return r.rtcpInterceptor.Read(b, a)
          	case <-r.stopCalled:
          		return 0, nil, io.ErrClosedPipe
          	}
          }
          
          // ReadRTCP is a convenience method that wraps Read and unmarshals for you.
          func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
          	b := make([]byte, r.api.settingEngine.getReceiveMTU())
          	i, attributes, err := r.Read(b)
          	if err != nil {
          		return nil, nil, err
          	}
          
          	pkts, err := rtcp.Unmarshal(b[:i])
          	if err != nil {
          		return nil, nil, err
          	}
          
          	return pkts, attributes, nil
          }
          
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25

          就是两个接收RTCP包的函数,r.rtcpInterceptor应该是阻塞式的,所以这里大概也是阻塞式的,不用太多解释。再找这两个函数的调用位置,发现都是在用户代码里:

          点进去一看发现都是这种东西:

          很熟悉啊!从《用实例学习pion - rtp-to-webrtc》开始就一直没搞懂的奇怪操作,学了这么多到现在就能搞懂了:因为RTCP包的处理操作都在RTCPReader.Read里,所以必须主动调用它才能够让RTCP的各种功能有效。比如在NACK功能里面,读取NACK包并识别哪些包要重发的操作就是在RTCPReader.Read里,必须调用它才能让interceptor知道哪些包要重发。

          # 结束

          目前为止,我们看到:

          • BindRTCPWriter在NewPeerConnection里被调用,返回的RTCPWriter.Write在PeerConnection的WriteRTCP里调用,供用户发送一些自定义的RTCP包
          • BindRTCPReader在NewRTPSender里被调用,返回的RTCPReader.Read在RTPSender的Read里调用,供用户从RTPSender里读取自定义的RTCP包
          • BindLocalStream在RTPSender.Send里被调用,并且在最顶层上都是在SetLocalDescription和SetRemoteDescription里初始化时调用的。在RTPSender.Send里,RTPSender构造为TrackLocalWriter封装进TrackLocalContext然后绑定给用户定义的TrackLocal里,实际发送RTP包需要用户在自己实现的TrackLocal里调用TrackLocalWriter.Write
          • 没有BindRemoteStream,毕竟TrackLocal是发送数据流的,没有接收RTP包的相关操作,很合理

          本篇主要解析了发送方的interceptor调用链,接下来解析接收方的interceptor调用链:《interceptor寻踪:从TrackRemote开始深入挖掘pion/interceptor的用法》

          帮助我们改善此页面!
          创建于: 2021-10-08 08:28:09

          更新于: 2021-10-08 11:57:52