Encoding Audio/Video with JavaCV and Transmitting It over MQTT

JavaCV on GitHub: https://github.com/bytedeco/javacv
Eclipse Paho documentation: https://www.eclipse.org/paho/clients/python/docs/
Mosquitto: https://mosquitto.org/
Paho Java client on GitHub: https://github.com/eclipse/paho.mqtt.java

JavaCV provides Java wrappers around OpenCV, FFmpeg, and other native libraries; current releases bundle the native binaries, so no separate installation of OpenCV or FFmpeg is required.

MQTT is a client-server publish/subscribe messaging transport protocol. It is designed to be lightweight, open, simple, and easy to implement, which makes it a good choice in many situations, especially constrained environments such as machine-to-machine (M2M) communication and the Internet of Things (IoT). Mosquitto (a broker) and Paho (client libraries) are both implementations of the MQTT protocol.

Our lab is currently working on a project that needs to capture and transmit audio/video data, and MQTT is a hard requirement. MQTT payloads are plain byte arrays intended for short messages, so the protocol is not a natural fit for a continuous audio/video stream, but since this data has to travel alongside the other sensor readings, the problem has to be solved anyway. The concrete plan: grab camera frames with OpenCV, capture audio with javax.sound, encode the video as H.264 (and the audio as AAC), publish the encoded stream to a broker over MQTT, and have the subscriber pull the data from the broker, decode it, and play it back.

Partial code for the publisher:

// Open the camera (device 0)
FrameGrabber grabber = FrameGrabber.createDefault(0);
grabber.start();

// Grab one frame to determine the video width and height
OpenCVFrameConverter.ToIplImage converter = new OpenCVFrameConverter.ToIplImage();
IplImage grabbedImage = converter.convert(grabber.grab());
width = grabbedImage.width();
height = grabbedImage.height();

// Initialize the encoder
ByteArrayOutputStream bos = new ByteArrayOutputStream();
// Conveniently, JavaCV accepts a stream, a file, or a streaming URL as the Recorder/Grabber target
FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(bos, width, height);

// Video settings
recorder.setFormat("flv"); // container format
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264); // video codec
// recorder.setPixelFormat(org.bytedeco.javacpp.avutil.AV_PIX_FMT_BGR24); // pixel format, optional
recorder.setFrameRate(frameRate); // frame rate
// recorder.setVideoBitrate(2000000); // constant bitrate, optional
recorder.setVideoOption("tune", "zerolatency"); // real-time tuning: disables B-frames to cut encoding latency
recorder.setVideoOption("preset", "ultrafast"); // fastest encoding, at the cost of compression ratio
// recorder.setVideoOption("crf", "25"); // Constant Rate Factor, optional

// Audio settings
recorder.setAudioOption("crf", "0"); // highest-quality audio encoding
recorder.setAudioQuality(0); // audio quality
recorder.setAudioBitrate(192000); // audio bitrate
recorder.setSampleRate(44100); // sample rate
recorder.setAudioChannels(2); // number of channels
recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); // audio codec

// Start the encoder
recorder.start();

// Start the audio capture/encoding thread
Audio a = new Audio();
a.start();

// Start the publishing thread
Publish p = new Publish();
p.start();
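
The snippets above and below share several variables across threads. As a minimal sketch (the exact declarations and the frameRate value are my assumptions, not from the original project), the enclosing class would hold them as fields so the Audio and Publish threads defined later can reach them:

static FrameGrabber grabber;          // camera source
static FFmpegFrameRecorder recorder;  // encoder, shared with the Audio thread
static ByteArrayOutputStream bos;     // encoded output, shared with the Publish thread
static int width, height;             // video dimensions, taken from the first frame
static double frameRate = 25;         // capture/encoding frame rate (assumed value)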

Starting the encoding loop:

CanvasFrame cFrame = new CanvasFrame("Capture Preview", CanvasFrame.getDefaultGamma() / grabber.getGamma());
Frame capturedFrame = null;

long startTime = 0;
long videoTS = 0;
// While we are capturing...
while ((capturedFrame = grabber.grab()) != null) {
	if (cFrame.isVisible()) {
		// Show the current frame in the preview window
		cFrame.showImage(capturedFrame);
	}

	// Define our start time; this needs to be initialized as close as
	// possible to when it is used, or the delta from assignment to the
	// computed time could be too high
	if (startTime == 0) // initialize to the current system time
		startTime = System.currentTimeMillis();

	// Create a timestamp (in microseconds) for this frame
	videoTS = 1000 * (System.currentTimeMillis() - startTime);

	// Check for AV drift
	if (videoTS > recorder.getTimestamp()) {
		System.out.println("Lip-flap correction: " + videoTS + " : " + recorder.getTimestamp() + " -> "
				+ (videoTS - recorder.getTimestamp()));

		// Tell the recorder to write this frame at this timestamp
		recorder.setTimestamp(videoTS);

	}

	// Send the frame to the org.bytedeco.javacv.FFmpegFrameRecorder
	recorder.record(capturedFrame); // encode
	bos.flush();
}

cFrame.dispose();
recorder.stop();
grabber.stop();

The audio part:

// Audio capture and encoding run on a separate thread
class Audio extends Thread {

	public void run() {
			// The first half is similar to code widely available online, so comments are kept brief
			// Pick a format...
			// NOTE: It is better to enumerate the formats that the system supports,
			// because getLine() can error out with any particular format...
			// For us: 44.1 sample rate, 16 bits, stereo, signed, little endian
			AudioFormat audioFormat = new AudioFormat(44100.0F, 16, 2, true, false);

			// Get a TargetDataLine with that format
			Mixer.Info[] minfoSet = AudioSystem.getMixerInfo();
			// NOTE: index 4 is specific to the author's machine; see the
			// enumeration sketch after this class for choosing a device properly
			Mixer mixer = AudioSystem.getMixer(minfoSet[4]);
			DataLine.Info dataLineInfo = new DataLine.Info(TargetDataLine.class, audioFormat);

			try {
				// Open and start capturing audio
				// It's possible to have more control over the chosen audio device with this
				// line:
				// TargetDataLine line = (TargetDataLine)mixer.getLine(dataLineInfo);
				TargetDataLine line = (TargetDataLine) AudioSystem.getLine(dataLineInfo);
				line.open(audioFormat);
				line.start();

				int sampleRate = (int) audioFormat.getSampleRate();
				int numChannels = audioFormat.getChannels();

				// Let's initialize our audio buffer...
				int audioBufferSize = sampleRate * numChannels;
				byte[] audioBytes = new byte[audioBufferSize];

				// A ScheduledThreadPoolExecutor keeps the encoding task on a
				// more precise clock than a while loop with Thread.sleep,
				// since the fixed rate accounts for garbage collection time
				// and other OS-specific timing issues. A similar approach
				// could be used for the webcam capture as well.
				ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
				exec.scheduleAtFixedRate(new Runnable() {
					@Override
					public void run() {
						try {
							// Read from the line... non-blocking
							int nBytesRead = 0;
							while (nBytesRead == 0) {
								nBytesRead = line.read(audioBytes, 0, line.available());
							}

							// Since we specified 16 bits in the AudioFormat,
							// we need to convert our read byte[] to short[]
							// (see source from FFmpegFrameRecorder.recordSamples for AV_SAMPLE_FMT_S16)
							// Let's initialize our short[] array
							int nSamplesRead = nBytesRead / 2;
							short[] samples = new short[nSamplesRead];

							// Let's wrap our short[] into a ShortBuffer and
							// pass it to recordSamples
							ByteBuffer.wrap(audioBytes).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(samples);
							ShortBuffer sBuff = ShortBuffer.wrap(samples, 0, nSamplesRead);

							// recorder is the shared instance of
							// org.bytedeco.javacv.FFmpegFrameRecorder
							recorder.recordSamples(sampleRate, numChannels, sBuff); // encode the audio
						} catch (org.bytedeco.javacv.FrameRecorder.Exception e) {
							e.printStackTrace();
						}
					}
				}, 0, (long) 1000 / 25, TimeUnit.MILLISECONDS); // schedule audio encoding at the video frame rate (25 fps -> every 40 ms)
			} catch (LineUnavailableException e1) {
				e1.printStackTrace();
			}
	}
}
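
The hard-coded minfoSet[4] above only works on the author's machine. As the NOTE in the code suggests, it is better to enumerate the devices and formats the system actually supports. A self-contained sketch (the class name is mine) might look like this:

import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioSystem;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.Mixer;
import javax.sound.sampled.TargetDataLine;

public class ListCaptureDevices {
	public static void main(String[] args) {
		// The same format the Audio thread opens: 44.1 kHz, 16-bit, stereo, signed, little-endian
		AudioFormat fmt = new AudioFormat(44100.0F, 16, 2, true, false);
		DataLine.Info info = new DataLine.Info(TargetDataLine.class, fmt);
		Mixer.Info[] mixers = AudioSystem.getMixerInfo();
		for (int i = 0; i < mixers.length; i++) {
			Mixer mixer = AudioSystem.getMixer(mixers[i]);
			System.out.println(i + ": " + mixers[i].getName()
					+ " -> supports capture: " + mixer.isLineSupported(info));
		}
	}
}

Running it prints one index per mixer; pick one that reports true instead of hard-coding 4.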

Publishing the data:

// A separate thread publishes the encoded data over MQTT
class Publish extends Thread {

	public void run() {
		try {
			mqttPub mp = new mqttPub();
			while (!this.isInterrupted()) {
				if (bos.size() > 0) {
					// NOTE: access to bos is not synchronized with the encoder
					// thread; production code should guard this shared buffer
					mp.publish(bos.toByteArray());
					bos.reset();
				}
				Thread.sleep(200); // publish a chunk roughly every 200 ms
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

class mqttPub implements MqttCallback {
	MqttClient client;
	MqttMessage message;

	// Connect to the broker and set this client's ID
	public mqttPub() {
		try {
			client = new MqttClient("tcp://localhost:1883", "publisher");
			client.setCallback(this); // register the callbacks defined below
			client.connect();
			System.out.println("Publisher connected!");
			message = new MqttMessage();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void publish(byte[] data) {
		try {
			message.setPayload(data);
			client.publish("h264", message); // set the topic and publish the payload to the broker
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	// The following three callbacks run on disconnect, on message arrival, and after delivery completes; adapt them as needed
	@Override
	public void connectionLost(Throwable cause) {
		// TODO Auto-generated method stub
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		System.out.println(message);
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		// `data` is not in scope here; report the delivered message id instead
		System.out.println("Delivered message " + token.getMessageId() + ".");
	}
}
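
The subscriber side is not covered in this post yet, but as a rough sketch of the receiving end described earlier (all names here are mine, and the pipe buffer size is a guess), one can feed the payloads arriving on the h264 topic into an FFmpegFrameGrabber, which accepts an InputStream, and display the decoded frames:

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.bytedeco.javacv.CanvasFrame;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.Frame;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class Subscriber {
	public static void main(String[] args) throws Exception {
		PipedOutputStream source = new PipedOutputStream();
		// Large pipe buffer so bursts of MQTT messages do not block the callback
		PipedInputStream sink = new PipedInputStream(source, 1024 * 1024);

		MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber");
		client.setCallback(new MqttCallback() {
			@Override
			public void messageArrived(String topic, MqttMessage message) throws Exception {
				source.write(message.getPayload()); // feed the decoder pipe
			}

			@Override
			public void connectionLost(Throwable cause) {
			}

			@Override
			public void deliveryComplete(IMqttDeliveryToken token) {
			}
		});
		client.connect();
		client.subscribe("h264");

		// Blocks until enough of the FLV header has arrived to start decoding
		FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(sink);
		grabber.start();

		CanvasFrame canvas = new CanvasFrame("Playback");
		Frame frame;
		while ((frame = grabber.grab()) != null) {
			canvas.showImage(frame);
		}

		grabber.stop();
		client.disconnect();
	}
}

Note that this only works if the subscriber connects before the publisher starts: the FLV header is written once at recorder.start(), and a subscriber that joins mid-stream never receives it.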

To be continued.
