2 SRS Streaming Media Server Cluster: Edge Mode

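This article walks through debugging and source-code analysis of pushing (publishing) and pulling (playing) streams in Edge mode.
1. Classes involved in Edge push and pull
(1) The source below shows that in Edge mode both pulling and pushing are managed by the SrsSource class. The source code and call stacks later in this article confirm this.
    SrsSource::SrsSource()
    {
        req = NULL;
        jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
        mix_correct = false;
        mix_queue = new SrsMixQueue();

        _can_publish = true;
        _pre_source_id = _source_id = -1;
        die_at = 0;

        // Controls pulling on the edge: origin -> edge.
        play_edge = new SrsPlayEdge();
        // Controls pushing on the edge: edge -> origin.
        publish_edge = new SrsPublishEdge();
        // Controls the GOP cache.
        gop_cache = new SrsGopCache();
        // Controls routing on the origin.
        hub = new SrsOriginHub();
        // Controls the metadata cache.
        meta = new SrsMetaCache();

        is_monotonically_increase = false;
        last_packet_time = 0;

        _srs_config->subscribe(this);
        atc = false;
    }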
(2) SrsEdgeUpstream is the base class used to pull a stream from the origin.
(3) SrsEdgeRtmpUpstream is the RTMP origin-pull class; it inherits from the class above. Its connect() method is analyzed in section 3.
(4) Pull (play) related classes:
    // The edge used to ingest stream from origin.
    class SrsEdgeIngester : public ISrsCoroutineHandler
    {
    private:
        SrsSource* source;
        SrsPlayEdge* edge;
        SrsRequest* req;
        SrsCoroutine* trd;
        SrsLbRoundRobin* lb;
        SrsEdgeUpstream* upstream;
    public:
        SrsEdgeIngester();
        virtual ~SrsEdgeIngester();
    public:
        virtual srs_error_t initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r);
        virtual srs_error_t start();
        virtual void stop();
        virtual std::string get_curr_origin();
    // Interface ISrsReusableThread2Handler
    public:
        virtual srs_error_t cycle();
    private:
        virtual srs_error_t do_cycle();
    private:
        virtual srs_error_t ingest(std::string& redirect);
        virtual srs_error_t process_publish_message(SrsCommonMessage* msg, std::string& redirect);
    };
(5) SrsEdgeForwarder is the class that pushes the stream from the edge to the origin. Its start(), do_cycle() and proxy() methods are analyzed in section 2.
(6) Push (publish) related class:
    class SrsPublishEdge
    {
    private:
        SrsEdgeState state;
        SrsEdgeForwarder* forwarder;
    public:
        SrsPublishEdge();
        virtual ~SrsPublishEdge();
    public:
        virtual void set_queue_size(srs_utime_t queue_size);
    public:
        virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
        virtual bool can_publish();
        // When client publish stream on edge.
        virtual srs_error_t on_client_publish();
        // Proxy publish stream to edge
        virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg);
        // Proxy unpublish stream to edge.
        virtual void on_proxy_unpublish();
    };
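2. Source analysis of pushing in Edge mode
(1) The push analysis starts with the configuration file. The origin list of a vhost is read from its cluster/origin directive:
    SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost)
    {
        SrsConfDirective* conf = get_vhost(vhost);
        if (!conf) {
            return NULL;
        }

        conf = conf->get("cluster");
        if (!conf) {
            return NULL;
        }

        return conf->get("origin");
    }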
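(2) As covered in the previous article, in Edge mode a stream published by a client is first pushed on to the origin. When a client plays, the edge serves the stream directly if it already has it cached; otherwise it has to pull it from the origin. When several origins are configured, the push goes to one selected origin first, as SrsEdgeForwarder::start() shows: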
    srs_error_t SrsEdgeForwarder::start()
    {
        srs_error_t err = srs_success;

        // reset the error code.
        send_error_code = ERROR_SUCCESS;

        std::string url;
        if (true) {
            SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);
            srs_assert(conf);

            // select the origin.
            std::string server = lb->select(conf->args);
            int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
            srs_parse_hostport(server, server, port);

            // support vhost transform for edge,
            // @see https://github.com/ossrs/srs/issues/372
            std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
            vhost = srs_string_replace(vhost, "[vhost]", req->vhost);

            url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
        }

        // open socket.
        srs_freep(sdk);
        srs_utime_t cto = SRS_EDGE_FORWARDER_TIMEOUT;
        srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT;
        sdk = new SrsSimpleRtmpClient(url, cto, sto);

        if ((err = sdk->connect()) != srs_success) {
            return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
        }

        if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
            return srs_error_wrap(err, "sdk publish");
        }

        srs_freep(trd);
        // Start the forwarding coroutine.
        trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());

        if ((err = trd->start()) != srs_success) {
            return srs_error_wrap(err, "coroutine");
        }

        srs_trace("edge-fwr publish url %s", url.c_str());

        return err;
    }
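The first half of start() turns a configured origin entry into a publish URL. A minimal standalone sketch of that step follows. The helpers parse_hostport, replace_all and build_origin_url are hypothetical names for illustration only, and the URL layout (vhost passed as a query parameter) is an assumption of this sketch, not necessarily what srs_generate_rtmp_url produces.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical helper: split "host[:port]"; port is left unchanged
// when the entry carries no port.
void parse_hostport(const std::string& hostport, std::string& host, int& port) {
    std::string::size_type pos = hostport.rfind(':');
    if (pos == std::string::npos) {
        host = hostport;
        return;
    }
    // Copy the port part first, because host may alias hostport.
    std::string port_str = hostport.substr(pos + 1);
    host = hostport.substr(0, pos);
    port = std::atoi(port_str.c_str());
}

// Hypothetical helper: replace every occurrence of from in s with to.
std::string replace_all(std::string s, const std::string& from, const std::string& to) {
    if (from.empty()) return s;
    std::string::size_type pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.size(), to);
        pos += to.size();
    }
    return s;
}

// Builds the URL the edge would publish to. The "?vhost=" layout is an
// assumption made for this sketch.
std::string build_origin_url(const std::string& origin, const std::string& transform,
    const std::string& req_vhost, const std::string& app, const std::string& stream) {
    std::string server = origin;
    int port = 1935; // default RTMP port
    parse_hostport(server, server, port);

    // "[vhost]" in the transform pattern stands for the requested vhost.
    std::string vhost = replace_all(transform, "[vhost]", req_vhost);

    std::string url = "rtmp://" + server;
    if (port != 1935) url += ":" + std::to_string(port);
    url += "/" + app + "/" + stream + "?vhost=" + vhost;
    return url;
}
```

For example, the origin entry 127.0.0.1:19351 with the transform pattern [vhost] keeps the requested vhost unchanged, while a pattern such as edge.[vhost] rewrites it before the URL is built.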
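(3) If one origin goes down at this point, the next push attempt goes to another origin. With several origins configured, they are tried one after another in the order they appear in the configuration file, from left to right.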
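The left-to-right rotation described in (3) can be sketched as a simple round-robin selector. RoundRobin is a hypothetical class written for illustration; it only models the rotation and, as an assumption of this sketch, does not check whether an origin is alive.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical round-robin selector: each call returns the next server
// in configuration order and wraps around at the end of the list.
class RoundRobin {
public:
    RoundRobin() : count_(0) {}

    // Precondition: servers is not empty.
    std::string select(const std::vector<std::string>& servers) {
        assert(!servers.empty());
        std::size_t index = count_++ % servers.size();
        return servers[index];
    }

private:
    std::size_t count_; // number of selections made so far
};
```

Because every call advances the counter, a forwarder that fails against one origin and is started again lands on the next entry in the list.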
(4) Start SRS under gdb:
gdb ./objs/srs
(5) Pass the edge configuration file, set a breakpoint at main, and run to it:
set args -c conf/edge1.conf
b main
r
Set a breakpoint on the function that reads the origin list:
b SrsConfig::get_vhost_edge_origin
Set another breakpoint on the function that checks whether the current node is an edge:
b SrsConfig::get_vhost_is_edge
(6) The source that decides whether the current node is an edge node: the vhost is an edge when cluster/mode is set to "remote".
    bool SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost)
    {
        static bool DEFAULT = false;

        SrsConfDirective* conf = vhost;
        if (!conf) {
            return DEFAULT;
        }

        conf = conf->get("cluster");
        if (!conf) {
            return DEFAULT;
        }

        conf = conf->get("mode");
        if (!conf || conf->arg0().empty()) {
            return DEFAULT;
        }

        return "remote" == conf->arg0();
    }
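To make the vhost -> cluster -> mode lookup concrete, here is a small standalone sketch. Directive, make and is_edge are hypothetical stand-ins written for illustration; they are not the SrsConfDirective API, and only mirror the lookup order and the "remote" comparison shown above.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a parsed config directive: a name, its
// arguments, and nested child directives.
struct Directive {
    std::string name;
    std::vector<std::string> args;
    std::vector<Directive> children;

    // Returns the first child with the given name, or NULL.
    const Directive* get(const std::string& child) const {
        for (std::size_t i = 0; i < children.size(); ++i) {
            if (children[i].name == child) return &children[i];
        }
        return NULL;
    }

    // Returns the first argument, or an empty string when there is none.
    std::string arg0() const { return args.empty() ? "" : args[0]; }
};

// Convenience builder for the examples.
Directive make(const std::string& name, const std::string& arg = "") {
    Directive d;
    d.name = name;
    if (!arg.empty()) d.args.push_back(arg);
    return d;
}

// Follows vhost -> cluster -> mode and defaults to false at every
// missing level; only mode "remote" marks the vhost as an edge.
bool is_edge(const Directive* vhost) {
    if (!vhost) return false;
    const Directive* cluster = vhost->get("cluster");
    if (!cluster) return false;
    const Directive* mode = cluster->get("mode");
    if (!mode || mode->arg0().empty()) return false;
    return mode->arg0() == "remote";
}
```

A vhost without a cluster block, or with any mode other than remote, is therefore treated as a non-edge node.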
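Resume execution:
c
SRS is now running. At this point, start pushing a stream. Note that the stream is pushed to the edge node (port 19350 in this example). If you are not sure how to push a stream, see the previous article.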
(7) Execution now stops at the breakpoint. Print the call stack:
bt
The call stack:
    #0  SrsConfig::get_vhost_is_edge(SrsConfDirective* vhost) at src/app/srs_app_config.cpp:5063
    #1  SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:472
    #2  SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
    #3  SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
    #4  SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
    #5  SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
    #6  SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
    #7  _st_thread_main at sched.c:337
    #8  st_thread_create at sched.c:616
Continue:
c
The next stop is in get_vhost_edge_origin, which is the one to focus on.
(8) Print the call stack:
bt
The call stack:
    #0  SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
    #1  SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
    #2  SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
    #3  SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
    #4  SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
    #5  SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:822
    #6  SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
    #7  SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
    #8  SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
    #9  SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
    #10 SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
    #11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
    #12 _st_thread_main at sched.c:337
    #13 st_thread_create at sched.c:616
(9) In this call stack, the key point is SrsRtmpConn::acquire_publish, which branches on whether the node is an edge. Source:
    srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
    {
        srs_error_t err = srs_success;

        SrsRequest* req = info->req;

        if (!source->can_publish(info->edge)) {
            return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
        }

        // when edge, ignore the publish event, directly proxy it.
        if (info->edge) {
            // Edge node.
            if ((err = source->on_edge_start_publish()) != srs_success) {
                return srs_error_wrap(err, "rtmp: edge start publish");
            }
        } else {
            // Not an edge node.
            if ((err = source->on_publish()) != srs_success) {
                return srs_error_wrap(err, "rtmp: source publish");
            }
        }

        return err;
    }
(10) The edge branch looks like this:
    srs_error_t SrsSource::on_edge_start_publish()
    {
        // Push the stream to the origin.
        return publish_edge->on_client_publish();
    }
It calls SrsPublishEdge::on_client_publish(), whose job is to push the stream from the edge to the origin. The source shows that it delegates the push to the forwarder.
    srs_error_t SrsPublishEdge::on_client_publish()
    {
        srs_error_t err = srs_success;

        // error when not init state.
        if (state != SrsEdgeStateInit) {
            return srs_error_new(ERROR_RTMP_EDGE_PUBLISH_STATE, "invalid state");
        }

        // @see https://github.com/ossrs/srs/issues/180
        // to avoid multiple publish the same stream on the same edge,
        // directly enter the publish stage.
        if (true) {
            SrsEdgeState pstate = state;
            state = SrsEdgeStatePublish;
            srs_trace("edge change from %d to state %d (push).", pstate, state);
        }

        // start to forward stream to origin.
        err = forwarder->start();

        // @see https://github.com/ossrs/srs/issues/180
        // when failed, revert to init
        if (err != srs_success) {
            SrsEdgeState pstate = state;
            state = SrsEdgeStateInit;
            srs_trace("edge revert from %d to state %d (push), error %s", pstate, state, srs_error_desc(err).c_str());
        }

        return err;
    }
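The state handling in on_client_publish() (enter the publish state first, revert to init if the forwarder fails to start) can be modelled in isolation. PublishEdgeModel and its two-state enum are hypothetical simplifications for illustration; the forwarder is replaced by a callback that reports success or failure.

```cpp
#include <cassert>
#include <functional>

enum EdgeState { kInit, kPublish };

// Hypothetical model of the publish-edge state handling: claim the
// publish state before starting the forwarder, and revert to init when
// the start fails so that a later publish can try again.
class PublishEdgeModel {
public:
    PublishEdgeModel() : state_(kInit) {}

    // start_forwarder returns true on success.
    // Returns false when the state is invalid or the forwarder fails.
    bool on_client_publish(const std::function<bool()>& start_forwarder) {
        if (state_ != kInit) {
            return false;          // someone is already publishing here
        }
        state_ = kPublish;         // claim the stream before connecting
        if (!start_forwarder()) {
            state_ = kInit;        // revert, so a retry is possible
            return false;
        }
        return true;
    }

    EdgeState state() const { return state_; }

private:
    EdgeState state_;
};
```

Entering the publish state before the (possibly slow) connect is what rejects a second publisher of the same stream on the same edge while the first one is still connecting.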
(11) Every stream pushed from the edge to the origin uses its own forwarder. SrsEdgeForwarder::start(), listed in step (2) of this section, connects to the origin, publishes, and then starts a coroutine named "edge-fwr" (trd = new SrsSTCoroutine("edge-fwr", ...) followed by trd->start()) that sends the data.
(12) Every coroutine has a do_cycle(). Here sdk is the RTMP client that connects the edge to the origin. Source:
    srs_error_t SrsEdgeForwarder::do_cycle()
    {
        srs_error_t err = srs_success;

        sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE);

        SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
        SrsAutoFree(SrsPithyPrint, pprint);

        SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);

        while (true) {
            if ((err = trd->pull()) != srs_success) {
                return srs_error_wrap(err, "edge forward pull");
            }

            if (send_error_code != ERROR_SUCCESS) {
                srs_usleep(SRS_EDGE_FORWARDER_TIMEOUT);
                continue;
            }

            // read from client.
            if (true) {
                SrsCommonMessage* msg = NULL;
                // sdk is the edge-to-origin RTMP client.
                err = sdk->recv_message(&msg);

                srs_verbose("edge loop recv message. ret=%d", ret);
                if (err != srs_success && srs_error_code(err) != ERROR_SOCKET_TIMEOUT) {
                    srs_error("edge push get server control message failed. err=%s", srs_error_desc(err).c_str());
                    send_error_code = srs_error_code(err);
                    srs_error_reset(err);
                    continue;
                }

                srs_error_reset(err);
                srs_freep(msg);
            }

            // forward all messages.
            // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
            int count = 0;
            // Read the pending messages from the queue.
            if ((err = queue->dump_packets(msgs.max, msgs.msgs, count)) != srs_success) {
                return srs_error_wrap(err, "queue dumps packets");
            }

            pprint->elapse();

            // pithy print
            if (pprint->can_print()) {
                sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PUBLISH, pprint->age(), count);
            }

            // ignore when no messages.
            if (count <= 0) {
                continue;
            }

            if ((err = sdk->send_and_free_messages(msgs.msgs, count)) != srs_success) {
                return srs_error_wrap(err, "send messages");
            }
        }

        return err;
    }
(13) The next function receives the data that the publishing client sends to the edge and puts it into the queue. The do_cycle above then reads the queue and sends the data to the origin. Source:
    srs_error_t SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
    {
        srs_error_t err = srs_success;

        if (send_error_code != ERROR_SUCCESS) {
            return srs_error_new(send_error_code, "edge forwarder");
        }

        // the msg is auto free by source,
        // so we just ignore, or copy then send it.
        if (msg->size <= 0
            || msg->header.is_set_chunk_size()
            || msg->header.is_window_ackledgement_size()
            || msg->header.is_ackledgement()) {
            return err;
        }

        SrsSharedPtrMessage copy;
        if ((err = copy.create(msg)) != srs_success) {
            return srs_error_wrap(err, "create message");
        }

        copy.stream_id = sdk->sid();
        // Put the received message into the queue.
        if ((err = queue->enqueue(copy.copy())) != srs_success) {
            return srs_error_wrap(err, "enqueue message");
        }

        return err;
    }
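The division of labour between proxy() (producer, called for each message received from the publisher) and do_cycle() (consumer, draining the queue in batches) can be sketched as follows. Message and ForwardQueue are hypothetical types for illustration; the real queue stores shared-pointer messages and the real loop sends them over RTMP.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical message: only the fields this sketch needs.
struct Message {
    std::string payload;
    bool is_control; // e.g. set-chunk-size or acknowledgement messages
};

// Hypothetical model of the forwarder queue.
class ForwardQueue {
public:
    // Producer side: skip empty and protocol-control messages, queue
    // everything else. Returns true when the message was queued.
    bool proxy(const Message& msg) {
        if (msg.payload.empty() || msg.is_control) {
            return false;
        }
        queue_.push_back(msg);
        return true;
    }

    // Consumer side: move up to max_count queued messages into out,
    // oldest first, and return how many were moved.
    std::size_t dump(std::size_t max_count, std::vector<Message>& out) {
        std::size_t n = 0;
        while (n < max_count && !queue_.empty()) {
            out.push_back(queue_.front());
            queue_.pop_front();
            ++n;
        }
        return n;
    }

private:
    std::deque<Message> queue_;
};
```

A dump() that returns zero corresponds to the "ignore when no messages" branch of the sending loop: nothing is sent and the loop simply runs again.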
(14) Set a breakpoint at the point where messages are enqueued, then continue:
b SrsEdgeForwarder::proxy
c
(15) Print the call stack:
bt
The flow is: each message received on the publishing connection is handed to SrsEdgeForwarder::proxy, which puts it into the queue; the forwarder's coroutine then takes it from the queue in do_cycle and sends it to the origin.
    #0  SrsConfig::get_vhost_edge_origin(string vhost) at src/app/srs_app_config.cpp:5091
    #1  SrsEdgeForwarder::start() at src/app/srs_app_edge.cpp:482
    #2  SrsPublishEdge::on_client_publish() at src/app/srs_app_edge.cpp:777
    #3  SrsSource::on_edge_start_publish() at src/app/srs_app_source.cpp:2592
    #4  SrsRtmpConn::acquire_publish(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:936
    #5  SrsRtmpConn::publishing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:822
    #6  SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
    #7  SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
    #8  SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
    #9  SrsConnection::cycle() at src/app/srs_app_recv_thread.cpp:198
    #10 SrsRecvThread::cycle() at src/app/srs_app_st.cpp:198
    #11 SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
    #12 _st_thread_main at sched.c:337
    #13 st_thread_create at sched.c:616
As readers of the previous articles will know, when publishing over RTMP an edge node and a non-edge node follow different code paths. Source:
    srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
    {
        srs_error_t err = srs_success;

        // for edge, directly proxy message to origin.
        // Edge node.
        if (info->edge) {
            if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
                return srs_error_wrap(err, "rtmp: proxy publish");
            }
            return err;
        }

        // process audio packet
        if (msg->header.is_audio()) {
            if ((err = source->on_audio(msg)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume audio");
            }
            return err;
        }
        // process video packet
        if (msg->header.is_video()) {
            if ((err = source->on_video(msg)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume video");
            }
            return err;
        }

        // process aggregate packet
        if (msg->header.is_aggregate()) {
            if ((err = source->on_aggregate(msg)) != srs_success) {
                return srs_error_wrap(err, "rtmp: consume aggregate");
            }
            return err;
        }

        // process onMetaData
        if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
            SrsPacket* pkt = NULL;
            if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
                return srs_error_wrap(err, "rtmp: decode message");
            }
            SrsAutoFree(SrsPacket, pkt);

            if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
                SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
                if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                    return srs_error_wrap(err, "rtmp: consume metadata");
                }
                return err;
            }
            return err;
        }

        return err;
    }
3. Source analysis of pulling in Edge mode
(1) Set breakpoints on the following functions, then continue:
b SrsEdgeRtmpUpstream::connect
b SrsEdgeIngester::SrsEdgeIngester
b SrsEdgeIngester::do_cycle
b SrsPlayEdge::on_client_play
c
(2) Starting the pull coroutine. Source:
    srs_error_t SrsPlayEdge::on_client_play()
    {
        srs_error_t err = srs_success;

        // start ingest when init state.
        if (state == SrsEdgeStateInit) {
            state = SrsEdgeStatePlay;
            // Start the pull (ingest) coroutine.
            err = ingester->start();
        }

        return err;
    }
(3) At this point, make sure the push is still running normally, then start pulling (playing) the stream from the edge.
Print the call stack of on_client_play:
bt
The call stack:
    #0  SrsPlayEdge::on_client_play() at src/app/srs_app_edge.cpp:677
    #1  SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) at src/app/srs_app_source.cpp:2558
    #2  SrsRtmpConn::playing(SrsSource* source) at src/app/srs_app_rtmp_conn.cpp:649
    #3  SrsRtmpConn::stream_service_cycle() at src/app/srs_app_rtmp_conn.cpp:534
    #4  SrsRtmpConn::service_cycle() at src/app/srs_app_rtmp_conn.cpp:388
    #5  SrsRtmpConn::do_cycle() at src/app/srs_app_rtmp_conn.cpp:209
    #6  SrsConnection::cycle() at src/app/srs_app_conn.cpp:171
    #7  SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
    #8  SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
    #9  _st_thread_main at sched.c:337
    #10 st_thread_create at sched.c:616
The call stack shows that SrsSource::create_consumer checks whether the vhost is an edge. If it is, it calls on_client_play so that the edge starts pulling the stream from the origin. Source:
    srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
    {
        srs_error_t err = srs_success;

        consumer = new SrsConsumer(this, conn);
        consumers.push_back(consumer);

        srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
        consumer->set_queue_size(queue_size);

        // if atc, update the sequence header to gop cache time.
        if (atc && !gop_cache->empty()) {
            if (meta->data()) {
                meta->data()->timestamp = srsu2ms(gop_cache->start_time());
            }
            if (meta->vsh()) {
                meta->vsh()->timestamp = srsu2ms(gop_cache->start_time());
            }
            if (meta->ash()) {
                meta->ash()->timestamp = srsu2ms(gop_cache->start_time());
            }
        }

        // If stream is publishing, dumps the sequence header and gop cache.
        if (hub->active()) {
            // Copy metadata and sequence header to consumer.
            if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
                return srs_error_wrap(err, "meta dumps");
            }

            // copy gop cache to client.
            if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) {
                return srs_error_wrap(err, "gop cache dumps");
            }
        }

        // print status.
        if (dg) {
            srs_trace("create consumer, active=%d, queue_size=%.2f, jitter=%d", hub->active(), queue_size, jitter_algorithm);
        } else {
            srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
        }

        // for edge, when play edge stream, check the state
        // If this vhost is an edge, start pulling from the origin.
        if (_srs_config->get_vhost_is_edge(req->vhost)) {
            // notice edge to start for the first client.
            if ((err = play_edge->on_client_play()) != srs_success) {
                return srs_error_wrap(err, "play edge");
            }
        }

        return err;
    }
(4) on_client_play(), listed in step (2), starts the ingester only while the state is SrsEdgeStateInit. The origin-to-edge pull coroutine is therefore started only once per source, no matter how many players connect.
(5) When all players have disconnected, the state has to change back. Source:
    void SrsPlayEdge::on_all_client_stop()
    {
        // when all client disconnected,
        // and edge is ingesting origin stream, abort it.
        if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) {
            SrsEdgeState pstate = state;

            state = SrsEdgeStateIngestStopping;
            ingester->stop();

            state = SrsEdgeStateInit;
            srs_trace("edge change from %d to %d then %d (init).", pstate, SrsEdgeStateIngestStopping, state);

            return;
        }
    }
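The play-side lifecycle (the first player starts the ingest, later players reuse it, the last player leaving stops it) can be modelled with two states. PlayEdgeModel is a hypothetical simplification for illustration: the intermediate connected and stopping states are collapsed, and counters stand in for ingester->start() and ingester->stop().

```cpp
#include <cassert>

enum PlayState { kPlayInit, kPlaying };

// Hypothetical model of the play-edge lifecycle.
class PlayEdgeModel {
public:
    PlayEdgeModel() : state_(kPlayInit), starts_(0), stops_(0) {}

    // Called for every new player; only the first one starts the ingest.
    void on_client_play() {
        if (state_ == kPlayInit) {
            state_ = kPlaying;
            ++starts_;          // stands in for ingester->start()
        }
    }

    // Called when the last player has left; stops the ingest and goes
    // back to init so the next player starts it again.
    void on_all_client_stop() {
        if (state_ == kPlaying) {
            ++stops_;           // stands in for ingester->stop()
            state_ = kPlayInit;
        }
    }

    int starts() const { return starts_; }
    int stops() const { return stops_; }

private:
    PlayState state_;
    int starts_;
    int stops_;
};
```

Three players connecting in a row produce a single start; after everyone leaves, the next player triggers a fresh start.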
(6) The call stack of SrsEdgeRtmpUpstream::connect:
    #0  SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) at src/app/srs_app_edge.cpp:76
    #1  SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:271
    #2  SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
    #3  SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
    #4  SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
    #5  _st_thread_main at sched.c:337
    #6  st_thread_create at sched.c:616
An upstream is created here. Source:
    srs_error_t SrsEdgeIngester::do_cycle()
    {
        srs_error_t err = srs_success;

        std::string redirect;
        while (true) {
            if ((err = trd->pull()) != srs_success) {
                return srs_error_wrap(err, "do cycle pull");
            }

            srs_freep(upstream);
            upstream = new SrsEdgeRtmpUpstream(redirect);

            if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
                return srs_error_wrap(err, "on source id changed");
            }

            if ((err = upstream->connect(req, lb)) != srs_success) {
                return srs_error_wrap(err, "connect upstream");
            }

            if ((err = edge->on_ingest_play()) != srs_success) {
                return srs_error_wrap(err, "notify edge play");
            }

            // set to larger timeout to read av data from origin.
            upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);

            err = ingest(redirect);

            // retry for rtmp 302 immediately.
            if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
                int port;
                string server;
                upstream->selected(server, port);

                string url = req->get_stream_url();
                srs_warn("RTMP redirect %s from %s:%d to %s", url.c_str(), server.c_str(), port, redirect.c_str());

                srs_error_reset(err);
                continue;
            }

            if (srs_is_client_gracefully_close(err)) {
                srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
                srs_error_reset(err);
            }
            break;
        }

        return err;
    }
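The redirect handling in this loop (reconnect immediately when the origin answers with an RTMP 302, stop on anything else) can be sketched without any networking. run_ingest, IngestResult and the callback signature are hypothetical names for illustration. Like the loop above, this sketch puts no upper bound on the number of consecutive redirects.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

enum IngestResult { kClosed, kRedirect, kFailed };

// Hypothetical model of the ingest loop. ingest_once(target, redirect)
// pulls from target; when it returns kRedirect it has stored the new
// address in redirect. Every address tried is appended to visited.
IngestResult run_ingest(const std::string& configured_origin,
    const std::function<IngestResult(const std::string&, std::string&)>& ingest_once,
    std::vector<std::string>& visited) {
    std::string redirect;
    while (true) {
        // A pending redirect overrides the configured origin.
        std::string target = redirect.empty() ? configured_origin : redirect;
        visited.push_back(target);

        // A redirect is used only once; the next attempt starts clean.
        redirect.clear();

        IngestResult r = ingest_once(target, redirect);
        if (r == kRedirect) {
            continue; // reconnect immediately to the new address
        }
        return r;
    }
}
```

With a callback that redirects the first origin to a second one and then closes normally, the loop visits exactly those two addresses in order.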
(7) In the source above, upstream is the client that pulls from the origin. The function to focus on is SrsEdgeIngester::ingest. Set a breakpoint on it, continue, and print the call stack:
b SrsEdgeIngester::ingest
c
bt
(8) SrsEdgeIngester::ingest is the main loop of the pull coroutine: it receives messages from the origin and feeds them into the SrsSource on the edge.
    srs_error_t SrsEdgeIngester::ingest(string& redirect)
    {
        srs_error_t err = srs_success;

        SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
        SrsAutoFree(SrsPithyPrint, pprint);

        // we only use the redirect once.
        // reset the redirect to empty, for maybe the origin changed.
        redirect = "";

        while (true) {
            if ((err = trd->pull()) != srs_success) {
                return srs_error_wrap(err, "thread quit");
            }

            pprint->elapse();

            // pithy print
            if (pprint->can_print()) {
                upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
            }

            // read from client.
            SrsCommonMessage* msg = NULL;
            // Receive a message from the origin.
            if ((err = upstream->recv_message(&msg)) != srs_success) {
                return srs_error_wrap(err, "recv message");
            }

            srs_assert(msg);
            SrsAutoFree(SrsCommonMessage, msg);

            // Key step: feed the message into the local source.
            if ((err = process_publish_message(msg, redirect)) != srs_success) {
                return srs_error_wrap(err, "process message");
            }
        }

        return err;
    }
(9) Video, audio and metadata are all handled here, and in every case the data ends up in the corresponding handler of SrsSource, so this function deserves close attention.
    srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
    {
        srs_error_t err = srs_success;

        // process audio packet
        if (msg->header.is_audio()) {
            if ((err = source->on_audio(msg)) != srs_success) {
                return srs_error_wrap(err, "source consume audio");
            }
        }

        // process video packet
        if (msg->header.is_video()) {
            if ((err = source->on_video(msg)) != srs_success) {
                return srs_error_wrap(err, "source consume video");
            }
        }

        // process aggregate packet
        if (msg->header.is_aggregate()) {
            if ((err = source->on_aggregate(msg)) != srs_success) {
                return srs_error_wrap(err, "source consume aggregate");
            }
            return err;
        }

        // process onMetaData
        if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
            SrsPacket* pkt = NULL;
            if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
                return srs_error_wrap(err, "decode message");
            }
            SrsAutoFree(SrsPacket, pkt);

            if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
                SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
                if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
                    return srs_error_wrap(err, "source consume metadata");
                }
                return err;
            }

            return err;
        }

        // call messages, for example, reject, redirect.
        if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
            SrsPacket* pkt = NULL;
            if ((err = upstream->decode_message(msg, &pkt)) != srs_success) {
                return srs_error_wrap(err, "decode message");
            }
            SrsAutoFree(SrsPacket, pkt);

            // RTMP 302 redirect
            if (dynamic_cast<SrsCallPacket*>(pkt)) {
                SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
                if (!call->arguments->is_object()) {
                    return err;
                }

                SrsAmf0Any* prop = NULL;
                SrsAmf0Object* evt = call->arguments->to_object();

                if ((prop = evt->ensure_property_string("level")) == NULL) {
                    return err;
                } else if (prop->to_str() != StatusLevelError) {
                    return err;
                }

                if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) {
                    return err;
                }
                SrsAmf0Object* ex = prop->to_object();

                // The redirect is tcUrl while redirect2 is RTMP URL.
                // https://github.com/ossrs/srs/issues/1575#issuecomment-574999798
                if ((prop = ex->ensure_property_string("redirect2")) == NULL) {
                    // TODO: FIXME: Remove it when SRS3 released, it's temporarily support for SRS3 alpha versions(a0 to a8).
                    if ((prop = ex->ensure_property_string("redirect")) == NULL) {
                        return err;
                    }
                }
                redirect = prop->to_str();

                return srs_error_new(ERROR_CONTROL_REDIRECT, "RTMP 302 redirect to %s", redirect.c_str());
            }
        }

        return err;
    }
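The last branch extracts the redirect target from an error status: it prefers the redirect2 property and falls back to redirect. A minimal sketch of just that selection follows, using a flat std::map in place of the AMF0 object. Props and find_redirect are hypothetical names, and the literal "error" for the status level is an assumption of this sketch.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical flat stand-in for the "ex" object of an error status.
typedef std::map<std::string, std::string> Props;

// Returns the redirect target, preferring "redirect2" (a full RTMP URL)
// and falling back to "redirect" (a tcUrl). Returns an empty string
// when the status is not an error or carries no redirect.
std::string find_redirect(const std::string& level, const Props& ex) {
    if (level != "error") {
        return "";
    }
    Props::const_iterator it = ex.find("redirect2");
    if (it == ex.end()) {
        it = ex.find("redirect");
    }
    if (it == ex.end()) {
        return "";
    }
    return it->second;
}
```

When both properties are present the newer redirect2 wins, which matches the order of the two ensure_property_string calls above.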
(10) Taking SrsSource::on_video as an example, set a breakpoint, continue, and print the call stack:
b SrsSource::on_video
c
bt
The call stack:
    #0  SrsEdgeIngester::ingest(string& redirect) at src/app/srs_app_edge.cpp:339
    #1  SrsEdgeIngester::do_cycle() at src/app/srs_app_edge.cpp:282
    #2  SrsEdgeIngester::cycle() at src/app/srs_app_edge.cpp:243
    #3  SrsSTCoroutine::cycle() at src/app/srs_app_st.cpp:198
    #4  SrsSTCoroutine::pfn(void* arg) at src/app/srs_app_st.cpp:213
    #5  _st_thread_main at sched.c:337
    #6  st_thread_create at sched.c:616
This call chain shows that data pulled back from the origin is ultimately distributed through SrsSource.
    srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
    {
        srs_error_t err = srs_success;

        // monotonically increase detect.
        if (!mix_correct && is_monotonically_increase) {
            if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
                is_monotonically_increase = false;
            }
        }
        last_packet_time = shared_video->header.timestamp;

        // drop any unknown header video.
        // @see https://github.com/ossrs/srs/issues/421
        if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
            char b0 = 0x00;
            if (shared_video->size > 0) {
                b0 = shared_video->payload[0];
            }

            srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
            return err;
        }

        // convert shared_video to msg, user should not use shared_video again.
        // the payload is transfer to msg, and set to NULL in shared_video.
        SrsSharedPtrMessage msg;
        if ((err = msg.create(shared_video)) != srs_success) {
            return srs_error_wrap(err, "create message");
        }

        // directly process the audio message.
        if (!mix_correct) {
            return on_video_imp(&msg);
        }

        // insert msg to the queue.
        mix_queue->push(msg.copy());

        // fetch someone from mix queue.
        SrsSharedPtrMessage* m = mix_queue->pop();
        if (!m) {
            return err;
        }

        // consume the monotonically increase message.
        if (m->is_audio()) {
            err = on_audio_imp(m);
        } else {
            err = on_video_imp(m);
        }
        srs_freep(m);

        return err;
    }
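The timestamp check at the top of on_video tracks whether packet timestamps keep increasing. A minimal sketch of such a detector follows. MonotonicDetector is a hypothetical class for illustration, and starting in the "monotonic" state is an assumption of this sketch.

```cpp
#include <cassert>
#include <stdint.h>

// Hypothetical detector: remembers the last timestamp and clears the
// monotonic flag the first time a packet arrives with an older one.
class MonotonicDetector {
public:
    MonotonicDetector() : monotonic_(true), last_(0) {}

    void on_packet(int64_t timestamp) {
        if (last_ > 0 && timestamp < last_) {
            monotonic_ = false; // timestamps went backwards
        }
        last_ = timestamp;
    }

    bool monotonic() const { return monotonic_; }

private:
    bool monotonic_;
    int64_t last_;
};
```

Once the flag is cleared it stays cleared, so a single out-of-order packet is enough to mark the stream as non-monotonic.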
(11) As mentioned earlier, when several origins are configured, one of them is selected from the list through the call lb->select(conf->args):
    srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
    {
        srs_error_t err = srs_success;

        SrsRequest* req = r;

        std::string url;
        if (true) {
            // Read the origin list from the configuration file.
            SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);

            // @see https://github.com/ossrs/srs/issues/79
            // when origin is error, for instance, server is shutdown,
            // then user remove the vhost then reload, the conf is empty.
            if (!conf) {
                return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());
            }

            // select the origin.
            std::string server = lb->select(conf->args);
            int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
            srs_parse_hostport(server, server, port);

            // override the origin info by redirect.
            if (!redirect.empty()) {
                int _port;
                string _schema, _vhost, _app, _stream, _param, _host;
                srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _stream, _port, _param);

                server = _host;
                port = _port;
            }

            // Remember the current selected server.
            selected_ip = server;
            selected_port = port;

            // support vhost transform for edge,
            // @see https://github.com/ossrs/srs/issues/372
            std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
            vhost = srs_string_replace(vhost, "[vhost]", req->vhost);

            url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
        }

        srs_freep(sdk);
        srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT;
        srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
        sdk = new SrsSimpleRtmpClient(url, cto, sto);

        if ((err = sdk->connect()) != srs_success) {
            return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
        }

        if ((err = sdk->play(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
            return srs_error_wrap(err, "edge pull %s stream failed", url.c_str());
        }

        return err;
    }
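4. Summary
This article analyzed the push and pull source code of Edge mode together with the debugging steps, which should give a clearer picture of how Edge mode works. I hope it helps.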