0
0
mirror of https://github.com/obsproject/obs-studio.git synced 2024-09-19 20:32:15 +02:00

obs-outputs: Add eRTMP reconnect

This commit is contained in:
Yuriy Chumak 2024-08-12 19:11:10 +03:00 committed by Uri
parent 129d4f2f3f
commit cac6f62348
4 changed files with 132 additions and 6 deletions

View File

@ -487,6 +487,12 @@ RTMP_EnableWrite(RTMP *r)
r->Link.protocol |= RTMP_FEATURE_WRITE;
}
void
RTMP_EnableReconnect(RTMP *r)
{
r->Link.protocol |= RTMP_FEATURE_RECONNECT;
}
double
RTMP_GetDuration(RTMP *r)
{
@ -1647,6 +1653,7 @@ SAVC(secureToken);
SAVC(secureTokenResponse);
SAVC(type);
SAVC(nonprivate);
SAVC(capsEx);
static int
SendConnectPacket(RTMP *r, RTMPPacket *cp)
@ -1752,6 +1759,14 @@ SendConnectPacket(RTMP *r, RTMPPacket *cp)
if (!enc)
return FALSE;
}
if (r->Link.protocol & RTMP_FEATURE_RECONNECT)
{
enc = AMF_EncodeNamedNumber(enc, pend, &av_capsEx, RTMP_CAPS_RECONNECT);
if (!enc)
return FALSE;
}
if (enc + 3 >= pend)
return FALSE;
*enc++ = 0;

View File

@ -187,6 +187,7 @@ extern "C"
#define RTMP_FEATURE_MFP 0x08 /* not yet supported */
#define RTMP_FEATURE_WRITE 0x10 /* publish, not play */
#define RTMP_FEATURE_HTTP2 0x20 /* server-side rtmpt */
#define RTMP_FEATURE_RECONNECT 0x40 /* reconnect supported */
#define RTMP_PROTOCOL_UNDEFINED -1
#define RTMP_PROTOCOL_RTMP 0
@ -197,6 +198,8 @@ extern "C"
#define RTMP_PROTOCOL_RTMPTS (RTMP_FEATURE_HTTP|RTMP_FEATURE_SSL)
#define RTMP_PROTOCOL_RTMFP RTMP_FEATURE_MFP
#define RTMP_CAPS_RECONNECT 1
#define RTMP_DEFAULT_CHUNKSIZE 128
/* needs to fit largest number of bytes recv() may return */
@ -516,6 +519,7 @@ extern "C"
void RTMP_TLS_Free(RTMP *r);
void RTMP_Free(RTMP *r);
void RTMP_EnableWrite(RTMP *r);
void RTMP_EnableReconnect(RTMP *r);
int RTMP_LibVersion(void);
void RTMP_UserInterrupt(void); /* user typed Ctrl-C */

View File

@ -204,6 +204,8 @@ fail:
static void rtmp_stream_stop(void *data, uint64_t ts)
{
struct rtmp_stream *stream = data;
stream->reconnect_requested = 0;
dstr_init(&stream->reconnect_path); // reconnect cleanup
if (stopping(stream) && ts != 0)
return;
@ -250,6 +252,32 @@ static inline bool get_next_packet(struct rtmp_stream *stream,
return new_packet;
}
static inline bool peek_next_packet(struct rtmp_stream *stream,
struct encoder_packet *packet)
{
bool new_packet = false;
pthread_mutex_lock(&stream->packets_mutex);
if (stream->packets.size) {
deque_peek_front(&stream->packets, packet,
sizeof(struct encoder_packet));
new_packet = true;
}
pthread_mutex_unlock(&stream->packets_mutex);
return new_packet;
}
static inline void free_next_packet(struct rtmp_stream *stream)
{
pthread_mutex_lock(&stream->packets_mutex);
if (stream->packets.size) {
deque_pop_front(&stream->packets, NULL,
sizeof(struct encoder_packet));
}
pthread_mutex_unlock(&stream->packets_mutex);
}
static bool process_recv_data(struct rtmp_stream *stream, size_t size)
{
UNUSED_PARAMETER(size);
@ -268,7 +296,62 @@ static bool process_recv_data(struct rtmp_stream *stream, size_t size)
}
if (packet.m_body) {
/* do processing here */
/* received RTMP commands handling */
while (packet.m_nBodySize >
11) { // fast "onStatus" check, speedup
if (packet.m_body[0] != AMF_STRING)
break;
int len = (packet.m_body[1] << 8) + packet.m_body[2];
if (strncmp(&packet.m_body[3], "onStatus", len) != 0)
break;
// it's ok, let's make a full but slower parsing
AMFObject onStatus;
int nRes = AMF_Decode(&onStatus, packet.m_body,
packet.m_nBodySize,
FALSE); // onStatus
if (nRes == -1)
break;
AVal method;
AMFProp_GetString(AMF_GetProp(&onStatus, NULL, 0),
&method);
assert((method.av_len == 8) &&
(strcmp(method.av_val, "onStatus") == 0));
double transactionId = AMFProp_GetNumber(
AMF_GetProp(&onStatus, NULL, 1));
(void)transactionId;
// Info Object parameters
AMFObject info;
AMFProp_GetObject(AMF_GetProp(&onStatus, NULL, 3),
&info);
// To reconnect the level MUST be set to “status”.
AVal level;
static const AVal av_level = {"level",
sizeof("level") - 1};
AMFProp_GetString(AMF_GetProp(&info, &av_level, -1),
&level);
if (level.av_len != 6 || strcmp(level.av_val, "status"))
break;
AVal tcUrl;
static const AVal av_tcUrl = {"tcUrl",
sizeof("tcUrl") - 1};
AMFProp_GetString(AMF_GetProp(&info, &av_tcUrl, -1),
&tcUrl);
if (tcUrl.av_len) // URL is present
dstr_copy(&stream->reconnect_path,
tcUrl.av_val);
else
dstr_copy(&stream->reconnect_path, "");
// mark a reconnect requested
stream->reconnect_requested = 1;
break;
}
RTMPPacket_Free(&packet);
}
return true;
@ -417,6 +500,9 @@ static int send_packet(struct rtmp_stream *stream,
if (handle_socket_read(stream))
return -1;
if (stream->reconnect_requested && packet->keyframe)
return -1; // reconnect
flv_packet_mux(packet, is_header ? 0 : stream->start_dts_offset, &data,
&size, is_header);
@ -447,6 +533,9 @@ static int send_packet_ex(struct rtmp_stream *stream,
if (handle_socket_read(stream))
return -1;
if (stream->reconnect_requested && packet->keyframe)
return -1; // reconnect on new keyframe
if (is_header) {
flv_packet_start(packet, stream->video_codec[idx], &data, &size,
idx);
@ -672,11 +761,12 @@ static void *send_thread(void *data)
break;
}
if (!get_next_packet(stream, &packet))
if (!peek_next_packet(stream, &packet))
continue;
if (stopping(stream)) {
if (can_shutdown_stream(stream, &packet)) {
free_next_packet(stream);
obs_encoder_packet_release(&packet);
break;
}
@ -685,6 +775,7 @@ static void *send_thread(void *data)
if (!stream->sent_headers) {
if (!send_headers(stream)) {
os_atomic_set_bool(&stream->disconnected, true);
obs_encoder_packet_release(&packet);
break;
}
}
@ -710,9 +801,12 @@ static void *send_thread(void *data)
}
if (sent < 0) {
obs_encoder_packet_release(&packet);
os_atomic_set_bool(&stream->disconnected, true);
break;
}
} else
free_next_packet(stream);
if (stream->dbr_enabled) {
dbr_frame.send_end = os_gettime_ns();
@ -768,7 +862,8 @@ static void *send_thread(void *data)
obs_output_end_data_capture(stream->output);
}
free_packets(stream);
if (stopping(stream))
free_packets(stream);
os_event_reset(stream->stop_event);
os_atomic_set_bool(&stream->active, false);
stream->sent_headers = false;
@ -1237,6 +1332,7 @@ static int try_connect(struct rtmp_stream *stream)
return OBS_OUTPUT_BAD_PATH;
RTMP_EnableWrite(&stream->rtmp);
RTMP_EnableReconnect(&stream->rtmp);
dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)");
@ -1319,9 +1415,16 @@ static bool init_connect(struct rtmp_stream *stream)
stream->got_first_packet = false;
settings = obs_output_get_settings(stream->output);
dstr_copy(&stream->path,
obs_service_get_connect_info(
service, OBS_SERVICE_CONNECT_INFO_SERVER_URL));
dstr_is_empty(&stream->reconnect_path)
? obs_service_get_connect_info(
service,
OBS_SERVICE_CONNECT_INFO_SERVER_URL)
: stream->reconnect_path.array);
stream->reconnect_requested = 0;
dstr_init(&stream->reconnect_path);
dstr_copy(&stream->key,
obs_service_get_connect_info(
service, OBS_SERVICE_CONNECT_INFO_STREAM_KEY));

View File

@ -84,6 +84,10 @@ struct rtmp_stream {
struct dstr bind_ip;
socklen_t addrlen_hint; /* hint IPv4 vs IPv6 */
/* reconnect feature */
volatile bool reconnect_requested;
struct dstr reconnect_path;
/* frame drop variables */
int64_t drop_threshold_usec;
int64_t pframe_drop_threshold_usec;