From f7813a5324be39d13ab536c245d15dfc602a7849 Mon Sep 17 00:00:00 2001 From: Tim Redfern Date: Sun, 29 Dec 2013 12:19:38 +0000 Subject: basic type mechanism working --- ffmpeg/libavformat/udp.c | 139 ++++++++++++++++++++++++++++------------------- 1 file changed, 84 insertions(+), 55 deletions(-) (limited to 'ffmpeg/libavformat/udp.c') diff --git a/ffmpeg/libavformat/udp.c b/ffmpeg/libavformat/udp.c index e8a01db..047ff7c 100644 --- a/ffmpeg/libavformat/udp.c +++ b/ffmpeg/libavformat/udp.c @@ -84,23 +84,24 @@ typedef struct { char *local_addr; int packet_size; int timeout; + struct sockaddr_storage local_addr_storage; } UDPContext; #define OFFSET(x) offsetof(UDPContext, x) #define D AV_OPT_FLAG_DECODING_PARAM #define E AV_OPT_FLAG_ENCODING_PARAM static const AVOption options[] = { -{"buffer_size", "Socket buffer size in bytes", OFFSET(buffer_size), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E }, -{"localport", "Set local port to bind to", OFFSET(local_port), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E }, -{"localaddr", "Choose local IP address", OFFSET(local_addr), AV_OPT_TYPE_STRING, {.str = ""}, 0, 0, D|E }, -{"pkt_size", "Set size of UDP packets", OFFSET(packet_size), AV_OPT_TYPE_INT, {.i64 = 1472}, 0, INT_MAX, D|E }, -{"reuse", "Explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E }, -{"ttl", "Set the time to live value (for multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX, E }, -{"connect", "Should connect() be called on socket", OFFSET(is_connected), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E }, +{"buffer_size", "set packet buffer size in bytes", OFFSET(buffer_size), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E }, +{"localport", "set local port to bind to", OFFSET(local_port), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E }, +{"localaddr", "choose local IP address", OFFSET(local_addr), AV_OPT_TYPE_STRING, {.str = ""}, 0, 0, D|E }, +{"pkt_size", "set size of UDP packets", OFFSET(packet_size), AV_OPT_TYPE_INT, {.i64 = 1472}, 0, INT_MAX, D|E }, +{"reuse", "explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E }, +{"ttl", "set the time to live value (for multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX, E }, +{"connect", "set if connect() should be called on socket", OFFSET(is_connected), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E }, /* TODO 'sources', 'block' option */ -{"fifo_size", "Set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D }, -{"overrun_nonfatal", "Survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D }, -{"timeout", "In read mode: if no data arrived in more than this time interval, raise error", OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D }, +{"fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D }, +{"overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D }, +{"timeout", "set raise error timeout (only in read mode)", OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D }, {NULL} }; @@ -140,14 +141,17 @@ static int udp_set_multicast_ttl(int sockfd, int mcastTTL, return 0; } -static int udp_join_multicast_group(int sockfd, struct sockaddr *addr) +static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr) { #ifdef IP_ADD_MEMBERSHIP if (addr->sa_family == AF_INET) { struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; - mreq.imr_interface.s_addr= INADDR_ANY; + if (local_addr) + mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr; + else + mreq.imr_interface.s_addr= INADDR_ANY; if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)"); return -1; @@ -169,14 +173,17 @@ static int udp_join_multicast_group(int sockfd, struct sockaddr *addr) return 0; } -static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr) +static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr) { #ifdef IP_DROP_MEMBERSHIP if (addr->sa_family == AF_INET) { struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr; - mreq.imr_interface.s_addr= INADDR_ANY; + if (local_addr) + mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr; + else + mreq.imr_interface.s_addr= INADDR_ANY; if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) { log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)"); return -1; @@ -237,7 +244,7 @@ static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr, int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6; struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0, SOCK_DGRAM, AF_UNSPEC, - AI_NUMERICHOST); + 0); if (!sourceaddr) return AVERROR(ENOENT); @@ -267,7 +274,7 @@ static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr, struct ip_mreq_source mreqs; struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0, SOCK_DGRAM, AF_UNSPEC, - AI_NUMERICHOST); + 0); if (!sourceaddr) return AVERROR(ENOENT); if (sourceaddr->ai_addr->sa_family != AF_INET) { @@ -326,7 +333,7 @@ static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr, if (res0 == 0) goto fail; for (res = res0; res; res=res->ai_next) { - udp_fd = socket(res->ai_family, SOCK_DGRAM, 0); + udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0); if (udp_fd != -1) break; log_net_error(NULL, AV_LOG_ERROR, "socket"); } @@ -494,6 +501,27 @@ end: } #endif +static int parse_source_list(char *buf, char **sources, int *num_sources, + int max_sources) +{ + char *source_start; + + source_start = buf; + while (1) { + char *next = strchr(source_start, ','); + if (next) + *next = '\0'; + sources[*num_sources] = av_strdup(source_start); + if (!sources[*num_sources]) + return AVERROR(ENOMEM); + source_start = next + 1; + (*num_sources)++; + if (*num_sources >= max_sources || !next) + break; + } + return 0; +} + /* put it in UDP context */ /* return non zero if error */ static int udp_open(URLContext *h, const char *uri, int flags) @@ -507,8 +535,8 @@ static int udp_open(URLContext *h, const char *uri, int flags) struct sockaddr_storage my_addr; socklen_t len; int reuse_specified = 0; - int i, include = 0, num_sources = 0; - char *sources[32]; + int i, num_include_sources = 0, num_exclude_sources = 0; + char *include_sources[32], *exclude_sources[32]; h->is_streamed = 1; @@ -562,31 +590,26 @@ static int udp_open(URLContext *h, const char *uri, int flags) if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) { av_strlcpy(localaddr, buf, sizeof(localaddr)); } - if (av_find_info_tag(buf, sizeof(buf), "sources", p)) - include = 1; - if (include || av_find_info_tag(buf, sizeof(buf), "block", p)) { - char *source_start; - - source_start = buf; - while (1) { - char *next = strchr(source_start, ','); - if (next) - *next = '\0'; - sources[num_sources] = av_strdup(source_start); - if (!sources[num_sources]) - goto fail; - source_start = next + 1; - num_sources++; - if (num_sources >= FF_ARRAY_ELEMS(sources) || !next) - break; - } + if (av_find_info_tag(buf, sizeof(buf), "sources", p)) { + if (parse_source_list(buf, include_sources, &num_include_sources, + FF_ARRAY_ELEMS(include_sources))) + goto fail; + } + if (av_find_info_tag(buf, sizeof(buf), "block", p)) { + if (parse_source_list(buf, exclude_sources, &num_exclude_sources, + FF_ARRAY_ELEMS(exclude_sources))) + goto fail; } if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p)) s->timeout = strtol(buf, NULL, 10); } /* handling needed to support options picking from both AVOption and URL */ s->circular_buffer_size *= 188; - h->max_packet_size = s->packet_size; + if (flags & AVIO_FLAG_WRITE) { + h->max_packet_size = s->packet_size; + } else { + h->max_packet_size = UDP_MAX_PKT_SIZE; + } h->rw_timeout = s->timeout; /* fill the dest addr */ @@ -608,6 +631,8 @@ static int udp_open(URLContext *h, const char *uri, int flags) if (udp_fd < 0) goto fail; + s->local_addr_storage=my_addr; //store for future multicast join + /* Follow the requested reuse option, unless it's multicast in which * case enable reuse unless explicitly disabled. */ @@ -644,20 +669,20 @@ static int udp_open(URLContext *h, const char *uri, int flags) } if (h->flags & AVIO_FLAG_READ) { /* input */ - if (num_sources == 0 || !include) { - if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0) - goto fail; - - if (num_sources) { - if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 0) < 0) - goto fail; - } - } else if (include && num_sources) { - if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 1) < 0) + if (num_include_sources && num_exclude_sources) { + av_log(h, AV_LOG_ERROR, "Simultaneously including and excluding multicast sources is not supported\n"); + goto fail; + } + if (num_include_sources) { + if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, include_sources, num_include_sources, 1) < 0) goto fail; } else { - av_log(NULL, AV_LOG_ERROR, "invalid udp settings: inclusive multicast but no sources given\n"); - goto fail; + if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0) + goto fail; + } + if (num_exclude_sources) { + if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, exclude_sources, num_exclude_sources, 0) < 0) + goto fail; } } } @@ -686,8 +711,10 @@ static int udp_open(URLContext *h, const char *uri, int flags) } } - for (i = 0; i < num_sources; i++) - av_freep(&sources[i]); + for (i = 0; i < num_include_sources; i++) + av_freep(&include_sources[i]); + for (i = 0; i < num_exclude_sources; i++) + av_freep(&exclude_sources[i]); s->udp_fd = udp_fd; @@ -727,8 +754,10 @@ static int udp_open(URLContext *h, const char *uri, int flags) if (udp_fd >= 0) closesocket(udp_fd); av_fifo_free(s->fifo); - for (i = 0; i < num_sources; i++) - av_freep(&sources[i]); + for (i = 0; i < num_include_sources; i++) + av_freep(&include_sources[i]); + for (i = 0; i < num_exclude_sources; i++) + av_freep(&exclude_sources[i]); return AVERROR(EIO); } @@ -818,7 +847,7 @@ static int udp_close(URLContext *h) int ret; if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) - udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr); + udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage); closesocket(s->udp_fd); #if HAVE_PTHREAD_CANCEL if (s->thread_started) { -- cgit v1.2.3