diff options
| author | Tim Redfern <tim@eclectronics.org> | 2013-12-29 12:19:38 +0000 |
|---|---|---|
| committer | Tim Redfern <tim@eclectronics.org> | 2013-12-29 12:19:38 +0000 |
| commit | f7813a5324be39d13ab536c245d15dfc602a7849 (patch) | |
| tree | fad99148b88823d34a5df2f0a25881a002eb291b /ffmpeg/libavformat/rtmpproto.c | |
| parent | b7a5a477b8ff4d4e3028b9dfb9a9df0a41463f92 (diff) | |
basic type mechanism working
Diffstat (limited to 'ffmpeg/libavformat/rtmpproto.c')
| -rw-r--r-- | ffmpeg/libavformat/rtmpproto.c | 588 |
1 files changed, 368 insertions, 220 deletions
diff --git a/ffmpeg/libavformat/rtmpproto.c b/ffmpeg/libavformat/rtmpproto.c index d73e015..a4d7f0e 100644 --- a/ffmpeg/libavformat/rtmpproto.c +++ b/ffmpeg/libavformat/rtmpproto.c @@ -1,6 +1,6 @@ /* * RTMP network protocol - * Copyright (c) 2009 Kostya Shishkov + * Copyright (c) 2009 Konstantin Shishkov * * This file is part of FFmpeg. * @@ -48,13 +48,12 @@ #include <zlib.h> #endif -//#define DEBUG - #define APP_MAX_LENGTH 1024 #define PLAYPATH_MAX_LENGTH 256 #define TCURL_MAX_LENGTH 512 #define FLASHVER_MAX_LENGTH 64 #define RTMP_PKTDATA_DEFAULT_SIZE 4096 +#define RTMP_HEADER 11 /** RTMP protocol handler state */ typedef enum { @@ -62,8 +61,10 @@ typedef enum { STATE_HANDSHAKED, ///< client has performed handshake STATE_FCPUBLISH, ///< client FCPublishing stream (for output) STATE_PLAYING, ///< client has started receiving multimedia data from server + STATE_SEEKING, ///< client has started the seek operation. Back on STATE_PLAYING when the time comes STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output) STATE_RECEIVING, ///< received a publish command (for input) + STATE_SENDING, ///< received a play command (for output) STATE_STOPPED, ///< the broadcast has been stopped } ClientState; @@ -76,7 +77,8 @@ typedef struct TrackedMethod { typedef struct RTMPContext { const AVClass *class; URLContext* stream; ///< TCP stream used in interactions with RTMP server - RTMPPacket prev_pkt[2][RTMP_CHANNELS]; ///< packet history used when reading and sending packets + RTMPPacket *prev_pkt[2]; ///< packet history used when reading and sending packets ([0] for reading, [1] for writing) + int nb_prev_pkt[2]; ///< number of elements in prev_pkt int in_chunk_size; ///< size of the chunks incoming RTMP packets are divided into int out_chunk_size; ///< size of the chunks outgoing RTMP packets are divided into int is_input; ///< input/output flag @@ -85,7 +87,7 @@ typedef struct RTMPContext { char *app; ///< name of application char *conn; ///< append arbitrary AMF data to the Connect message ClientState state; ///< current state - int main_channel_id; ///< an additional channel ID which is used for some invocations + int stream_id; ///< ID assigned by the server for the stream uint8_t* flv_data; ///< buffer with data for demuxer int flv_size; ///< current buffer size int flv_off; ///< number of bytes read from current buffer @@ -95,7 +97,7 @@ typedef struct RTMPContext { uint32_t bytes_read; ///< number of bytes read from server uint32_t last_bytes_read; ///< number of bytes read last reported to server int skip_bytes; ///< number of bytes to skip from the input FLV stream in the next write call - uint8_t flv_header[11]; ///< partial incoming flv packet header + uint8_t flv_header[RTMP_HEADER]; ///< partial incoming flv packet header int flv_header_bytes; ///< number of initialized bytes in flv_header int nb_invokes; ///< keeps track of invoke messages char* tcurl; ///< url of the target stream @@ -150,15 +152,16 @@ static const uint8_t rtmp_server_key[] = { static int add_tracked_method(RTMPContext *rt, const char *name, int id) { - void *ptr; + int err; if (rt->nb_tracked_methods + 1 > rt->tracked_methods_size) { rt->tracked_methods_size = (rt->nb_tracked_methods + 1) * 2; - ptr = av_realloc(rt->tracked_methods, - rt->tracked_methods_size * sizeof(*rt->tracked_methods)); - if (!ptr) - return AVERROR(ENOMEM); - rt->tracked_methods = ptr; + if ((err = av_reallocp(&rt->tracked_methods, rt->tracked_methods_size * + sizeof(*rt->tracked_methods))) < 0) { + rt->nb_tracked_methods = 0; + rt->tracked_methods_size = 0; + return err; + } } rt->tracked_methods[rt->nb_tracked_methods].name = av_strdup(name); @@ -186,7 +189,7 @@ static int find_tracked_method(URLContext *s, RTMPPacket *pkt, int offset, int ret; int i; - bytestream2_init(&gbc, pkt->data + offset, pkt->data_size - offset); + bytestream2_init(&gbc, pkt->data + offset, pkt->size - offset); if ((ret = ff_amf_read_number(&gbc, &pkt_id)) < 0) return ret; @@ -224,7 +227,7 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track) double pkt_id; int len; - bytestream2_init(&gbc, pkt->data, pkt->data_size); + bytestream2_init(&gbc, pkt->data, pkt->size); if ((ret = ff_amf_read_string(&gbc, name, sizeof(name), &len)) < 0) goto fail; @@ -236,7 +239,7 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track) } ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); fail: ff_rtmp_packet_destroy(pkt); return ret; @@ -385,7 +388,7 @@ static int gen_connect(URLContext *s, RTMPContext *rt) } } - pkt.data_size = p - pkt.data; + pkt.size = p - pkt.data; return rtmp_send_packet(rt, &pkt, 1); } @@ -403,10 +406,10 @@ static int read_connect(URLContext *s, RTMPContext *rt) GetByteContext gbc; if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size, - rt->prev_pkt[1])) < 0) + &rt->prev_pkt[0], &rt->nb_prev_pkt[0])) < 0) return ret; cp = pkt.data; - bytestream2_init(&gbc, cp, pkt.data_size); + bytestream2_init(&gbc, cp, pkt.size); if (ff_amf_read_string(&gbc, command, sizeof(command), &stringlen)) { av_log(s, AV_LOG_ERROR, "Unable to read command string\n"); ff_rtmp_packet_destroy(&pkt); @@ -437,9 +440,9 @@ static int read_connect(URLContext *s, RTMPContext *rt) return ret; p = pkt.data; bytestream_put_be32(&p, rt->server_bw); - pkt.data_size = p - pkt.data; + pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -450,9 +453,9 @@ static int read_connect(URLContext *s, RTMPContext *rt) p = pkt.data; bytestream_put_be32(&p, rt->server_bw); bytestream_put_byte(&p, 2); // dynamic - pkt.data_size = p - pkt.data; + pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -466,7 +469,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) bytestream_put_be16(&p, 0); // 0 -> Stream Begin bytestream_put_be32(&p, 0); ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -479,7 +482,7 @@ static int read_connect(URLContext *s, RTMPContext *rt) p = pkt.data; bytestream_put_be32(&p, rt->out_chunk_size); ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -512,9 +515,9 @@ static int read_connect(URLContext *s, RTMPContext *rt) ff_amf_write_number(&p, 0); ff_amf_write_object_end(&p); - pkt.data_size = p - pkt.data; + pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); if (ret < 0) return ret; @@ -527,9 +530,9 @@ static int read_connect(URLContext *s, RTMPContext *rt) ff_amf_write_number(&p, 0); ff_amf_write_null(&p); ff_amf_write_number(&p, 8192); - pkt.data_size = p - pkt.data; + pkt.size = p - pkt.data; ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&pkt); return ret; @@ -652,7 +655,7 @@ static int gen_delete_stream(URLContext *s, RTMPContext *rt) ff_amf_write_string(&p, "deleteStream"); ff_amf_write_number(&p, ++rt->nb_invokes); ff_amf_write_null(&p); - ff_amf_write_number(&p, rt->main_channel_id); + ff_amf_write_number(&p, rt->stream_id); return rtmp_send_packet(rt, &pkt, 0); } @@ -672,7 +675,7 @@ static int gen_buffer_time(URLContext *s, RTMPContext *rt) p = pkt.data; bytestream_put_be16(&p, 3); - bytestream_put_be32(&p, rt->main_channel_id); + bytestream_put_be32(&p, rt->stream_id); bytestream_put_be32(&p, rt->client_buffer_time); return rtmp_send_packet(rt, &pkt, 0); @@ -690,18 +693,41 @@ static int gen_play(URLContext *s, RTMPContext *rt) av_log(s, AV_LOG_DEBUG, "Sending play command for '%s'\n", rt->playpath); - if ((ret = ff_rtmp_packet_create(&pkt, RTMP_VIDEO_CHANNEL, RTMP_PT_INVOKE, + if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE, 0, 29 + strlen(rt->playpath))) < 0) return ret; - pkt.extra = rt->main_channel_id; + pkt.extra = rt->stream_id; p = pkt.data; ff_amf_write_string(&p, "play"); ff_amf_write_number(&p, ++rt->nb_invokes); ff_amf_write_null(&p); ff_amf_write_string(&p, rt->playpath); - ff_amf_write_number(&p, rt->live); + ff_amf_write_number(&p, rt->live * 1000); + + return rtmp_send_packet(rt, &pkt, 1); +} + +static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp) +{ + RTMPPacket pkt; + uint8_t *p; + int ret; + + av_log(s, AV_LOG_DEBUG, "Sending seek command for timestamp %"PRId64"\n", + timestamp); + + if ((ret = ff_rtmp_packet_create(&pkt, 3, RTMP_PT_INVOKE, 0, 26)) < 0) + return ret; + + pkt.extra = rt->stream_id; + + p = pkt.data; + ff_amf_write_string(&p, "seek"); + ff_amf_write_number(&p, 0); //no tracking back responses + ff_amf_write_null(&p); //as usual, the first null param + ff_amf_write_number(&p, timestamp); //where we want to jump return rtmp_send_packet(rt, &pkt, 1); } @@ -721,7 +747,7 @@ static int gen_publish(URLContext *s, RTMPContext *rt) 0, 30 + strlen(rt->playpath))) < 0) return ret; - pkt.extra = rt->main_channel_id; + pkt.extra = rt->stream_id; p = pkt.data; ff_amf_write_string(&p, "publish"); @@ -742,9 +768,9 @@ static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt) uint8_t *p; int ret; - if (ppkt->data_size < 6) { + if (ppkt->size < 6) { av_log(s, AV_LOG_ERROR, "Too short ping packet (%d)\n", - ppkt->data_size); + ppkt->size); return AVERROR_INVALIDDATA; } @@ -1323,7 +1349,7 @@ static int rtmp_send_hs_packet(RTMPContext* rt, uint32_t first_int, int inoutsize; AV_WB32(arraydata, first_int); - AV_WB32(arraydata + 4, first_int); + AV_WB32(arraydata + 4, second_int); inoutsize = ffurl_write(rt->stream, arraydata, RTMP_HANDSHAKE_PACKET_SIZE); if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) { @@ -1372,8 +1398,6 @@ static int rtmp_server_handshake(URLContext *s, RTMPContext *rt) av_log(s, AV_LOG_ERROR, "RTMP Handshake C1 Error\n"); return ret; } - if (zeroes) - av_log(s, AV_LOG_WARNING, "Erroneous C1 Message zero != 0\n"); /* Send S1 */ /* By now same epoch will be sent */ hs_my_epoch = hs_epoch; @@ -1418,10 +1442,10 @@ static int handle_chunk_size(URLContext *s, RTMPPacket *pkt) RTMPContext *rt = s->priv_data; int ret; - if (pkt->data_size < 4) { + if (pkt->size < 4) { av_log(s, AV_LOG_ERROR, "Too short chunk size change packet (%d)\n", - pkt->data_size); + pkt->size); return AVERROR_INVALIDDATA; } @@ -1429,7 +1453,7 @@ static int handle_chunk_size(URLContext *s, RTMPPacket *pkt) /* Send the same chunk size change packet back to the server, * setting the outgoing chunk size to the same as the incoming one. */ if ((ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size, - rt->prev_pkt[1])) < 0) + &rt->prev_pkt[1], &rt->nb_prev_pkt[1])) < 0) return ret; rt->out_chunk_size = AV_RB32(pkt->data); } @@ -1451,9 +1475,9 @@ static int handle_ping(URLContext *s, RTMPPacket *pkt) RTMPContext *rt = s->priv_data; int t, ret; - if (pkt->data_size < 2) { + if (pkt->size < 2) { av_log(s, AV_LOG_ERROR, "Too short ping packet (%d)\n", - pkt->data_size); + pkt->size); return AVERROR_INVALIDDATA; } @@ -1477,10 +1501,10 @@ static int handle_client_bw(URLContext *s, RTMPPacket *pkt) { RTMPContext *rt = s->priv_data; - if (pkt->data_size < 4) { + if (pkt->size < 4) { av_log(s, AV_LOG_ERROR, "Client bandwidth report packet is less than 4 bytes long (%d)\n", - pkt->data_size); + pkt->size); return AVERROR_INVALIDDATA; } @@ -1501,10 +1525,10 @@ static int handle_server_bw(URLContext *s, RTMPPacket *pkt) { RTMPContext *rt = s->priv_data; - if (pkt->data_size < 4) { + if (pkt->size < 4) { av_log(s, AV_LOG_ERROR, "Too short server bandwidth report packet (%d)\n", - pkt->data_size); + pkt->size); return AVERROR_INVALIDDATA; } @@ -1587,6 +1611,8 @@ static int do_llnw_auth(RTMPContext *rt, const char *user, const char *nonce) av_md5_update(md5, method, strlen(method)); av_md5_update(md5, ":/", 2); av_md5_update(md5, rt->app, strlen(rt->app)); + if (!strchr(rt->app, '/')) + av_md5_update(md5, "/_definst_", strlen("/_definst_")); av_md5_final(md5, hash); ff_data_to_hex(hashstr2, hash, 16, 1); hashstr2[32] = '\0'; @@ -1704,7 +1730,7 @@ static int handle_connect_error(URLContext *s, const char *desc) static int handle_invoke_error(URLContext *s, RTMPPacket *pkt) { RTMPContext *rt = s->priv_data; - const uint8_t *data_end = pkt->data + pkt->data_size; + const uint8_t *data_end = pkt->data + pkt->size; char *tracked_method = NULL; int level = AV_LOG_ERROR; uint8_t tmpstr[256]; @@ -1737,13 +1763,84 @@ static int handle_invoke_error(URLContext *s, RTMPPacket *pkt) return ret; } +static int write_begin(URLContext *s) +{ + RTMPContext *rt = s->priv_data; + PutByteContext pbc; + RTMPPacket spkt = { 0 }; + int ret; + + // Send Stream Begin 1 + if ((ret = ff_rtmp_packet_create(&spkt, RTMP_NETWORK_CHANNEL, + RTMP_PT_PING, 0, 6)) < 0) { + av_log(s, AV_LOG_ERROR, "Unable to create response packet\n"); + return ret; + } + + bytestream2_init_writer(&pbc, spkt.data, spkt.size); + bytestream2_put_be16(&pbc, 0); // 0 -> Stream Begin + bytestream2_put_be32(&pbc, rt->nb_streamid); + + ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); + + ff_rtmp_packet_destroy(&spkt); + + return ret; +} + +static int write_status(URLContext *s, RTMPPacket *pkt, + const char *status, const char *filename) +{ + RTMPContext *rt = s->priv_data; + RTMPPacket spkt = { 0 }; + char statusmsg[128]; + uint8_t *pp; + int ret; + + if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL, + RTMP_PT_INVOKE, 0, + RTMP_PKTDATA_DEFAULT_SIZE)) < 0) { + av_log(s, AV_LOG_ERROR, "Unable to create response packet\n"); + return ret; + } + + pp = spkt.data; + spkt.extra = pkt->extra; + ff_amf_write_string(&pp, "onStatus"); + ff_amf_write_number(&pp, 0); + ff_amf_write_null(&pp); + + ff_amf_write_object_start(&pp); + ff_amf_write_field_name(&pp, "level"); + ff_amf_write_string(&pp, "status"); + ff_amf_write_field_name(&pp, "code"); + ff_amf_write_string(&pp, status); + ff_amf_write_field_name(&pp, "description"); + snprintf(statusmsg, sizeof(statusmsg), + "%s is now published", filename); + ff_amf_write_string(&pp, statusmsg); + ff_amf_write_field_name(&pp, "details"); + ff_amf_write_string(&pp, filename); + ff_amf_write_field_name(&pp, "clientid"); + snprintf(statusmsg, sizeof(statusmsg), "%s", LIBAVFORMAT_IDENT); + ff_amf_write_string(&pp, statusmsg); + ff_amf_write_object_end(&pp); + + spkt.size = pp - spkt.data; + ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); + ff_rtmp_packet_destroy(&spkt); + + return ret; +} + static int send_invoke_response(URLContext *s, RTMPPacket *pkt) { RTMPContext *rt = s->priv_data; double seqnum; char filename[64]; char command[64]; - char statusmsg[128]; int stringlen; char *pchar; const uint8_t *p = pkt->data; @@ -1752,7 +1849,7 @@ static int send_invoke_response(URLContext *s, RTMPPacket *pkt) GetByteContext gbc; int ret; - bytestream2_init(&gbc, p, pkt->data_size); + bytestream2_init(&gbc, p, pkt->size); if (ff_amf_read_string(&gbc, command, sizeof(command), &stringlen)) { av_log(s, AV_LOG_ERROR, "Error in PT_INVOKE\n"); @@ -1796,52 +1893,20 @@ static int send_invoke_response(URLContext *s, RTMPPacket *pkt) pp = spkt.data; ff_amf_write_string(&pp, "onFCPublish"); } else if (!strcmp(command, "publish")) { - PutByteContext pbc; - // Send Stream Begin 1 - if ((ret = ff_rtmp_packet_create(&spkt, RTMP_NETWORK_CHANNEL, - RTMP_PT_PING, 0, 6)) < 0) { - av_log(s, AV_LOG_ERROR, "Unable to create response packet\n"); - return ret; - } - pp = spkt.data; - bytestream2_init_writer(&pbc, pp, spkt.data_size); - bytestream2_put_be16(&pbc, 0); // 0 -> Stream Begin - bytestream2_put_be32(&pbc, rt->nb_streamid); - ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&spkt); + ret = write_begin(s); if (ret < 0) return ret; // Send onStatus(NetStream.Publish.Start) - if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL, - RTMP_PT_INVOKE, 0, - RTMP_PKTDATA_DEFAULT_SIZE)) < 0) { - av_log(s, AV_LOG_ERROR, "Unable to create response packet\n"); + return write_status(s, pkt, "NetStream.Publish.Start", + filename); + } else if (!strcmp(command, "play")) { + ret = write_begin(s); + if (ret < 0) return ret; - } - spkt.extra = pkt->extra; - pp = spkt.data; - ff_amf_write_string(&pp, "onStatus"); - ff_amf_write_number(&pp, 0); - ff_amf_write_null(&pp); - - ff_amf_write_object_start(&pp); - ff_amf_write_field_name(&pp, "level"); - ff_amf_write_string(&pp, "status"); - ff_amf_write_field_name(&pp, "code"); - ff_amf_write_string(&pp, "NetStream.Publish.Start"); - ff_amf_write_field_name(&pp, "description"); - snprintf(statusmsg, sizeof(statusmsg), - "%s is now published", filename); - ff_amf_write_string(&pp, statusmsg); - ff_amf_write_field_name(&pp, "details"); - ff_amf_write_string(&pp, filename); - ff_amf_write_field_name(&pp, "clientid"); - snprintf(statusmsg, sizeof(statusmsg), "%s", LIBAVFORMAT_IDENT); - ff_amf_write_string(&pp, statusmsg); - ff_amf_write_object_end(&pp); - + rt->state = STATE_SENDING; + return write_status(s, pkt, "NetStream.Play.Start", + filename); } else { if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE, 0, @@ -1863,9 +1928,9 @@ static int send_invoke_response(URLContext *s, RTMPPacket *pkt) * if a client creates more than 2^32 - 2 streams. */ } } - spkt.data_size = pp - spkt.data; + spkt.size = pp - spkt.data; ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size, - rt->prev_pkt[1]); + &rt->prev_pkt[1], &rt->nb_prev_pkt[1]); ff_rtmp_packet_destroy(&spkt); return ret; } @@ -1884,7 +1949,7 @@ static int handle_invoke_result(URLContext *s, RTMPPacket *pkt) return ret; } - if (!memcmp(tracked_method, "connect", 7)) { + if (!strcmp(tracked_method, "connect")) { if (!rt->is_input) { if ((ret = gen_release_stream(s, rt)) < 0) goto fail; @@ -1910,12 +1975,12 @@ static int handle_invoke_result(URLContext *s, RTMPPacket *pkt) goto fail; } } - } else if (!memcmp(tracked_method, "createStream", 12)) { + } else if (!strcmp(tracked_method, "createStream")) { //extract a number from the result if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) { av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n"); } else { - rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21)); + rt->stream_id = av_int2double(AV_RB64(pkt->data + 21)); } if (!rt->is_input) { @@ -1937,8 +2002,8 @@ fail: static int handle_invoke_status(URLContext *s, RTMPPacket *pkt) { RTMPContext *rt = s->priv_data; - const uint8_t *data_end = pkt->data + pkt->data_size; - const uint8_t *ptr = pkt->data + 11; + const uint8_t *data_end = pkt->data + pkt->size; + const uint8_t *ptr = pkt->data + RTMP_HEADER; uint8_t tmpstr[256]; int i, t; @@ -1951,8 +2016,12 @@ static int handle_invoke_status(URLContext *s, RTMPPacket *pkt) t = ff_amf_get_field_value(ptr, data_end, "level", tmpstr, sizeof(tmpstr)); if (!t && !strcmp(tmpstr, "error")) { - if (!ff_amf_get_field_value(ptr, data_end, - "description", tmpstr, sizeof(tmpstr))) + t = ff_amf_get_field_value(ptr, data_end, + "description", tmpstr, sizeof(tmpstr)); + if (t || !tmpstr[0]) + t = ff_amf_get_field_value(ptr, data_end, "code", + tmpstr, sizeof(tmpstr)); + if (!t) av_log(s, AV_LOG_ERROR, "Server error: %s\n", tmpstr); return -1; } @@ -1962,6 +2031,7 @@ static int handle_invoke_status(URLContext *s, RTMPPacket *pkt) if (!t && !strcmp(tmpstr, "NetStream.Play.Stop")) rt->state = STATE_STOPPED; if (!t && !strcmp(tmpstr, "NetStream.Play.UnpublishNotify")) rt->state = STATE_STOPPED; if (!t && !strcmp(tmpstr, "NetStream.Publish.Start")) rt->state = STATE_PUBLISHING; + if (!t && !strcmp(tmpstr, "NetStream.Seek.Notify")) rt->state = STATE_PLAYING; return 0; } @@ -1972,23 +2042,24 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt) int ret = 0; //TODO: check for the messages sent for wrong state? - if (!memcmp(pkt->data, "\002\000\006_error", 9)) { + if (ff_amf_match_string(pkt->data, pkt->size, "_error")) { if ((ret = handle_invoke_error(s, pkt)) < 0) return ret; - } else if (!memcmp(pkt->data, "\002\000\007_result", 10)) { + } else if (ff_amf_match_string(pkt->data, pkt->size, "_result")) { if ((ret = handle_invoke_result(s, pkt)) < 0) return ret; - } else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) { + } else if (ff_amf_match_string(pkt->data, pkt->size, "onStatus")) { if ((ret = handle_invoke_status(s, pkt)) < 0) return ret; - } else if (!memcmp(pkt->data, "\002\000\010onBWDone", 11)) { + } else if (ff_amf_match_string(pkt->data, pkt->size, "onBWDone")) { if ((ret = gen_check_bw(s, rt)) < 0) return ret; - } else if (!memcmp(pkt->data, "\002\000\015releaseStream", 16) || - !memcmp(pkt->data, "\002\000\011FCPublish", 12) || - !memcmp(pkt->data, "\002\000\007publish", 10) || - !memcmp(pkt->data, "\002\000\010_checkbw", 11) || - !memcmp(pkt->data, "\002\000\014createStream", 15)) { + } else if (ff_amf_match_string(pkt->data, pkt->size, "releaseStream") || + ff_amf_match_string(pkt->data, pkt->size, "FCPublish") || + ff_amf_match_string(pkt->data, pkt->size, "publish") || + ff_amf_match_string(pkt->data, pkt->size, "play") || + ff_amf_match_string(pkt->data, pkt->size, "_checkbw") || + ff_amf_match_string(pkt->data, pkt->size, "createStream")) { if ((ret = send_invoke_response(s, pkt)) < 0) return ret; } @@ -1996,65 +2067,75 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt) return ret; } -static int handle_notify(URLContext *s, RTMPPacket *pkt) { +static int update_offset(RTMPContext *rt, int size) +{ + int old_flv_size; + + // generate packet header and put data into buffer for FLV demuxer + if (rt->flv_off < rt->flv_size) { + // There is old unread data in the buffer, thus append at the end + old_flv_size = rt->flv_size; + rt->flv_size += size; + } else { + // All data has been read, write the new data at the start of the buffer + old_flv_size = 0; + rt->flv_size = size; + rt->flv_off = 0; + } + + return old_flv_size; +} + +static int append_flv_data(RTMPContext *rt, RTMPPacket *pkt, int skip) +{ + int old_flv_size, ret; + PutByteContext pbc; + const uint8_t *data = pkt->data + skip; + const int size = pkt->size - skip; + uint32_t ts = pkt->timestamp; + + old_flv_size = update_offset(rt, size + 15); + + if ((ret = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) { + rt->flv_size = rt->flv_off = 0; + return ret; + } + bytestream2_init_writer(&pbc, rt->flv_data, rt->flv_size); + bytestream2_skip_p(&pbc, old_flv_size); + bytestream2_put_byte(&pbc, pkt->type); + bytestream2_put_be24(&pbc, size); + bytestream2_put_be24(&pbc, ts); + bytestream2_put_byte(&pbc, ts >> 24); + bytestream2_put_be24(&pbc, 0); + bytestream2_put_buffer(&pbc, data, size); + bytestream2_put_be32(&pbc, 0); + + return 0; +} + +static int handle_notify(URLContext *s, RTMPPacket *pkt) +{ RTMPContext *rt = s->priv_data; - const uint8_t *p = NULL; - uint8_t *cp = NULL; uint8_t commandbuffer[64]; char statusmsg[128]; - int stringlen; + int stringlen, ret, skip = 0; GetByteContext gbc; - PutByteContext pbc; - uint32_t ts; - int old_flv_size; - const uint8_t *datatowrite; - unsigned datatowritelength; - p = pkt->data; - bytestream2_init(&gbc, p, pkt->data_size); + bytestream2_init(&gbc, pkt->data, pkt->size); if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer), &stringlen)) return AVERROR_INVALIDDATA; + + // Skip the @setDataFrame string and validate it is a notification if (!strcmp(commandbuffer, "@setDataFrame")) { - datatowrite = gbc.buffer; - datatowritelength = bytestream2_get_bytes_left(&gbc); - if (ff_amf_read_string(&gbc, statusmsg, - sizeof(statusmsg), &stringlen)) + skip = gbc.buffer - pkt->data; + ret = ff_amf_read_string(&gbc, statusmsg, + sizeof(statusmsg), &stringlen); + if (ret < 0) return AVERROR_INVALIDDATA; - if (strcmp(statusmsg, "onMetaData")) { - av_log(s, AV_LOG_INFO, "Expecting onMetadata but got %s\n", - statusmsg); - return 0; - } - - /* Provide ECMAArray to flv */ - ts = pkt->timestamp; - - // generate packet header and put data into buffer for FLV demuxer - if (rt->flv_off < rt->flv_size) { - old_flv_size = rt->flv_size; - rt->flv_size += datatowritelength + 15; - } else { - old_flv_size = 0; - rt->flv_size = datatowritelength + 15; - rt->flv_off = 0; - } - - cp = av_realloc(rt->flv_data, rt->flv_size); - if (!cp) - return AVERROR(ENOMEM); - rt->flv_data = cp; - bytestream2_init_writer(&pbc, cp, rt->flv_size); - bytestream2_skip_p(&pbc, old_flv_size); - bytestream2_put_byte(&pbc, pkt->type); - bytestream2_put_be24(&pbc, datatowritelength); - bytestream2_put_be24(&pbc, ts); - bytestream2_put_byte(&pbc, ts >> 24); - bytestream2_put_be24(&pbc, 0); - bytestream2_put_buffer(&pbc, datatowrite, datatowritelength); - bytestream2_put_be32(&pbc, 0); } - return 0; + + return append_flv_data(rt, pkt, skip); } /** @@ -2108,6 +2189,55 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt) return 0; } +static int handle_metadata(RTMPContext *rt, RTMPPacket *pkt) +{ + int ret, old_flv_size, type; + const uint8_t *next; + uint8_t *p; + uint32_t size; + uint32_t ts, cts, pts = 0; + + old_flv_size = update_offset(rt, pkt->size); + + if ((ret = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) { + rt->flv_size = rt->flv_off = 0; + return ret; + } + + next = pkt->data; + p = rt->flv_data + old_flv_size; + + /* copy data while rewriting timestamps */ + ts = pkt->timestamp; + + while (next - pkt->data < pkt->size - RTMP_HEADER) { + type = bytestream_get_byte(&next); + size = bytestream_get_be24(&next); + cts = bytestream_get_be24(&next); + cts |= bytestream_get_byte(&next) << 24; + if (!pts) + pts = cts; + ts += cts - pts; + pts = cts; + if (size + 3 + 4 > pkt->data + pkt->size - next) + break; + bytestream_put_byte(&p, type); + bytestream_put_be24(&p, size); + bytestream_put_be24(&p, ts); + bytestream_put_byte(&p, ts >> 24); + memcpy(p, next, size + 3 + 4); + next += size + 3 + 4; + p += size + 3 + 4; + } + if (p != rt->flv_data + rt->flv_size) { + av_log(NULL, AV_LOG_WARNING, "Incomplete flv packets in " + "RTMP_PT_METADATA packet\n"); + rt->flv_size = p - rt->flv_data; + } + + return 0; +} + /** * Interact with the server by receiving and sending RTMP packets until * there is some significant data (media data or expected status notification). @@ -2123,10 +2253,6 @@ static int get_packet(URLContext *s, int for_header) { RTMPContext *rt = s->priv_data; int ret; - uint8_t *p; - const uint8_t *next; - uint32_t data_size; - uint32_t ts, cts, pts=0; if (rt->state == STATE_STOPPED) return AVERROR_EOF; @@ -2134,7 +2260,8 @@ static int get_packet(URLContext *s, int for_header) for (;;) { RTMPPacket rpkt = { 0 }; if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt, - rt->in_chunk_size, rt->prev_pkt[0])) <= 0) { + rt->in_chunk_size, &rt->prev_pkt[0], + &rt->nb_prev_pkt[0])) <= 0) { if (ret == 0) { return AVERROR(EAGAIN); } else { @@ -2150,6 +2277,17 @@ static int get_packet(URLContext *s, int for_header) } ret = rtmp_parse_result(s, rt, &rpkt); + + // At this point we must check if we are in the seek state and continue + // with the next packet. handle_invoke will get us out of this state + // when the right message is encountered + if (rt->state == STATE_SEEKING) { + ff_rtmp_packet_destroy(&rpkt); + // We continue, let the natural flow of things happen: + // AVERROR(EAGAIN) or handle_invoke gets us out of here + continue; + } + if (ret < 0) {//serious error in current packet ff_rtmp_packet_destroy(&rpkt); return ret; @@ -2164,62 +2302,25 @@ static int get_packet(URLContext *s, int for_header) } if (for_header && (rt->state == STATE_PLAYING || rt->state == STATE_PUBLISHING || + rt->state == STATE_SENDING || rt->state == STATE_RECEIVING)) { ff_rtmp_packet_destroy(&rpkt); return 0; } - if (!rpkt.data_size || !rt->is_input) { + if (!rpkt.size || !rt->is_input) { ff_rtmp_packet_destroy(&rpkt); continue; } - if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO || - (rpkt.type == RTMP_PT_NOTIFY && !memcmp("\002\000\012onMetaData", rpkt.data, 13))) { - ts = rpkt.timestamp; - - // generate packet header and put data into buffer for FLV demuxer - rt->flv_off = 0; - rt->flv_size = rpkt.data_size + 15; - rt->flv_data = p = av_realloc(rt->flv_data, rt->flv_size); - bytestream_put_byte(&p, rpkt.type); - bytestream_put_be24(&p, rpkt.data_size); - bytestream_put_be24(&p, ts); - bytestream_put_byte(&p, ts >> 24); - bytestream_put_be24(&p, 0); - bytestream_put_buffer(&p, rpkt.data, rpkt.data_size); - bytestream_put_be32(&p, 0); + if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO) { + ret = append_flv_data(rt, &rpkt, 0); ff_rtmp_packet_destroy(&rpkt); - return 0; + return ret; } else if (rpkt.type == RTMP_PT_NOTIFY) { ret = handle_notify(s, &rpkt); ff_rtmp_packet_destroy(&rpkt); - if (ret) { - av_log(s, AV_LOG_ERROR, "Handle notify error\n"); - return ret; - } - return 0; + return ret; } else if (rpkt.type == RTMP_PT_METADATA) { - // we got raw FLV data, make it available for FLV demuxer - rt->flv_off = 0; - rt->flv_size = rpkt.data_size; - rt->flv_data = av_realloc(rt->flv_data, rt->flv_size); - /* rewrite timestamps */ - next = rpkt.data; - ts = rpkt.timestamp; - while (next - rpkt.data < rpkt.data_size - 11) { - next++; - data_size = bytestream_get_be24(&next); - p=next; - cts = bytestream_get_be24(&next); - cts |= bytestream_get_byte(&next) << 24; - if (pts==0) - pts=cts; - ts += cts - pts; - pts = cts; - bytestream_put_be24(&p, ts); - bytestream_put_byte(&p, ts >> 24); - next += data_size + 3 + 4; - } - memcpy(rt->flv_data, rpkt.data, rpkt.data_size); + ret = handle_metadata(rt, &rpkt); ff_rtmp_packet_destroy(&rpkt); return 0; } @@ -2230,17 +2331,22 @@ static int get_packet(URLContext *s, int for_header) static int rtmp_close(URLContext *h) { RTMPContext *rt = h->priv_data; - int ret = 0; + int ret = 0, i, j; if (!rt->is_input) { rt->flv_data = NULL; - if (rt->out_pkt.data_size) + if (rt->out_pkt.size) ff_rtmp_packet_destroy(&rt->out_pkt); if (rt->state > STATE_FCPUBLISH) ret = gen_fcunpublish_stream(h, rt); } if (rt->state > STATE_HANDSHAKED) ret = gen_delete_stream(h, rt); + for (i = 0; i < 2; i++) { + for (j = 0; j < rt->nb_prev_pkt[i]; j++) + ff_rtmp_packet_destroy(&rt->prev_pkt[i][j]); + av_freep(&rt->prev_pkt[i]); + } free_tracked_methods(rt); av_freep(&rt->flv_data); @@ -2276,6 +2382,13 @@ static int rtmp_open(URLContext *s, const char *uri, int flags) hostname, sizeof(hostname), &port, path, sizeof(path), s->filename); + if (strchr(path, ' ')) { + av_log(s, AV_LOG_WARNING, + "Detected librtmp style URL parameters, these aren't supported " + "by the libavformat internal RTMP handler currently enabled. " + "See the documentation for the correct way to pass parameters.\n"); + } + if (auth[0]) { char *ptr = strchr(auth, ':'); if (ptr) { @@ -2445,29 +2558,33 @@ reconnect: } else { if (read_connect(s, s->priv_data) < 0) goto fail; - rt->is_input = 1; } do { ret = get_packet(s, 1); - } while (ret == EAGAIN); + } while (ret == AVERROR(EAGAIN)); if (ret < 0) goto fail; if (rt->do_reconnect) { + int i; ffurl_close(rt->stream); rt->stream = NULL; rt->do_reconnect = 0; rt->nb_invokes = 0; - memset(rt->prev_pkt, 0, sizeof(rt->prev_pkt)); + for (i = 0; i < 2; i++) + memset(rt->prev_pkt[i], 0, + sizeof(**rt->prev_pkt) * rt->nb_prev_pkt[i]); free_tracked_methods(rt); goto reconnect; } if (rt->is_input) { + int err; // generate FLV header for demuxer rt->flv_size = 13; - rt->flv_data = av_realloc(rt->flv_data, rt->flv_size); + if ((err = av_reallocp(&rt->flv_data, rt->flv_size)) < 0) + return err; rt->flv_off = 0; memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size); } else { @@ -2514,6 +2631,26 @@ static int rtmp_read(URLContext *s, uint8_t *buf, int size) return orig_size; } +static int64_t rtmp_seek(URLContext *s, int stream_index, int64_t timestamp, + int flags) +{ + RTMPContext *rt = s->priv_data; + int ret; + av_log(s, AV_LOG_DEBUG, + "Seek on stream index %d at timestamp %"PRId64" with flags %08x\n", + stream_index, timestamp, flags); + if ((ret = gen_seek(s, rt, timestamp)) < 0) { + av_log(s, AV_LOG_ERROR, + "Unable to send seek command on stream index %d at timestamp " + "%"PRId64" with flags %08x\n", + stream_index, timestamp, flags); + return ret; + } + rt->flv_off = rt->flv_size; + rt->state = STATE_SEEKING; + return timestamp; +} + static int rtmp_write(URLContext *s, const uint8_t *buf, int size) { RTMPContext *rt = s->priv_data; @@ -2533,13 +2670,14 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) continue; } - if (rt->flv_header_bytes < 11) { + if (rt->flv_header_bytes < RTMP_HEADER) { const uint8_t *header = rt->flv_header; - int copy = FFMIN(11 - rt->flv_header_bytes, size_temp); + int copy = FFMIN(RTMP_HEADER - rt->flv_header_bytes, size_temp); + int channel = RTMP_AUDIO_CHANNEL; bytestream_get_buffer(&buf_temp, rt->flv_header + rt->flv_header_bytes, copy); rt->flv_header_bytes += copy; size_temp -= copy; - if (rt->flv_header_bytes < 11) + if (rt->flv_header_bytes < RTMP_HEADER) break; pkttype = bytestream_get_byte(&header); @@ -2549,20 +2687,27 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) bytestream_get_be24(&header); rt->flv_size = pktsize; + if (pkttype == RTMP_PT_VIDEO) + channel = RTMP_VIDEO_CHANNEL; + //force 12bytes header if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) || pkttype == RTMP_PT_NOTIFY) { if (pkttype == RTMP_PT_NOTIFY) pktsize += 16; - rt->prev_pkt[1][RTMP_SOURCE_CHANNEL].channel_id = 0; + if ((ret = ff_rtmp_check_alloc_array(&rt->prev_pkt[1], + &rt->nb_prev_pkt[1], + channel)) < 0) + return ret; + rt->prev_pkt[1][channel].channel_id = 0; } //this can be a big packet, it's better to send it right here - if ((ret = ff_rtmp_packet_create(&rt->out_pkt, RTMP_SOURCE_CHANNEL, + if ((ret = ff_rtmp_packet_create(&rt->out_pkt, channel, pkttype, ts, pktsize)) < 0) return ret; - rt->out_pkt.extra = rt->main_channel_id; + rt->out_pkt.extra = rt->stream_id; rt->flv_data = rt->out_pkt.data; if (pkttype == RTMP_PT_NOTIFY) @@ -2614,7 +2759,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt, rt->in_chunk_size, - rt->prev_pkt[0], c)) <= 0) + &rt->prev_pkt[0], + &rt->nb_prev_pkt[0], c)) <= 0) return ret; if ((ret = rtmp_parse_result(s, rt, &rpkt)) < 0) @@ -2649,6 +2795,7 @@ static const AVOption rtmp_options[] = { {"rtmp_swfverify", "URL to player swf file, compute hash/size automatically.", OFFSET(swfverify), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC}, {"rtmp_tcurl", "URL of the target stream. Defaults to proto://host[:port]/app.", OFFSET(tcurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC}, {"rtmp_listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" }, + {"listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" }, {"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1", OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" }, { NULL }, }; @@ -2665,6 +2812,7 @@ URLProtocol ff_##flavor##_protocol = { \ .name = #flavor, \ .url_open = rtmp_open, \ .url_read = rtmp_read, \ + .url_read_seek = rtmp_seek, \ .url_write = rtmp_write, \ .url_close = rtmp_close, \ .priv_data_size = sizeof(RTMPContext), \ |
