基于海康威视 ISUP5.0 协议完成对 NVR 设备接入的两种方案

基于 ISUP5.0 协议实现的两种集成海康威视 NVR 设备的方案:针对 PS 格式封装数据进行解析提取音视频数据实现实时预览的方案与基于流媒体服务器搭配内网穿透实现的拉流方案。

两种常见的基于 ISUP5.0 实现 NVR 设备接入的方案,实现视频实时预览、回放、远程控制等功能。

集成 NVR 设备的实时预览功能

在安防监控领域,海康威视的产品应用广泛。海康威视的 ISUP5.0(Intelligent Service Unified Platform 5.0)为 NVR(Network Video Recorder)设备的接入提供了解决方案。因为海外设备不支持 GB28181,ISUP5.0 协议主要用于海外市场。

方案一:解析海康威视录像数据提取音视频数据流​

这一方案基于海康威视官方提供的 SDK 二次开发实现而来。SDK 可以在官网这里下载。

通过 SDK 发起预览视频请求,接收并处理数据,在官方文档中对此也有详细说明:

isup 视频预览流程

接收到的数据是海康威视基于 PS 格式封装过的音视频数据流,所以需要对其进行解析。

1. PS 流简介​

PS(Program Stream)流是一种将音视频信息封装在一起的格式,常用于多媒体数据的传输与存储。在海康威视的监控系统中,PS 流是一种常见的音视频复合流格式。它主要包含多个数据包,这些数据包按照特定的结构组织,以便在网络中高效传输以及在接收端进行准确解析。​

2. 解析步骤​

  • 检索特定的起始码,以识别 PS 数据包的开头,依此划定流的不同部分。
  • 一旦识别出 PS 数据包,则需要从中提取 PES(分组基本流)数据包。这些数据包包含实际的音频和视频数据。
  • 根据音频流和视频流的流标识符对它们进行区分,并从 PES 数据包中提取原始的音频和视频数据。​
  • 如果音频数据是以 A-law 格式编码的,会将其解码为 PCM 格式。此外,还可以通过提升音量和应用清晰度增强处理来对音频进行优化(此为可选操作)。视频则为 H264。

3. 代码示例(以 Python 为例)​

class PSParser:
    """PS流解析器"""
    def __init__(self, debug=False, enhance_audio=True):  # 默认开启音频增强
        self.is_fix_ps_file = False
        self.buffer = bytearray()  # 用于存储未处理完的数据
        self.debug = debug  # 调试模式
        self.enhance_audio = enhance_audio  # 是否增强音频
        self.VOLUME_BOOST = 1.2  # 音量提升倍数
        self.CLARITY_ENHANCE = True  # 是否增强清晰度
        
        # 常量定义
        self.SEQUENCE_END_CODE = 0x000001b7
        self.ISO_11172_END_CODE = 0x000001b9
        self.PROGRAM_STREAM_PACKET_START_CODE = 0x000001ba
        self.PROGRAM_STREAM_SYSTEM_HEADER_START_CODE = 0x000001bb
        self.PROGRAM_STREAM_MAP = 0x000001bc
        self.PRIVATE_STREAM_1 = 0x000001bd
        self.PADDING_STREAM = 0x000001be
        self.PRIVATE_STREAM_2 = 0x000001bf
        
        # 流类型定义
        self.STREAM_TYPE_VIDEO_MPEG1 = 0x01
        self.STREAM_TYPE_VIDEO_MPEG2 = 0x02
        self.STREAM_TYPE_AUDIO_MPEG1 = 0x03
        self.STREAM_TYPE_AUDIO_MPEG2 = 0x04
        self.STREAM_TYPE_PRIVATE_SECTION = 0x05
        self.STREAM_TYPE_PRIVATE_DATA = 0x06
        self.STREAM_TYPE_AUDIO_AAC = 0x0f
        self.STREAM_TYPE_VIDEO_MPEG4 = 0x10
        self.STREAM_TYPE_VIDEO_H264 = 0x1b
        self.STREAM_TYPE_VIDEO_H265 = 0x24
        self.STREAM_TYPE_VIDEO_CAVS = 0x42
        self.STREAM_TYPE_VIDEO_SVAC = 0x80
        self.STREAM_TYPE_AUDIO_AC3 = 0x81
        self.STREAM_TYPE_AUDIO_ALAW = 0x90  # A-law音频类型
        
        # 音视频流ID范围
        self.VIDEO_STREAM_MIN = 0xE0  # 视频流起始ID (0xE0-0xEF)
        self.VIDEO_STREAM_MAX = 0xEF
        self.AUDIO_STREAM_MIN = 0xC0  # 音频流起始ID (0xC0-0xDF)
        self.AUDIO_STREAM_MAX = 0xDF
        
        # 音频参数
        self.AUDIO_SAMPLE_RATE = 8000  # 采样率
        self.AUDIO_CHANNELS = 1        # 通道数
        self.AUDIO_SAMPLE_BITS = 16    # 采样位数

    def log(self, message):
        """输出调试信息"""
        if self.debug:
            print(message)

    def get_next_start_code(self, data: bytes, start_pos: int, code_type: int) -> int:
        """查找下一个起始码"""
        for i in range(start_pos, len(data) - 4):
            if (data[i] == 0x00 and data[i + 1] == 0x00 and 
                data[i + 2] == 0x01 and data[i + 3] == code_type):
                return i
        return -1

    def get_next_start_code_range(self, data: bytes, start_pos: int, 
                                code_type_min: int, code_type_max: int) -> int:
        """在指定范围内查找下一个起始码"""
        for i in range(start_pos, len(data) - 4):
            if (data[i] == 0x00 and data[i + 1] == 0x00 and 
                data[i + 2] == 0x01 and 
                code_type_min <= data[i + 3] <= code_type_max):
                return i
        return -1

    def process_stream(self, data: Union[bytes, bytearray, int], buffer_size: Union[DWORD, int]) -> Optional[StreamData]:
        """
        处理流式PS数据,返回解析出的音视频数据
        
        Args:
            data: 输入的PS流数据或指向数据的指针
            buffer_size: 码流头部或码流数据缓冲区大小(DWORD类型或整数)
            
        Returns:
            Optional[StreamData]: 解析出的音视频数据,如果没有解析出数据则返回None
        """
        try:
            # 处理整数类型的data(指针)
            if isinstance(data, int):
                if data == 0:
                    return None
                
                # 获取实际的buffer_size值
                actual_buffer_size = buffer_size.value if hasattr(buffer_size, 'value') else buffer_size
                
                try:
                    # 使用ctypes从指针读取数据
                    data_ptr = cast(data, POINTER(c_char * actual_buffer_size))
                    data = bytes(data_ptr.contents)
                except Exception as e:
                    # 尝试使用string_at
                    try:
                        data = string_at(data, actual_buffer_size)
                    except Exception as e2:
                        return None
            elif not isinstance(data, (bytes, bytearray)):
                # 尝试转换为bytes
                try:
                    data = bytes(data)
                except:
                    return None
                
            # 将新数据添加到缓存
            # 检查buffer_size类型,支持DWORD和整数
            if hasattr(buffer_size, 'value'):
                actual_size = min(buffer_size.value, len(data))  # 使用buffer_size限制数据大小
            else:
                actual_size = min(buffer_size, len(data))  # 直接使用整数值
                
            self.buffer.extend(data[:actual_size])
            
            # 如果数据太少,等待更多数据
            if len(self.buffer) < 24:
                return None
                
            stream_data = StreamData()
            video_data = bytearray()
            audio_data = bytearray()
            pos = 0
            packet_start = 0
            
            while pos + 4 <= len(self.buffer):
                # 查找PS包起始码
                pos = self.get_next_start_code(self.buffer, pos, 0xBA)
                if pos < 0:
                    break
                    
                # 找到了一个完整的PS包
                if pos > packet_start:
                    packet_data = self.buffer[packet_start:pos]
                    # 处理PS包中的PES包
                    media_data = self._process_ps_packet_stream(packet_data)
                    if media_data.video:
                        video_data.extend(media_data.video)
                    if media_data.audio:
                        audio_data.extend(media_data.audio)
                
                packet_start = pos
                pos += 4
            
            # 保留未处理完的数据
            if packet_start > 0:
                self.buffer = self.buffer[packet_start:]
            
            if video_data:
                stream_data.video = bytes(video_data)
                
            if audio_data:
                # 将A-law音频数据转换为PCM
                pcm_data = AudioDecoder.alaw_to_pcm(audio_data)
                
                # 如果启用音频增强,则进行处理
                if self.enhance_audio:
                    pcm_data = AudioDecoder.enhance_audio(
                        pcm_data, 
                        volume_boost=self.VOLUME_BOOST,
                        clarity_enhance=self.CLARITY_ENHANCE
                    )
                
                stream_data.audio = pcm_data
                
            return stream_data if (stream_data.video or stream_data.audio) else None
        except Exception as e:
            if self.debug:
                print(f"Error in process_stream: {str(e)}")
            return None

上述代码展示了一个简单的 PS 流读取示例,通过检查数据包的前几个字节来识别 PS 流的开始部分。在实际应用中,解析过程将更为复杂,需要深入分析 PS 流的格式规范,包括对视频和音频信息的深度解析。

4. 预览

当解析出音视频之后,就可以通过 websocket 或者其他方式,推送数据到前端页面进行播放,比如 JMuxerPCMPlayer 等播放器。

var jmuxer;
var pcmPlayer;
var ws;
var socketURL = 'ws://192.168.31.7:9999/api/v1/ws/stream/E82622073/1';
var reconnectAttempts = 0;
var maxReconnectAttempts = 5;
var reconnectInterval = 2000; // 2秒
var lastVideoTimestamp = 0;
var lastAudioTimestamp = 0;
var bufferThreshold = 3; // 减少视频缓冲帧数
var videoBuffer = [];
var audioBuffer = [];
var isBuffering = true;
var isPlaying = false;
var fps = 25; // 默认帧率
var frameDuration = 40; // 默认帧持续时间(ms)
var audioSampleRate = 8000; // 默认音频采样率
var audioChannels = 1; // 默认音频通道数
var syncThreshold = 30; // 进一步降低同步阈值
var baseTimestamp = 0; // 基准时间戳
var startPlayTime = 0; // 开始播放的时间
var audioQueue = []; // 音频数据队列
var maxAudioDelay = 100; // 进一步降低最大音频延迟
var audioBufferSize = 512; // 进一步减小音频缓冲区大小
var audioLatency = -500; // 预设负延迟补偿,让音频提前播放
var initialAudioDelay = 2000; // 初始音频延迟(ms),用于预缓冲

function initPlayers() {
    if (jmuxer) {
        jmuxer.destroy();
    }
    
    jmuxer = new JMuxer({
        node: 'player',
        mode: 'video',
        flushingTime: frameDuration,
        fps: fps,
        debug: false
    });
    
    if (pcmPlayer) {
        pcmPlayer.destroy();
    }
    
    pcmPlayer = new PCMPlayer({
        inputCodec: 'Int16',
        channels: audioChannels,
        sampleRate: audioSampleRate,
        flushingTime: 20, // 减少刷新时间
        bufferSize: audioBufferSize
    });
    
    // 重置同步相关变量
    baseTimestamp = 0;
    startPlayTime = 0;
    lastVideoTimestamp = 0;
    lastAudioTimestamp = 0;
    audioQueue = [];
    audioLatency = -500; // 初始化为负延迟
    isBuffering = true;
    isPlaying = false;
}

function getCurrentTime() {
    return performance.now();
}

function processVideoData(data, timestamp, isKeyFrame) {
    if (!startPlayTime && isKeyFrame) {
        startPlayTime = getCurrentTime();
        baseTimestamp = timestamp;
        // 重置音频延迟补偿
        audioLatency = 0;
    }
    
    if (isBuffering) {
        videoBuffer.push({
            data: new Uint8Array(data),
            timestamp: timestamp,
            isKeyFrame: isKeyFrame
        });
        
        const hasKeyFrame = videoBuffer.some(frame => frame.isKeyFrame);
        if (videoBuffer.length >= bufferThreshold && hasKeyFrame) {
            isBuffering = false;
            // 按时间戳排序
            videoBuffer.sort((a, b) => a.timestamp - b.timestamp);
            for (let frame of videoBuffer) {
                jmuxer.feed({
                    video: frame.data,
                    duration: frameDuration
                });
            }
            videoBuffer = [];
            isPlaying = true;
            console.log('开始播放视频');
        }
    } else {
        if (startPlayTime) {
            const relativeTimestamp = timestamp - baseTimestamp;
            const systemTime = getCurrentTime() - startPlayTime;
            const diff = relativeTimestamp - systemTime;
            
            // 调整视频播放速度
            const video = document.getElementById('player');
            if (Math.abs(diff) > syncThreshold) {
                if (diff > 0) {
                    // 视频超前,稍微减慢
                    video.playbackRate = 0.9; // 更激进的速度调整
                } else {
                    // 视频滞后,稍微加快
                    video.playbackRate = 1.1; // 更激进的速度调整
                }
            } else {
                video.playbackRate = 1.0;
            }
        }
        
        jmuxer.feed({
            video: new Uint8Array(data),
            duration: frameDuration
        });
    }

}

function processAudioData(data, timestamp) {
    if (!startPlayTime || !isPlaying) {
        return; // 等待视频开始播放
    }
    
    const relativeTimestamp = timestamp - baseTimestamp;
    const systemTime = getCurrentTime() - startPlayTime;
    const diff = relativeTimestamp - systemTime;
    
    // 将音频数据和时间戳加入队列
    audioQueue.push({
        data: new Int16Array(data),
        timestamp: timestamp
    });
    
    // 按时间戳排序
    audioQueue.sort((a, b) => a.timestamp - b.timestamp);
    
    // 处理音频队列
    while (audioQueue.length > 0) {
        const audioData = audioQueue[0];
        const audioDiff = (audioData.timestamp - baseTimestamp) - (getCurrentTime() - startPlayTime + audioLatency);
        
        if (audioDiff > maxAudioDelay) {
            // 音频延迟太大,丢弃数据并增加延迟补偿
            audioQueue.shift();
            audioLatency = Math.max(-1000, audioLatency - 100); // 更激进地减少延迟补偿
            continue;
        }
        
        if (audioDiff < -maxAudioDelay) {
            // 音频太滞后,跳过并减少延迟补偿
            audioQueue.shift();
            audioLatency = Math.min(0, audioLatency + 100); // 更激进地增加延迟补偿
            continue;
        }
        
        // 处理音频数据
        const samples = audioData.data;
        if (Math.abs(audioDiff) > syncThreshold) {
            // 需要调整音频速度
            if (audioDiff > 0) {
                // 音频超前,跳过更多样本
                const skipSamples = Math.min(Math.floor(audioDiff * audioSampleRate / 1000), samples.length / 2);
                if (skipSamples < samples.length) {
                    pcmPlayer.feed(samples.slice(skipSamples));
                }
            } else {
                // 音频滞后,更激进地加快播放速度
                const speedupFactor = 1.2; // 加快20%
                const resampledLength = Math.floor(samples.length / speedupFactor);
                const resampledData = new Int16Array(resampledLength);
                
                for (let i = 0; i < resampledLength; i++) {
                    const originalIndex = Math.floor(i * speedupFactor);
                    resampledData[i] = samples[originalIndex];
                }
                
                pcmPlayer.feed(resampledData);
            }
        } else {
            // 正常播放
            pcmPlayer.feed(samples);
        }
        
        audioQueue.shift();
    }
    
    // 限制队列长度
    if (audioQueue.length > 3) { // 进一步减少队列最大长度
        audioQueue.splice(0, audioQueue.length - 3);
    }
}

function connectWebSocket() {
    if (ws && ws.readyState === WebSocket.OPEN) {
        ws.close();
    }
    
    // 重置状态
    videoBuffer = [];
    audioBuffer = [];
    audioQueue = [];
    isBuffering = true;
    isPlaying = false;
    baseTimestamp = 0;
    startPlayTime = 0;
    
    ws = new WebSocket(socketURL);
    ws.binaryType = 'arraybuffer';
    
    ws.addEventListener('open', function() {
        console.log('WebSocket connected');
        reconnectAttempts = 0;
    });
    
    ws.addEventListener('message', function(event) {
        try {
            if (typeof event.data === 'string') {
                const config = JSON.parse(event.data);
                if (config.type === 'config') {
                    fps = config.fps || fps;
                    frameDuration = Math.floor(1000 / fps);
                    audioSampleRate = config.audioSampleRate || audioSampleRate;
                    audioChannels = config.audioChannels || audioChannels;
                    initPlayers();
                    console.log(`配置更新: fps=${fps}, frameDuration=${frameDuration}ms, audioSampleRate=${audioSampleRate}, audioChannels=${audioChannels}`);
                }
                return;
            }
            
            const arrayBuffer = event.data;
            const dataView = new DataView(arrayBuffer);
            const messageType = dataView.getUint8(0);
            const timestamp = Number(dataView.getBigUint64(1, false));
            const isKeyFrame = dataView.getUint8(9) === 1;
            const dataLength = dataView.getUint32(10, false);
            const actualData = arrayBuffer.slice(14, 14 + dataLength);
            
            if (messageType === 1) {
                lastVideoTimestamp = timestamp;
                processVideoData(actualData, timestamp, isKeyFrame);
            } else if (messageType === 2) {
                lastAudioTimestamp = timestamp;
                processAudioData(actualData, timestamp);
            }
        } catch (e) {
            console.error('Error processing message:', e);
            if (e.toString().includes('decode') || e.toString().includes('NAL')) {
                console.log('重置播放器以恢复播放');
                initPlayers();
                isBuffering = true;
                videoBuffer = [];
                audioBuffer = [];
                audioQueue = [];
            }
        }
    });
    
    ws.addEventListener('error', function(e) {
        console.error('WebSocket Error:', e);
        attemptReconnect();
    });
    
    ws.addEventListener('close', function() {
        console.log('WebSocket closed');
        if (isPlaying) {
            attemptReconnect();
        }
    });
}

function attemptReconnect() {
    if (reconnectAttempts < maxReconnectAttempts) {
        reconnectAttempts++;
        setTimeout(function() {
            initPlayers();
            connectWebSocket();
        }, reconnectInterval);
    }
}

window.onload = function() {
    initPlayers();
    connectWebSocket();
};

window.onbeforeunload = function() {
    if (ws) {
        ws.close();
    }
    if (jmuxer) {
        jmuxer.destroy();
    }
    if (pcmPlayer) {
        pcmPlayer.destroy();
    }
};

方案二:直接通过 RTSP 进行拉流​

1. RTSP 简介​

RTSP(Real – Time Streaming Protocol)是一种应用层协议,用于控制实时媒体数据的传输,如音视频流。它允许客户端从服务器(如 NVR 设备)请求媒体流,并对媒体流的播放、暂停、快进等操作进行控制。在海康威视的监控体系中,RTSP 是一种常用的获取实时音视频流的方式。​

2. 拉流流程​

  • 配置 NVR 设备:登录 NVR 设备的管理界面,在网络配置或视频流配置部分,确保 RTSP 服务已启用,并设置合适的端口(通常默认为 554)。同时,配置好用户名和密码,用于客户端进行身份验证。不同型号的 NVR 设备配置界面可能略有差异,但基本的配置选项类似。​
  • 确定 RTSP 地址格式:海康威视 NVR 设备的 RTSP 地址通常遵循一定的格式,例如:rtsp://username:password@nvr_ip:port/mpeg/ch33/sub/av_stream。其中,username和password是在 NVR 设备上配置的登录用户名和密码,nvr_ip是 NVR 设备的 IP 地址,port是 RTSP 服务端口,mpeg 是指定格式,ch33是要获取的视频通道号, 33 表示第一个通道, sub 指定子码流。例如,若 NVR 设备的 IP 为 192.168.1.100,用户名是 admin,密码是 123456,视频通道号为 1,RTSP 地址则为rtsp://admin:123456@192.168.1.100:554/mpeg/ch33/sub/av_stream。​
  • 客户端拉流:在客户端应用程序中,使用支持 RTSP 协议的库或组件来发起拉流请求。例如,在基于 FFmpeg 的应用中,可以使用 FFmpeg 的相关 API 函数。​

3. 内网穿透实现(通过硬件设备)​

由于 NVR 设备可能位于内网环境中,而客户端可能需要从外网访问,此时需要进行内网穿透。一种常见的方式是使用硬件设备进行内网穿透,如使用具有内网穿透功能的路由器或专门的内网穿透硬件设备。​

  • 硬件设备设置:将硬件设备连接到内网环境中,并进行相应的配置。配置过程通常需要保证设备与 NVR 设备处于同个局域网,使其能够与内网中的 NVR 设备和其他网络设备正常通信。然后,在硬件设备的管理界面中,配置内网穿透规则。例如,将 NVR 设备的 RTSP 服务端口(如 554)映射到硬件设备的公网 IP 地址的某个端口上。​
  • 端口映射与转发:硬件设备会将来自外网的针对特定端口(如映射后的端口)的请求,转发到内网中 NVR 设备的 RTSP 服务端口。这样,客户端在外网通过访问硬件设备的公网 IP 地址和映射端口,硬件设备会将请求转发给内网中的 NVR 设备,从而实现从外网访问内网中的 NVR 设备的 RTSP 流。在配置端口映射时,需要注意选择合适的公网端口,避免与其他服务端口冲突,并确保硬件设备的安全性,如设置访问密码或进行 IP 地址过滤等。​

通过以上两种方案,可以基于海康威视 ISUP5.0 实现 NVR 设备的接入,满足不同场景下对 NVR 设备音视频数据获取和处理的需求。在实际应用中,需要根据具体的网络环境、业务需求和技术能力选择合适的方案,并进行合理的配置和优化。​

留下评论

您的邮箱地址不会被公开。 必填项已用 * 标注