Comments (4)
This is an srt-rs bug, so feel free to close your bug report on ac-ffmpeg. Is it okay with you if I adapt this as an example to put in the repo?
Anyways, I hooked up your code to a UDP socket coming straight from
ffmpeg -re -stream_loop -1 -i ~/Videos/test.ts -c:v copy -c:a copy -f mpegts 'udp://127.0.0.1:1234?pkt_size=1316'
And it works
UDP input code
// Run with
// ffmpeg -re -stream_loop -1 -i ~/Videos/test.ts -c:v copy -c:a copy -f mpegts 'udp://127.0.0.1:1234?pkt_size=1316'
// cargo run --example=ffmpeg_ac (I put this file in srt-tokio/examples/ffmpeg_ac)
// python3 -m http.server
// ffplay http://localhost:8000/segments/dash.mpd
use ac_ffmpeg::{
codec::{CodecParameters, VideoCodecParametersBuilder},
format::{
demuxer::{Demuxer, InputFormat},
io::IO,
muxer::{Muxer, OutputFormat},
},
Error,
};
use anyhow::{Context, Result};
use bytes::Bytes;
use futures::Stream;
use srt_tokio::{SrtSocket, SrtSocketBuilder};
use std::{
fs::{self, File},
io::{self, Read},
sync::mpsc::{channel, Receiver, Sender},
time::Instant,
};
use tokio_stream::StreamExt;
use tokio_util::{codec::BytesCodec, udp::UdpFramed};
fn open_output(path: &str, elementary_streams: &[CodecParameters]) -> Result<Muxer<File>, Error> {
let output_format = OutputFormat::find_by_name("dash")
.ok_or_else(|| Error::new(format!("unable to guess output format for file: {}", path)))?;
let output = File::create(path)
.map_err(|err| Error::new(format!("unable to create output file {}: {}", path, err)))?;
let io = IO::from_seekable_write_stream(output);
let mut muxer_builder = Muxer::builder()
.set_option("url", path)
.set_option("use_timeline", "1")
.set_option("use_template", "1")
.set_option("hls_playlist", "1")
.set_option("streaming", "1")
.set_option("remove_at_exit", "1")
.set_option("window_size", "5")
.set_option("seg_duration", "6")
.set_option("adaptation_sets", "id=0,streams=v id=1,streams=a");
for codec_parameters in elementary_streams {
muxer_builder.add_stream(codec_parameters)?;
}
muxer_builder.build(io, output_format)
}
/// Adapts a channel of byte chunks into a blocking [`Read`] source.
///
/// Each chunk received from the channel is handed out in order; when the
/// caller's buffer is smaller than a chunk, the unread remainder is kept and
/// served by the following `read` calls before another chunk is received.
struct ByteReceiver {
    /// Channel the chunks arrive on.
    rx: Receiver<bytes::Bytes>,
    /// Partially consumed chunk together with the offset of its first unread byte.
    prev: Option<(bytes::Bytes, usize)>,
}

impl ByteReceiver {
    /// Wraps `rx` with no pending partial chunk.
    fn new(rx: Receiver<bytes::Bytes>) -> Self {
        Self { rx, prev: None }
    }
}

impl Read for ByteReceiver {
    /// Copies up to `buf.len()` bytes of the current chunk into `buf`.
    ///
    /// Blocks until a non-empty chunk is available. Returns `Ok(0)` only when
    /// `buf` is empty or when the sending side of the channel has been dropped
    /// (end of stream).
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Nothing can be copied into an empty buffer; return before blocking
        // on the channel.
        if buf.is_empty() {
            return Ok(0);
        }

        let (chunk, offset) = match self.prev.take() {
            Some(pending) => pending,
            None => loop {
                match self.rx.recv() {
                    // A zero-length chunk must not surface as `Ok(0)`: readers
                    // interpret that as end-of-stream. Skip it and wait for data.
                    Ok(bytes) if bytes.is_empty() => continue,
                    Ok(bytes) => break (bytes, 0),
                    // All senders are gone: signal end-of-stream.
                    Err(_) => return Ok(0),
                }
            },
        };

        let available = chunk.len() - offset;
        let limit = available.min(buf.len());
        buf[..limit].copy_from_slice(&chunk[offset..offset + limit]);

        // Keep whatever did not fit for the next call.
        if limit < available {
            self.prev = Some((chunk, offset + limit));
        }
        Ok(limit)
    }
}
/// Demuxes the MPEG-TS stream read from `rx` and remuxes every packet into the
/// DASH output at `segments/dash.mpd`.
///
/// A summary of each detected stream is printed first. Video streams have
/// their bit rate overridden to 10_000_000 before the muxer is opened. Packets
/// the muxer rejects are reported and skipped; demuxer errors abort the
/// function.
fn handle_input(rx: impl Read) -> Result<()> {
    let source = IO::from_read_stream(rx);
    let mpegts = InputFormat::find_by_name("mpegts").context("mpegts input format")?;

    let probed = Demuxer::builder()
        .input_format(Some(mpegts))
        .build(source)?
        .find_stream_info(None);
    let mut demuxer = match probed {
        Ok(ready) => ready,
        // The failed demuxer is handed back alongside the error; only the error is needed.
        Err((_, err)) => return Err(err.into()),
    };

    // Print a short description of every stream found in the input.
    demuxer
        .streams()
        .iter()
        .enumerate()
        .for_each(|(index, stream)| {
            let params = stream.codec_parameters();
            let tb = stream.time_base();

            println!("Stream #{}:", index);
            println!(" duration: {:?}", stream.duration().as_f64());
            println!(" time base: {} / {}", tb.num(), tb.den());

            if let Some(audio) = params.as_audio_codec_parameters() {
                println!(" type: audio");
                println!(" codec: {}", audio.decoder_name().unwrap_or("N/A"));
                println!(" sample format: {}", audio.sample_format().name());
                println!(" sample rate: {}", audio.sample_rate());
                println!(" channels: {}", audio.channel_layout().channels());
                println!(" bitrate: {}", audio.bit_rate());
                return;
            }
            if let Some(video) = params.as_video_codec_parameters() {
                println!(" type: video");
                println!(" codec: {}", video.decoder_name().unwrap_or("N/A"));
                println!(" width: {}", video.width());
                println!(" height: {}", video.height());
                println!(" pixel format: {}", video.pixel_format().name());
                return;
            }
            println!(" type: unknown");
        });

    // Collect the output stream parameters, overriding the bit rate of video streams.
    let mut output_params: Vec<_> = demuxer
        .streams()
        .iter()
        .map(|stream| stream.codec_parameters())
        .collect();
    for slot in output_params.iter_mut() {
        let rebuilt = slot.as_video_codec_parameters().map(|video| {
            VideoCodecParametersBuilder::from(video.clone())
                .bit_rate(10_000_000)
                .build()
        });
        if let Some(video) = rebuilt {
            *slot = video.into();
        }
    }

    let mut muxer = open_output("segments/dash.mpd", &output_params)?;

    // Pump packets until the demuxer reports the end of the input.
    loop {
        let packet = match demuxer.take()? {
            Some(packet) => packet,
            None => break,
        };
        match muxer.push(packet) {
            Ok(_) => {}
            Err(failure) => println!("Err: {}", failure),
        }
    }

    println!("Flushing muxer...");
    muxer.flush()?;
    println!("Muxer flushed.");
    Ok(())
}
/// Forwards the payload of every packet yielded by `socket` to `tx`.
///
/// Returns the number of packets forwarded once the stream ends. Stops early
/// with an error when the stream yields an error or when sending on `tx` fails.
async fn handle_socket(
    mut socket: impl Stream<Item = io::Result<(Instant, Bytes)>> + Unpin,
    tx: Sender<bytes::Bytes>,
) -> Result<usize> {
    let mut forwarded = 0;
    loop {
        match socket.next().await {
            None => break Ok(forwarded),
            Some(item) => {
                // The timestamp is not needed here; only the payload is passed on.
                let (_, payload) = item?;
                tx.send(payload)?;
                forwarded += 1;
            }
        }
    }
}
/// Receives MPEG-TS datagrams on UDP port 1234 and remuxes them into DASH
/// segments under `segments/`.
///
/// The UDP socket feeds a channel from the async side (`handle_socket`), while
/// a blocking task drains that channel through `ByteReceiver` into
/// `handle_input`. Failures of either side are reported before returning.
///
/// The SRT listener on port 3333 is still created, but the SRT accept loop is
/// currently disabled (kept below as commented-out code) in favour of the
/// plain UDP input.
#[tokio::main]
async fn main() -> Result<()> {
    let binding = SrtSocketBuilder::new_listen()
        .local_port(3333)
        .build_multiplexed()
        .await?;
    tokio::pin!(binding);
    fs::create_dir_all("segments")?;
    // handle_input(std::fs::File::open("/home/russelltg/Videos/test.ts").unwrap()).unwrap();
    // Propagate a bind failure (e.g. port already in use) instead of panicking.
    let udp = tokio::net::UdpSocket::bind("0.0.0.0:1234")
        .await
        .context("unable to bind UDP socket 0.0.0.0:1234")?;
    let socket = UdpFramed::new(udp, BytesCodec::new())
        .map(|f| f.map(|(by, _sa)| (Instant::now(), Bytes::from(by))));
    println!("Socket bound");
    let (tx, rx) = channel();
    let f1 = tokio::task::spawn_blocking(move || handle_input(ByteReceiver::new(rx)));
    let f2 = handle_socket(socket, tx);
    let (r1, r2) = tokio::join!(f1, f2);
    // Report both outcomes; previously they were dropped and failures went unnoticed.
    match r1 {
        Ok(Ok(())) => {}
        Ok(Err(e)) => println!("Error in input handler: {}", e),
        Err(e) => println!("Input handler task failed: {}", e),
    }
    match r2 {
        Ok(count) => println!("Socket closed, received {:?} packets", count),
        Err(e) => println!("Error in socket handler: {}", e),
    }
    // Disabled SRT input path:
    // while let Some(Ok(socket)) = binding.next().await {
    // tokio::spawn(async move {
    // let socket_id = socket.settings().remote_sockid.0;
    // let (tx, rx) = channel();
    // let f1 = tokio::task::spawn_blocking(move || handle_input(ByteReceiver::new(rx)));
    // let f2 = async {
    // let client_desc = format!(
    // "(ip_port: {}, sockid: {}, stream_id: {:?})",
    // socket.settings().remote,
    // socket_id,
    // socket.settings().stream_id,
    // );
    // println!("New client connected: {}", client_desc);
    // let count = handle_socket(socket, tx).await?;
    // println!(
    // "Client {} disconnected, received {:?} packets",
    // client_desc, count
    // );
    // Ok::<_, anyhow::Error>(())
    // };
    // let (r1, r2) = tokio::join!(f1, f2);
    // if let Err(e) = r1 {
    // println!("Error in input handler: {}", e);
    // }
    // if let Err(e) = r2 {
    // println!("Error in socket handler: {}", e);
    // }
    // });
    // }
    println!("\nServer closed");
    Ok(())
}
from srt-rs.
Love to start seeing more complex usecases!
I'll take a deeper look in a second, but srt-transmit has a similar problem when writing to stdout/file, where the interface needs `AsyncRead` but we have a `Stream`. I use `FramedWrite`, but that's for the `Async*` traits. You can see the source for that at
srt-rs/srt-transmit/src/main.rs
Line 478 in d2a9ee0
from srt-rs.
Thanks for investigating! :) Yes, feel free to add it as an example.
from srt-rs.
Should be good now; it's added as an example at `srt-tokio/examples/srt2hls_ffmpeg.rs`. It has instructions on how to run it.
from srt-rs.
Related Issues (20)
- The receiver stops receiving data unexpectedly.
- Limit send buffer size HOT 4
- duplicate the srt-live-transmit srt url syntax HOT 1
- Not compatiable with SRT < 1.3.0 (Support HSv4) HOT 17
- Multiplex server drops a client after a couple of seconds.
- Too late packets. HOT 28
- Multithread connections HOT 3
- Gathering statistics on SrtListener is blocked unless all clients are dropped HOT 2
- Possibility to variate the latency HOT 2
- Use url > 2.1.0 HOT 11
- Release new version on crates.io HOT 6
- Use dependabot to track and automate dependencies update
- Implement key size mismatch HOT 7
- Handle server rejection properly
- tokio::net::lookup_host does not resolve
- Unclear debugging when buffers are too small HOT 2
- snip snip snip
- Build fail
- Transfer file HOT 2
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from srt-rs.