|
|
@@ -0,0 +1,711 @@
|
|
|
+package org.jetlinks.community.media.service;
|
|
|
+
|
|
|
+
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.hutool.json.JSONArray;
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
|
|
|
+import org.jetlinks.community.media.bean.SSRCInfo;
|
|
|
+import org.jetlinks.community.media.bean.StreamInfo;
|
|
|
+import org.jetlinks.community.media.contanst.VideoManagerConstants;
|
|
|
+import org.jetlinks.community.media.bean.SsrcConfig;
|
|
|
+import org.jetlinks.community.media.entity.MediaDevice;
|
|
|
+import org.jetlinks.community.media.session.VideoStreamSessionManager;
|
|
|
+import org.jetlinks.community.media.zlm.ZLMRESTfulUtils;
|
|
|
+import org.jetlinks.community.media.zlm.ZLMRTPServerFactory;
|
|
|
+import org.jetlinks.community.media.zlm.ZLMServerConfig;
|
|
|
+import org.jetlinks.community.media.zlm.entity.MediaServerItem;
|
|
|
+import org.jetlinks.community.utils.RedisUtil;
|
|
|
+import org.jetlinks.core.cluster.ClusterManager;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.boot.CommandLineRunner;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
+
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 媒体服务器节点管理
|
|
|
+ */
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+public class LocalMediaServerItemService extends GenericReactiveCrudService<MediaServerItem, String> implements CommandLineRunner {
|
|
|
+
|
|
|
+// private final SipConfig sipConfig;
|
|
|
+
|
|
|
+ @Value("${server.ssl.enabled:false}")
|
|
|
+ private boolean sslEnabled;
|
|
|
+
|
|
|
+ @Value("${server.port}")
|
|
|
+ private Integer serverPort;
|
|
|
+
|
|
|
+
|
|
|
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ private final RedisUtil redisUtil;
|
|
|
+
|
|
|
+ private final String serverId;
|
|
|
+
|
|
|
+ private final ZLMRESTfulUtils zlmresTfulUtils;
|
|
|
+
|
|
|
+ private final ZLMRTPServerFactory zlmrtpServerFactory;
|
|
|
+
|
|
|
+ private final VideoStreamSessionManager streamSession;
|
|
|
+ @Autowired
|
|
|
+ public LocalMediaServerItemService(
|
|
|
+ ClusterManager clusterManager,
|
|
|
+ RedisUtil redisUtil,
|
|
|
+ ZLMRESTfulUtils zlmresTfulUtils,
|
|
|
+ ZLMRTPServerFactory zlmrtpServerFactory,
|
|
|
+ VideoStreamSessionManager streamSession) {
|
|
|
+ this.serverId= clusterManager.getCurrentServerId();
|
|
|
+ this.redisUtil=redisUtil;
|
|
|
+ this.zlmresTfulUtils=zlmresTfulUtils;
|
|
|
+ this.zlmrtpServerFactory=zlmrtpServerFactory;
|
|
|
+ this.streamSession=streamSession;
|
|
|
+ }
|
|
|
+
|
|
|
+ // @Autowired
|
|
|
+// private final UserSetup userSetup;
|
|
|
+//
|
|
|
+
|
|
|
+//
|
|
|
+// @Autowired
|
|
|
+// private final MediaServerMapper mediaServerMapper;
|
|
|
+//
|
|
|
+// @Autowired
|
|
|
+// private final VideoStreamSessionManager streamSession;
|
|
|
+//
|
|
|
+//
|
|
|
+// @Autowired
|
|
|
+// private final RedisUtil redisUtil;
|
|
|
+//
|
|
|
+// @Autowired
|
|
|
+// private final IVideoManagerStorager storager;
|
|
|
+//
|
|
|
+// @Autowired
|
|
|
+// private final IStreamProxyService streamProxyService;
|
|
|
+//
|
|
|
+// @Autowired
|
|
|
+// private final EventPublisher publisher;
|
|
|
+//
|
|
|
+// @Autowired
|
|
|
+// JedisUtil jedisUtil;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化媒体流服务器信息,将所有信息放入缓存中
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void run(String... args) throws Exception {
|
|
|
+ log.info("[缓存初始化] Media Server ");
|
|
|
+ //查询所有媒体服务器数据
|
|
|
+ this.createQuery()
|
|
|
+ .fetch()
|
|
|
+ .filter(item->StrUtil.isNotEmpty(item.getId()))
|
|
|
+ .doOnNext(mediaServerItem->{
|
|
|
+ if (mediaServerItem.getSsrcConfig() == null) {
|
|
|
+ //对ssrc进行更新 todo
|
|
|
+ SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null,"340200000");
|
|
|
+ mediaServerItem.setSsrcConfig(ssrcConfig);
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .doOnNext(mediaServerItem->redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getId(),mediaServerItem))
|
|
|
+ .subscribe();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId) {
|
|
|
+ return openRTPServer(mediaServerItem, streamId, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean isPlayback) {
|
|
|
+ if (mediaServerItem == null || mediaServerItem.getId() == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ // 获取mediaServer可用的ssrc
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerItem.getId();
|
|
|
+
|
|
|
+ SsrcConfig ssrcConfig = mediaServerItem.getSsrcConfig();
|
|
|
+ if (ssrcConfig == null) {
|
|
|
+ log.info("media server [ {} ] ssrcConfig is null", mediaServerItem.getId());
|
|
|
+ return null;
|
|
|
+ }else {
|
|
|
+ String ssrc = null;
|
|
|
+ if (isPlayback) {
|
|
|
+ ssrc = ssrcConfig.getPlayBackSsrc();
|
|
|
+ }else {
|
|
|
+ ssrc = ssrcConfig.getPlaySsrc();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (streamId == null) {
|
|
|
+ streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
|
|
|
+ }
|
|
|
+ int rtpServerPort = mediaServerItem.getRtpProxyPort();
|
|
|
+ if (mediaServerItem.isRtpEnable()) {
|
|
|
+ rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId);
|
|
|
+ }
|
|
|
+ redisUtil.set(key, mediaServerItem);
|
|
|
+ return SSRCInfo.of(rtpServerPort, ssrc, streamId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void closeRTPServer(MediaDevice device, String channelId) {
|
|
|
+ String mediaServerId = streamSession.getMediaServerId(device.getId(), channelId);
|
|
|
+ MediaServerItem mediaServerItem = this.getOne(mediaServerId);
|
|
|
+ if (mediaServerItem != null) {
|
|
|
+ //更新sstc信息
|
|
|
+ String streamId = String.format("%s_%s", device.getId(), channelId);
|
|
|
+ zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
|
|
|
+ releaseSsrc(mediaServerItem, streamSession.getSSRC(device.getId(), channelId));
|
|
|
+ }
|
|
|
+ streamSession.remove(device.getId(), channelId);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) {
|
|
|
+ if (mediaServerItem == null || ssrc == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ SsrcConfig ssrcConfig = mediaServerItem.getSsrcConfig();
|
|
|
+ ssrcConfig.releaseSsrc(ssrc);
|
|
|
+ mediaServerItem.setSsrcConfig(ssrcConfig);
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId+ "_" + mediaServerItem.getId();
|
|
|
+ redisUtil.set(key, mediaServerItem);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * zlm 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令
|
|
|
+ */
|
|
|
+ private void clearRTPServer(MediaServerItem mediaServerItem) {
|
|
|
+// mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()));
|
|
|
+ mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null,"340200000"));
|
|
|
+ redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId, mediaServerItem.getId(), 0);
|
|
|
+ }
|
|
|
+//
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void update(MediaServerItem mediaSerItem) {
|
|
|
+// mediaServerMapper.update(mediaSerItem);
|
|
|
+// MediaServerItem mediaServerItemInRedis = getOne(mediaSerItem.getId());
|
|
|
+// MediaServerItem mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
|
|
|
+// if (mediaServerItemInRedis != null && mediaServerItemInRedis.getSsrcConfig() != null) {
|
|
|
+// mediaServerItemInDataBase.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
|
|
|
+// }else {
|
|
|
+// mediaServerItemInDataBase.setSsrcConfig(
|
|
|
+// new SsrcConfig(
|
|
|
+// mediaServerItemInDataBase.getId(),
|
|
|
+// null,
|
|
|
+// sipConfig.getDomain()
|
|
|
+// )
|
|
|
+// );
|
|
|
+// }
|
|
|
+// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerItemInDataBase.getId();
|
|
|
+// redisUtil.set(key, mediaServerItemInDataBase);
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public List<MediaServerItem> getAll() {
|
|
|
+// List<MediaServerItem> result = new ArrayList<>();
|
|
|
+// List<Object> mediaServerKeys = redisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetup.getServerId() + "_" ));
|
|
|
+// String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
|
|
|
+// for (Object mediaServerKey : mediaServerKeys) {
|
|
|
+// String key = (String) mediaServerKey;
|
|
|
+// MediaServerItem mediaServerItem = (MediaServerItem) redisUtil.get(key);
|
|
|
+// // 检查状态
|
|
|
+// if (redisUtil.zScore(onlineKey, mediaServerItem.getId()) != null) {
|
|
|
+// mediaServerItem.setStatus(true);
|
|
|
+// }
|
|
|
+// result.add(mediaServerItem);
|
|
|
+// }
|
|
|
+// result.sort((serverItem1, serverItem2)->{
|
|
|
+// int sortResult = 0;
|
|
|
+// try {
|
|
|
+// sortResult = format.parse(serverItem1.getCreateTime()).compareTo(format.parse(serverItem2.getCreateTime()));
|
|
|
+// } catch (ParseException e) {
|
|
|
+// e.printStackTrace();
|
|
|
+// }
|
|
|
+// return sortResult;
|
|
|
+// });
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public List<MediaServerItem> getAllFromDatabase() {
|
|
|
+// return mediaServerMapper.queryAll();
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public List<MediaServerItem> getAllOnline() {
|
|
|
+// String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId();
|
|
|
+// Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1);
|
|
|
+//
|
|
|
+// List<MediaServerItem> result = new ArrayList<>();
|
|
|
+// if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
|
|
|
+// for (String mediaServerId : mediaServerIdSet) {
|
|
|
+// String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
|
|
|
+// result.add((MediaServerItem) redisUtil.get(serverKey));
|
|
|
+// }
|
|
|
+// }
|
|
|
+// Collections.reverse(result);
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+//
|
|
|
+ /**
|
|
|
+ * 获取单个zlm服务器
|
|
|
+ * @param mediaServerId 服务id
|
|
|
+ * @return MediaServerItem
|
|
|
+ */
|
|
|
+ public MediaServerItem getOne(String mediaServerId) {
|
|
|
+ if (mediaServerId == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + mediaServerId;
|
|
|
+ return (MediaServerItem)redisUtil.get(key);
|
|
|
+ }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public MediaServerItem getOneByHostAndPort(String host, int port) {
|
|
|
+// return mediaServerMapper.queryOneByHostAndPort(host, port);
|
|
|
+// }
|
|
|
+//
|
|
|
+
|
|
|
+ //todo
|
|
|
+ public Mono<MediaServerItem> getDefaultMediaServer() {
|
|
|
+// return mediaServerMapper.queryDefault();
|
|
|
+ return Mono.empty();
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Void> clearMediaServerForOnline() {
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId;
|
|
|
+ return Mono.fromRunnable(()->redisUtil.del(key));
|
|
|
+ }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public WVPResult<String> add(MediaServerItem mediaServerItem) {
|
|
|
+// WVPResult<String> result = new WVPResult<>();
|
|
|
+// mediaServerItem.setCreateTime(this.format.format(System.currentTimeMillis()));
|
|
|
+// mediaServerItem.setUpdateTime(this.format.format(System.currentTimeMillis()));
|
|
|
+// mediaServerItem.setHookAliveInterval(120);
|
|
|
+// JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
|
|
|
+// if (responseJSON != null) {
|
|
|
+// JSONArray data = responseJSON.getJSONArray("data");
|
|
|
+// if (data != null && data.size() > 0) {
|
|
|
+// ZLMServerConfig zlmServerConfig= JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class);
|
|
|
+// if (mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()) != null) {
|
|
|
+// result.setCode(-1);
|
|
|
+// result.setMsg("保存失败,媒体服务ID [ " + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置");
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+// mediaServerItem.setId(zlmServerConfig.getGeneralMediaServerId());
|
|
|
+// zlmServerConfig.setIp(mediaServerItem.getIp());
|
|
|
+// mediaServerMapper.add(mediaServerItem);
|
|
|
+// zlmServerOnline(zlmServerConfig);
|
|
|
+// result.setCode(0);
|
|
|
+// result.setMsg("success");
|
|
|
+// }else {
|
|
|
+// result.setCode(-1);
|
|
|
+// result.setMsg("连接失败");
|
|
|
+// }
|
|
|
+//
|
|
|
+// }else {
|
|
|
+// result.setCode(-1);
|
|
|
+// result.setMsg("连接失败");
|
|
|
+// }
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理zlm上线 todo
|
|
|
+ * @param zlmServerConfig zlm上线携带的参数
|
|
|
+ */
|
|
|
+ public Mono<Void> zlmServerOnline(ZLMServerConfig zlmServerConfig) {
|
|
|
+ log.info("[ ZLM:{} ]-[ {}:{} ]已连接",
|
|
|
+ zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
|
|
|
+ //根据id主键查询
|
|
|
+ return this.findById(zlmServerConfig.getGeneralMediaServerId())
|
|
|
+ .switchIfEmpty(this.createQuery()
|
|
|
+ .where(MediaServerItem::getIp,zlmServerConfig.getIp())
|
|
|
+ .where(MediaServerItem::getHttpPort,zlmServerConfig.getHttpPort())
|
|
|
+ .fetchOne())
|
|
|
+ .switchIfEmpty(Mono.fromRunnable(()->
|
|
|
+ log.warn("[未注册的zlm] 拒接接入:来自{}:{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() )).
|
|
|
+ then(Mono.empty()))
|
|
|
+ .doOnNext(serverItem->{
|
|
|
+ serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval());
|
|
|
+ if (serverItem.getHttpPort() == 0) {
|
|
|
+ serverItem.setHttpPort(zlmServerConfig.getHttpPort());
|
|
|
+ }
|
|
|
+ if (serverItem.getHttpSSlPort() == 0) {
|
|
|
+ serverItem.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
|
|
|
+ }
|
|
|
+ if (serverItem.getRtmpPort() == 0) {
|
|
|
+ serverItem.setRtmpPort(zlmServerConfig.getRtmpPort());
|
|
|
+ }
|
|
|
+ if (serverItem.getRtmpSSlPort() == 0) {
|
|
|
+ serverItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
|
|
|
+ }
|
|
|
+ if (serverItem.getRtspPort() == 0) {
|
|
|
+ serverItem.setRtspPort(zlmServerConfig.getRtspPort());
|
|
|
+ }
|
|
|
+ if (serverItem.getRtspSSLPort() == 0) {
|
|
|
+ serverItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
|
|
|
+ }
|
|
|
+ if (serverItem.getRtpProxyPort() == 0) {
|
|
|
+ serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
|
|
|
+ }
|
|
|
+ serverItem.setStatus(true);
|
|
|
+ })
|
|
|
+ //更新zlm服务器信息
|
|
|
+ .doOnNext(serverItem->{
|
|
|
+ if (StrUtil.isEmpty(serverItem.getId())) {
|
|
|
+ serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
|
|
|
+ this.createUpdate()
|
|
|
+ .set(serverItem)
|
|
|
+ .where(MediaServerItem::getIp,zlmServerConfig.getIp())
|
|
|
+ .where(MediaServerItem::getHttpPort,zlmServerConfig.getHttpPort())
|
|
|
+ .execute()
|
|
|
+ .subscribe();
|
|
|
+ }else {
|
|
|
+ this.createUpdate()
|
|
|
+ .set(serverItem)
|
|
|
+ .where(MediaServerItem::getId,serverItem.getId())
|
|
|
+ .execute()
|
|
|
+ .subscribe();
|
|
|
+ }
|
|
|
+ })
|
|
|
+ //更新zlm服务器缓存信息
|
|
|
+ .doOnNext(serverItem->{
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverId + "_" + zlmServerConfig.getGeneralMediaServerId();
|
|
|
+ if (redisUtil.get(key) == null) {
|
|
|
+ //todo
|
|
|
+// SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipconfig.getDomain());
|
|
|
+ SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null," sipconfig.getDomain()");
|
|
|
+ serverItem.setSsrcConfig(ssrcConfig);
|
|
|
+ }else {
|
|
|
+ MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key);
|
|
|
+ serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
|
|
|
+ }
|
|
|
+ redisUtil.set(key, serverItem);
|
|
|
+ })
|
|
|
+ //更新zlm服务器在线信息
|
|
|
+ .doOnNext(this::resetOnlineServerItem)
|
|
|
+ .doOnNext(serverItem-> setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())))
|
|
|
+ //更新zlm服务器存活心跳
|
|
|
+ .flatMap(serverItem -> updateMediaServerKeepalive(serverItem.getId(), null))
|
|
|
+
|
|
|
+ .then()
|
|
|
+
|
|
|
+
|
|
|
+ ;
|
|
|
+ //todo
|
|
|
+// publisher.zlmOnlineEventPublish(serverItem.getId());
|
|
|
+
|
|
|
+ }
|
|
|
+//
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void zlmServerOffline(String mediaServerId) {
|
|
|
+// delete(mediaServerId);
|
|
|
+// }
|
|
|
+//
|
|
|
+
|
|
|
+ private void resetOnlineServerItem(MediaServerItem serverItem) {
|
|
|
+ // 更新缓存
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId;
|
|
|
+ // 使用zset的分数作为当前并发量, 默认值设置为0
|
|
|
+ if (redisUtil.zScore(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置
|
|
|
+ redisUtil.zAdd(key, serverItem.getId(), 0L);
|
|
|
+ // 查询服务流数量
|
|
|
+ zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
|
|
|
+ Integer code = mediaList.getInt("code");
|
|
|
+ if (code == 0) {
|
|
|
+ JSONArray data = mediaList.getJSONArray("data");
|
|
|
+ if (data != null) {
|
|
|
+ redisUtil.zAdd(key, serverItem.getId(), data.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }else {
|
|
|
+ clearRTPServer(serverItem);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 媒体服务器通道在线数
|
|
|
+ * @param mediaServerId
|
|
|
+ */
|
|
|
+ public Mono<Void> addCount(String mediaServerId) {
|
|
|
+ return Mono.fromRunnable(()->{
|
|
|
+ if (mediaServerId == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId;
|
|
|
+ redisUtil.zIncrScore(key, mediaServerId, 1);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Void> removeCount(String mediaServerId) {
|
|
|
+ return Mono.fromRunnable(()->{
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId;
|
|
|
+ redisUtil.zIncrScore(key, mediaServerId, - 1);
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取负载最低的节点
|
|
|
+ * @return MediaServerItem
|
|
|
+ */
|
|
|
+ public Mono<MediaServerItem> getMediaServerForMinimumLoad() {
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + serverId;
|
|
|
+
|
|
|
+ if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
|
|
|
+ log.info("获取负载最低的节点时无在线节点");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取分数最低的,及并发最低的
|
|
|
+ Set<Object> objects = redisUtil.ZRange(key, 0, -1);
|
|
|
+ ArrayList<Object> mediaServerObjectS = new ArrayList<>(objects);
|
|
|
+
|
|
|
+ String mediaServerId = (String)mediaServerObjectS.get(0);
|
|
|
+ return Mono.just(getOne(mediaServerId));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对zlm服务器进行基础配置
|
|
|
+ * @param mediaServerItem 服务ID
|
|
|
+ * @param restart 是否重启zlm
|
|
|
+ */
|
|
|
+ public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) {
|
|
|
+ log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm",
|
|
|
+ mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
|
|
|
+ String protocol = sslEnabled ? "https" : "http";
|
|
|
+ String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
|
|
|
+ String recordHookPrex = null;
|
|
|
+ if (mediaServerItem.getRecordAssistPort() != 0) {
|
|
|
+ recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort());
|
|
|
+ }
|
|
|
+ Map<String, Object> param = new HashMap<>();
|
|
|
+ param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
|
|
|
+ param.put("ffmpeg.cmd","%s -fflags nobuffer -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s");
|
|
|
+ param.put("hook.enable","1");
|
|
|
+ param.put("hook.on_flow_report","");
|
|
|
+ param.put("hook.on_play",String.format("%s/on_play", hookPrex));
|
|
|
+ param.put("hook.on_http_access","");
|
|
|
+ param.put("hook.on_publish", String.format("%s/on_publish", hookPrex));
|
|
|
+ param.put("hook.on_record_mp4",recordHookPrex != null? String.format("%s/on_record_mp4", recordHookPrex): "");
|
|
|
+ param.put("hook.on_record_ts","");
|
|
|
+ param.put("hook.on_rtsp_auth","");
|
|
|
+ param.put("hook.on_rtsp_realm","");
|
|
|
+ param.put("hook.on_server_started",String.format("%s/on_server_started", hookPrex));
|
|
|
+ param.put("hook.on_shell_login",String.format("%s/on_shell_login", hookPrex));
|
|
|
+ param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrex));
|
|
|
+ param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex));
|
|
|
+ param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
|
|
|
+ param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
|
|
|
+ param.put("hook.timeoutSec","20");
|
|
|
+ param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
|
|
|
+
|
|
|
+ JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
|
|
|
+
|
|
|
+ if (responseJSON != null && responseJSON.getInt("code") == 0) {
|
|
|
+ if (restart) {
|
|
|
+ log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm成功, 开始重启以保证配置生效",
|
|
|
+ mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
|
|
|
+ zlmresTfulUtils.restartServer(mediaServerItem);
|
|
|
+ }else {
|
|
|
+ log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm成功",
|
|
|
+ mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ }else {
|
|
|
+ log.info("[ ZLM:{} ]-[ {}:{} ]设置zlm失败",
|
|
|
+ mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+//
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public WVPResult<MediaServerItem> checkMediaServer(String ip, int port, String secret) {
|
|
|
+// WVPResult<MediaServerItem> result = new WVPResult<>();
|
|
|
+// if (mediaServerMapper.queryOneByHostAndPort(ip, port) != null) {
|
|
|
+// result.setCode(-1);
|
|
|
+// result.setMsg("此连接已存在");
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+// MediaServerItem mediaServerItem = new MediaServerItem();
|
|
|
+// mediaServerItem.setIp(ip);
|
|
|
+// mediaServerItem.setHttpPort(port);
|
|
|
+// mediaServerItem.setSecret(secret);
|
|
|
+// JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
|
|
|
+// if (responseJSON == null) {
|
|
|
+// result.setCode(-1);
|
|
|
+// result.setMsg("连接失败");
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+// JSONArray data = responseJSON.getJSONArray("data");
|
|
|
+// ZLMServerConfig zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class);
|
|
|
+// if (zlmServerConfig == null) {
|
|
|
+// result.setCode(-1);
|
|
|
+// result.setMsg("读取配置失败");
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+// if (mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()) != null) {
|
|
|
+// result.setCode(-1);
|
|
|
+// result.setMsg("媒体服务ID [" + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置");
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+// mediaServerItem.setHttpSSlPort(zlmServerConfig.getHttpPort());
|
|
|
+// mediaServerItem.setRtmpPort(zlmServerConfig.getRtmpPort());
|
|
|
+// mediaServerItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
|
|
|
+// mediaServerItem.setRtspPort(zlmServerConfig.getRtspPort());
|
|
|
+// mediaServerItem.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
|
|
|
+// mediaServerItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
|
|
|
+// mediaServerItem.setStreamIp(ip);
|
|
|
+// mediaServerItem.setHookIp(sipConfig.getIp());
|
|
|
+// mediaServerItem.setSdpIp(ip);
|
|
|
+// mediaServerItem.setStreamNoneReaderDelayMS(zlmServerConfig.getGeneralStreamNoneReaderDelayMS());
|
|
|
+// result.setCode(0);
|
|
|
+// result.setMsg("成功");
|
|
|
+// result.setData(mediaServerItem);
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public boolean checkMediaRecordServer(String ip, int port) {
|
|
|
+// boolean result = false;
|
|
|
+// OkHttpClient client = new OkHttpClient();
|
|
|
+// String url = String.format("http://%s:%s/index/api/record", ip, port);
|
|
|
+//
|
|
|
+// FormBody.Builder builder = new FormBody.Builder();
|
|
|
+//
|
|
|
+// Request request = new Request.Builder()
|
|
|
+// .get()
|
|
|
+// .url(url)
|
|
|
+// .build();
|
|
|
+// try {
|
|
|
+// Response response = client.newCall(request).execute();
|
|
|
+// if (response != null) {
|
|
|
+// result = true;
|
|
|
+// }
|
|
|
+// } catch (Exception e) {}
|
|
|
+//
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void delete(String id) {
|
|
|
+// redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), id);
|
|
|
+// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id;
|
|
|
+// redisUtil.del(key);
|
|
|
+// }
|
|
|
+//
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 设置ZML服务器的心跳时间
|
|
|
+ * @param mediaServerId
|
|
|
+ * @param data
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public Mono<Void> updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
|
|
|
+ return this.findById(mediaServerId)
|
|
|
+ .doOnNext(mediaServerItem -> {
|
|
|
+ String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX +serverId + "_" + mediaServerId;
|
|
|
+ int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
|
|
|
+ redisUtil.set(key, data, hookAliveInterval);
|
|
|
+ })
|
|
|
+ .then();
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<StreamInfo> getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks) {
|
|
|
+ return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public Mono<StreamInfo> getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr) {
|
|
|
+ StreamInfo streamInfo = null;
|
|
|
+ return Mono.just(mediaServerId)
|
|
|
+ .flatMap(this::findById)
|
|
|
+ .switchIfEmpty(this.getDefaultMediaServer())
|
|
|
+ .flatMap(mediaServerItem -> {
|
|
|
+ JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaServerItem, app, stream);
|
|
|
+ if (mediaList != null) {
|
|
|
+ if (mediaList.getInt("code") == 0) {
|
|
|
+ JSONArray data = mediaList.getJSONArray("data");
|
|
|
+ if (data == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ JSONObject mediaJSON = JSONUtil.parseObj(JSONUtil.toJsonStr(data.get(0)));
|
|
|
+ JSONArray tracks = mediaJSON.getJSONArray("tracks");
|
|
|
+ return getStreamInfoByAppAndStream(mediaServerItem, app, stream, tracks);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Mono.empty();
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ public Mono<StreamInfo> getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId) {
|
|
|
+ return getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, null);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public Mono<StreamInfo> getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) {
|
|
|
+ StreamInfo streamInfoResult = new StreamInfo();
|
|
|
+ streamInfoResult.setStreamId(stream);
|
|
|
+ streamInfoResult.setApp(app);
|
|
|
+ if (addr == null) {
|
|
|
+ addr = mediaInfo.getStreamIp();
|
|
|
+ }
|
|
|
+ streamInfoResult.setMediaServerId(mediaInfo.getId());
|
|
|
+ streamInfoResult.setRtmp(String.format("rtmp://%s:%s/%s/%s", addr, mediaInfo.getRtmpPort(), app, stream));
|
|
|
+ if (mediaInfo.getRtmpSSlPort() != 0) {
|
|
|
+ streamInfoResult.setRtmps(String.format("rtmps://%s:%s/%s/%s", addr, mediaInfo.getRtmpSSlPort(), app, stream));
|
|
|
+ }
|
|
|
+ streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", addr, mediaInfo.getRtspPort(), app, stream));
|
|
|
+ if (mediaInfo.getRtspSSLPort() != 0) {
|
|
|
+ streamInfoResult.setRtsps(String.format("rtsps://%s:%s/%s/%s", addr, mediaInfo.getRtspSSLPort(), app, stream));
|
|
|
+ }
|
|
|
+ streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ streamInfoResult.setWs_flv(String.format("ws://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ streamInfoResult.setHls(String.format("http://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ streamInfoResult.setWs_hls(String.format("ws://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ streamInfoResult.setFmp4(String.format("http://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ streamInfoResult.setWs_fmp4(String.format("ws://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ streamInfoResult.setTs(String.format("http://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ streamInfoResult.setWs_ts(String.format("ws://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpPort(), app, stream));
|
|
|
+ if (mediaInfo.getHttpSSlPort() != 0) {
|
|
|
+ streamInfoResult.setHttps_flv(String.format("https://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setWss_flv(String.format("wss://%s:%s/%s/%s.flv", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setHttps_hls(String.format("https://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setWss_hls(String.format("wss://%s:%s/%s/%s/hls.m3u8", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setHttps_fmp4(String.format("https://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setWss_fmp4(String.format("wss://%s:%s/%s/%s.live.mp4", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setHttps_ts(String.format("https://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts", addr, mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ streamInfoResult.setRtc(String.format("https://%s:%s/index/api/webrtc?app=%s&stream=%s&type=play", mediaInfo.getStreamIp(), mediaInfo.getHttpSSlPort(), app, stream));
|
|
|
+ }
|
|
|
+
|
|
|
+ streamInfoResult.setTracks(tracks);
|
|
|
+ return Mono.just(streamInfoResult);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|