diff --git a/plugins/obs-outputs/librtmp/rtmp.c b/plugins/obs-outputs/librtmp/rtmp.c index dd3acaf59..a9013e67b 100644 --- a/plugins/obs-outputs/librtmp/rtmp.c +++ b/plugins/obs-outputs/librtmp/rtmp.c @@ -487,6 +487,12 @@ RTMP_EnableWrite(RTMP *r) r->Link.protocol |= RTMP_FEATURE_WRITE; } +void +RTMP_EnableReconnect(RTMP *r) +{ + r->Link.protocol |= RTMP_FEATURE_RECONNECT; +} + double RTMP_GetDuration(RTMP *r) { @@ -1647,6 +1653,7 @@ SAVC(secureToken); SAVC(secureTokenResponse); SAVC(type); SAVC(nonprivate); +SAVC(capsEx); static int SendConnectPacket(RTMP *r, RTMPPacket *cp) @@ -1752,6 +1759,14 @@ SendConnectPacket(RTMP *r, RTMPPacket *cp) if (!enc) return FALSE; } + + if (r->Link.protocol & RTMP_FEATURE_RECONNECT) + { + enc = AMF_EncodeNamedNumber(enc, pend, &av_capsEx, RTMP_CAPS_RECONNECT); + if (!enc) + return FALSE; + } + if (enc + 3 >= pend) return FALSE; *enc++ = 0; diff --git a/plugins/obs-outputs/librtmp/rtmp.h b/plugins/obs-outputs/librtmp/rtmp.h index c1b8d9061..29ca4431c 100644 --- a/plugins/obs-outputs/librtmp/rtmp.h +++ b/plugins/obs-outputs/librtmp/rtmp.h @@ -187,6 +187,7 @@ extern "C" #define RTMP_FEATURE_MFP 0x08 /* not yet supported */ #define RTMP_FEATURE_WRITE 0x10 /* publish, not play */ #define RTMP_FEATURE_HTTP2 0x20 /* server-side rtmpt */ +#define RTMP_FEATURE_RECONNECT 0x40 /* reconnect supported */ #define RTMP_PROTOCOL_UNDEFINED -1 #define RTMP_PROTOCOL_RTMP 0 @@ -197,6 +198,8 @@ extern "C" #define RTMP_PROTOCOL_RTMPTS (RTMP_FEATURE_HTTP|RTMP_FEATURE_SSL) #define RTMP_PROTOCOL_RTMFP RTMP_FEATURE_MFP +#define RTMP_CAPS_RECONNECT 1 + #define RTMP_DEFAULT_CHUNKSIZE 128 /* needs to fit largest number of bytes recv() may return */ @@ -516,6 +519,7 @@ extern "C" void RTMP_TLS_Free(RTMP *r); void RTMP_Free(RTMP *r); void RTMP_EnableWrite(RTMP *r); + void RTMP_EnableReconnect(RTMP *r); int RTMP_LibVersion(void); void RTMP_UserInterrupt(void); /* user typed Ctrl-C */ diff --git a/plugins/obs-outputs/rtmp-stream.c b/plugins/obs-outputs/rtmp-stream.c index da5980072..5c3276511 100644 --- a/plugins/obs-outputs/rtmp-stream.c +++ b/plugins/obs-outputs/rtmp-stream.c @@ -204,6 +204,8 @@ fail: static void rtmp_stream_stop(void *data, uint64_t ts) { struct rtmp_stream *stream = data; + stream->reconnect_requested = 0; + dstr_init(&stream->reconnect_path); // reconnect cleanup if (stopping(stream) && ts != 0) return; @@ -250,6 +252,32 @@ static inline bool get_next_packet(struct rtmp_stream *stream, return new_packet; } +static inline bool peek_next_packet(struct rtmp_stream *stream, + struct encoder_packet *packet) +{ + bool new_packet = false; + + pthread_mutex_lock(&stream->packets_mutex); + if (stream->packets.size) { + deque_peek_front(&stream->packets, packet, + sizeof(struct encoder_packet)); + new_packet = true; + } + pthread_mutex_unlock(&stream->packets_mutex); + + return new_packet; +} + +static inline void free_next_packet(struct rtmp_stream *stream) +{ + pthread_mutex_lock(&stream->packets_mutex); + if (stream->packets.size) { + deque_pop_front(&stream->packets, NULL, + sizeof(struct encoder_packet)); + } + pthread_mutex_unlock(&stream->packets_mutex); +} + static bool process_recv_data(struct rtmp_stream *stream, size_t size) { UNUSED_PARAMETER(size); @@ -268,7 +296,62 @@ static bool process_recv_data(struct rtmp_stream *stream, size_t size) } if (packet.m_body) { - /* do processing here */ + /* received RTMP commands handling */ + while (packet.m_nBodySize > + 11) { // fast "onStatus" check, speedup + if (packet.m_body[0] != AMF_STRING) + break; + int len = (packet.m_body[1] << 8) + packet.m_body[2]; + if (strncmp(&packet.m_body[3], "onStatus", len) != 0) + break; + + // it's ok, let's make a full but slower parsing + AMFObject onStatus; + int nRes = AMF_Decode(&onStatus, packet.m_body, + packet.m_nBodySize, + FALSE); // onStatus + if (nRes == -1) + break; + + AVal method; + AMFProp_GetString(AMF_GetProp(&onStatus, NULL, 0), + &method); + assert((method.av_len == 8) && + (strcmp(method.av_val, "onStatus") == 0)); + double transactionId = AMFProp_GetNumber( + AMF_GetProp(&onStatus, NULL, 1)); + (void)transactionId; + + // Info Object parameters + AMFObject info; + AMFProp_GetObject(AMF_GetProp(&onStatus, NULL, 3), + &info); + + // To reconnect the level MUST be set to “status”. + AVal level; + static const AVal av_level = {"level", + sizeof("level") - 1}; + AMFProp_GetString(AMF_GetProp(&info, &av_level, -1), + &level); + if (level.av_len != 6 || strcmp(level.av_val, "status")) + break; + + AVal tcUrl; + static const AVal av_tcUrl = {"tcUrl", + sizeof("tcUrl") - 1}; + AMFProp_GetString(AMF_GetProp(&info, &av_tcUrl, -1), + &tcUrl); + + if (tcUrl.av_len) // URL is present + dstr_copy(&stream->reconnect_path, + tcUrl.av_val); + else + dstr_copy(&stream->reconnect_path, ""); + + // mark a reconnect requested + stream->reconnect_requested = 1; + break; + } RTMPPacket_Free(&packet); } return true; @@ -417,6 +500,9 @@ static int send_packet(struct rtmp_stream *stream, if (handle_socket_read(stream)) return -1; + if (stream->reconnect_requested && packet->keyframe) + return -1; // reconnect + flv_packet_mux(packet, is_header ? 0 : stream->start_dts_offset, &data, &size, is_header); @@ -447,6 +533,9 @@ static int send_packet_ex(struct rtmp_stream *stream, if (handle_socket_read(stream)) return -1; + if (stream->reconnect_requested && packet->keyframe) + return -1; // reconnect on new keyframe + if (is_header) { flv_packet_start(packet, stream->video_codec[idx], &data, &size, idx); @@ -672,11 +761,12 @@ static void *send_thread(void *data) break; } - if (!get_next_packet(stream, &packet)) + if (!peek_next_packet(stream, &packet)) continue; if (stopping(stream)) { if (can_shutdown_stream(stream, &packet)) { + free_next_packet(stream); obs_encoder_packet_release(&packet); break; } @@ -685,6 +775,7 @@ static void *send_thread(void *data) if (!stream->sent_headers) { if (!send_headers(stream)) { os_atomic_set_bool(&stream->disconnected, true); + obs_encoder_packet_release(&packet); break; } } @@ -710,9 +801,12 @@ static void *send_thread(void *data) } if (sent < 0) { + obs_encoder_packet_release(&packet); + os_atomic_set_bool(&stream->disconnected, true); break; - } + } else + free_next_packet(stream); if (stream->dbr_enabled) { dbr_frame.send_end = os_gettime_ns(); @@ -768,7 +862,8 @@ static void *send_thread(void *data) obs_output_end_data_capture(stream->output); } - free_packets(stream); + if (stopping(stream)) + free_packets(stream); os_event_reset(stream->stop_event); os_atomic_set_bool(&stream->active, false); stream->sent_headers = false; @@ -1237,6 +1332,7 @@ static int try_connect(struct rtmp_stream *stream) return OBS_OUTPUT_BAD_PATH; RTMP_EnableWrite(&stream->rtmp); + RTMP_EnableReconnect(&stream->rtmp); dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)"); @@ -1319,9 +1415,16 @@ static bool init_connect(struct rtmp_stream *stream) stream->got_first_packet = false; settings = obs_output_get_settings(stream->output); + dstr_copy(&stream->path, - obs_service_get_connect_info( - service, OBS_SERVICE_CONNECT_INFO_SERVER_URL)); + dstr_is_empty(&stream->reconnect_path) + ? obs_service_get_connect_info( + service, + OBS_SERVICE_CONNECT_INFO_SERVER_URL) + : stream->reconnect_path.array); + stream->reconnect_requested = 0; + dstr_init(&stream->reconnect_path); + dstr_copy(&stream->key, obs_service_get_connect_info( service, OBS_SERVICE_CONNECT_INFO_STREAM_KEY)); diff --git a/plugins/obs-outputs/rtmp-stream.h b/plugins/obs-outputs/rtmp-stream.h index 3ef0cf9a4..39f8ee42f 100644 --- a/plugins/obs-outputs/rtmp-stream.h +++ b/plugins/obs-outputs/rtmp-stream.h @@ -84,6 +84,10 @@ struct rtmp_stream { struct dstr bind_ip; socklen_t addrlen_hint; /* hint IPv4 vs IPv6 */ + /* reconnect feature */ + volatile bool reconnect_requested; + struct dstr reconnect_path; + /* frame drop variables */ int64_t drop_threshold_usec; int64_t pframe_drop_threshold_usec;