Coder Social home page Coder Social logo

Comments (4)

russelltg avatar russelltg commented on June 12, 2024 1

This is a srt-rs bug--feel free to close your bugreport on ac-ffmpeg. Is it okay with you if I adapt this as an example to put in the repo?

Anyways, I hooked up your code to a UDP socket coming straight from

ffmpeg -re -stream_loop -1 -i ~/Videos/test.ts -c:v copy -c:a copy -f mpegts 'udp://127.0.0.1:1234?pkt_size=1316'

And it works

UDP input code
// Run with
// ffmpeg -re -stream_loop -1 -i ~/Videos/test.ts -c:v copy -c:a copy -f mpegts 'udp://127.0.0.1:1234?pkt_size=1316'
// cargo run --example=ffmpeg_ac (I put this file in srt-tokio/examples/ffmpeg_ac)
// python3 -m http.server
// ffplay http://localhost:8000/segments/dash.mpd

use ac_ffmpeg::{
    codec::{CodecParameters, VideoCodecParametersBuilder},
    format::{
        demuxer::{Demuxer, InputFormat},
        io::IO,
        muxer::{Muxer, OutputFormat},
    },
    Error,
};
use anyhow::{Context, Result};
use bytes::Bytes;
use futures::Stream;
use srt_tokio::{SrtSocket, SrtSocketBuilder};
use std::{
    fs::{self, File},
    io::{self, Read},
    sync::mpsc::{channel, Receiver, Sender},
    time::Instant,
};
use tokio_stream::StreamExt;
use tokio_util::{codec::BytesCodec, udp::UdpFramed};

fn open_output(path: &str, elementary_streams: &[CodecParameters]) -> Result<Muxer<File>, Error> {
    let output_format = OutputFormat::find_by_name("dash")
        .ok_or_else(|| Error::new(format!("unable to guess output format for file: {}", path)))?;

    let output = File::create(path)
        .map_err(|err| Error::new(format!("unable to create output file {}: {}", path, err)))?;

    let io = IO::from_seekable_write_stream(output);

    let mut muxer_builder = Muxer::builder()
        .set_option("url", path)
        .set_option("use_timeline", "1")
        .set_option("use_template", "1")
        .set_option("hls_playlist", "1")
        .set_option("streaming", "1")
        .set_option("remove_at_exit", "1")
        .set_option("window_size", "5")
        .set_option("seg_duration", "6")
        .set_option("adaptation_sets", "id=0,streams=v id=1,streams=a");

    for codec_parameters in elementary_streams {
        muxer_builder.add_stream(codec_parameters)?;
    }

    muxer_builder.build(io, output_format)
}

struct ByteReceiver {
    rx: Receiver<bytes::Bytes>,
    prev: Option<(bytes::Bytes, usize)>,
}

impl ByteReceiver {
    fn new(rx: Receiver<bytes::Bytes>) -> Self {
        Self { rx, prev: None }
    }
}

impl Read for ByteReceiver {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if let Some(ref mut prev) = self.prev {
            let limit = std::cmp::min(prev.0.len() - prev.1, buf.len());
            buf[..limit].copy_from_slice(&prev.0[prev.1..prev.1 + limit]);
            if prev.1 + limit < prev.0.len() {
                prev.1 += limit;
            } else {
                self.prev = None
            }
            Ok(limit)
        } else if let Ok(bytes) = self.rx.recv() {
            let limit = std::cmp::min(bytes.len(), buf.len());
            buf[..limit].copy_from_slice(&bytes[..limit]);
            if buf.len() < bytes.len() {
                self.prev = Some((bytes, buf.len()));
            }
            Ok(limit)
        } else {
            Ok(0)
        }
    }
}

fn handle_input(rx: impl Read) -> Result<()> {
    let io = IO::from_read_stream(rx);
    let format = InputFormat::find_by_name("mpegts").context("mpegts input format")?;
    let mut demuxer = Demuxer::builder()
        .input_format(Some(format))
        .build(io)?
        .find_stream_info(None)
        .map_err(|(_, err)| err)?;

    for (index, stream) in demuxer.streams().iter().enumerate() {
        let params = stream.codec_parameters();

        println!("Stream #{}:", index);
        println!("  duration: {:?}", stream.duration().as_f64());
        let tb = stream.time_base();
        println!("  time base: {} / {}", tb.num(), tb.den());

        if let Some(params) = params.as_audio_codec_parameters() {
            println!("  type: audio");
            println!("  codec: {}", params.decoder_name().unwrap_or("N/A"));
            println!("  sample format: {}", params.sample_format().name());
            println!("  sample rate: {}", params.sample_rate());
            println!("  channels: {}", params.channel_layout().channels());
            println!("  bitrate: {}", params.bit_rate());
        } else if let Some(params) = params.as_video_codec_parameters() {
            println!("  type: video");
            println!("  codec: {}", params.decoder_name().unwrap_or("N/A"));
            println!("  width: {}", params.width());
            println!("  height: {}", params.height());
            println!("  pixel format: {}", params.pixel_format().name());
        } else {
            println!("  type: unknown");
        }
    }

    let mut codec_parameters = demuxer
        .streams()
        .iter()
        .map(|stream| stream.codec_parameters())
        .collect::<Vec<_>>();

    for cp in &mut codec_parameters {
        if let Some(vcp) = cp.as_video_codec_parameters() {
            *cp = VideoCodecParametersBuilder::from(vcp.clone())
                .bit_rate(10_000_000)
                .build()
                .into();
        }
    }

    let mut muxer = open_output("segments/dash.mpd", &codec_parameters)?;

    while let Some(packet) = demuxer.take()? {
        if let Err(e) = muxer.push(packet) {
            println!("Err: {}", e);
        }
    }

    println!("Flushing muxer...");

    muxer.flush()?;

    println!("Muxer flushed.");

    Ok(())
}

async fn handle_socket(
    mut socket: impl Stream<Item = io::Result<(Instant, Bytes)>> + Unpin,
    tx: Sender<bytes::Bytes>,
) -> Result<usize> {
    let mut count = 0;
    while let Some(packet) = socket.next().await {
        let (_instant, bytes) = packet?;
        tx.send(bytes)?;
        count += 1;
    }

    Ok(count)
}

#[tokio::main]
async fn main() -> Result<()> {
    let binding = SrtSocketBuilder::new_listen()
        .local_port(3333)
        .build_multiplexed()
        .await?;
    tokio::pin!(binding);

    fs::create_dir_all("segments")?;

    // handle_input(std::fs::File::open("/home/russelltg/Videos/test.ts").unwrap()).unwrap();

    let socket = UdpFramed::new(
        tokio::net::UdpSocket::bind("0.0.0.0:1234").await.unwrap(),
        BytesCodec::new(),
    )
    .map(|f| f.map(|(by, _sa)| (Instant::now(), Bytes::from(by))));
    println!("Socket bound");
    let (tx, rx) = channel();
    let f1 = tokio::task::spawn_blocking(move || handle_input(ByteReceiver::new(rx)));
    let f2 = handle_socket(socket, tx);
    let (r1, r2) = tokio::join!(f1, f2);

    // while let Some(Ok(socket)) = binding.next().await {
    //     tokio::spawn(async move {
    //         let socket_id = socket.settings().remote_sockid.0;
    //         let (tx, rx) = channel();
    //         let f1 = tokio::task::spawn_blocking(move || handle_input(ByteReceiver::new(rx)));
    //         let f2 = async {
    //             let client_desc = format!(
    //                 "(ip_port: {}, sockid: {}, stream_id: {:?})",
    //                 socket.settings().remote,
    //                 socket_id,
    //                 socket.settings().stream_id,
    //             );
    //             println!("New client connected: {}", client_desc);
    //             let count = handle_socket(socket, tx).await?;
    //             println!(
    //                 "Client {} disconnected, received {:?} packets",
    //                 client_desc, count
    //             );
    //             Ok::<_, anyhow::Error>(())
    //         };
    //         let (r1, r2) = tokio::join!(f1, f2);

    //         if let Err(e) = r1 {
    //             println!("Error in input handler: {}", e);
    //         }
    //         if let Err(e) = r2 {
    //             println!("Error in socket handler: {}", e);
    //         }
    //     });
    // }

    println!("\nServer closed");

    Ok(())
}

from srt-rs.

russelltg avatar russelltg commented on June 12, 2024

Love to start seeing more complex usecases!

I'll take a deeper look in a second, but srt-transmit has a similar problem when writing to stdout/file, where the interface needs AsyncRead but we have a Stream. I use FramedWrite, but that's for the Async* traits. You can see the source for that at

Ok(FramedWrite::new(tokio::io::stdout(), BytesCodec::new())

from srt-rs.

niklaskorz avatar niklaskorz commented on June 12, 2024

Thanks for investigating! :) Yes, feel free to add it as an example.

from srt-rs.

russelltg avatar russelltg commented on June 12, 2024

Should be good now, it's added as an example at srt-tokio/examples/srt2hls_ffmpeg.rs. It has instructions on how to run it

from srt-rs.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.