diff --git a/src/main/java/com/windhoverlabs/com/video/VideoDataLink.java b/src/main/java/com/windhoverlabs/com/video/VideoDataLink.java index 1f906f3..ace0866 100644 --- a/src/main/java/com/windhoverlabs/com/video/VideoDataLink.java +++ b/src/main/java/com/windhoverlabs/com/video/VideoDataLink.java @@ -66,8 +66,8 @@ public void init(String instance, String name, YConfiguration config) public void doStart() { if (!isDisabled()) { try { -// readVideo(); - streamVideoOverRTP(); + // readVideo(); + streamVideoOverRTP(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); @@ -97,17 +97,18 @@ public void doStop() { private void streamVideoOverRTP() throws IOException { System.out.println("Starting video streaming over RTP..."); - + av_log_set_level(AV_LOG_DEBUG); - -// av_log_set_callback(); - - avformat_network_init(); -// avdevice_register_all(); + + // av_log_set_callback(); + + avformat_network_init(); + // avdevice_register_all(); int ret, v_stream_idx = -1; String inputFile = "/home/lgomez/Downloads/217115_small.mp4"; - inputFile = "/home/lgomez/Downloads/vecteezy_vancouver-canada-september-16-2023-flight-by-fpv-drone_37202565.mp4"; + inputFile = + "/home/lgomez/Downloads/vecteezy_vancouver-canada-september-16-2023-flight-by-fpv-drone_37202565.mp4"; String outputURL = "rtp://127.0.0.1:5005"; AVFormatContext inputCtx = avformat_alloc_context(); @@ -148,8 +149,7 @@ private void streamVideoOverRTP() throws IOException { System.out.println("Allocating codec context..."); AVCodecContext decoderCtx = avcodec_alloc_context3(codec); - - + avcodec_parameters_to_context(decoderCtx, inputStream.codecpar()); avcodec_open2(decoderCtx, codec, (PointerPointer) null); @@ -159,94 +159,167 @@ private void streamVideoOverRTP() throws IOException { // throw new IOException("Failed to create RTP output context"); // } - if (avformat_alloc_output_context2(outputCtx, null, "rtp", outputURL) < 0) { + if (avformat_alloc_output_context2(outputCtx, null, "rtp_mpegts", outputURL) < 0) { throw new IOException("Failed to create RTP output context"); } - - + + // Codec = avcodec_find_encoder_by_name(Config->Name); AVStream outputStream = avformat_new_stream(outputCtx, codec); if (outputStream == null) { throw new IOException("Failed to create output stream"); } - + AVRational q = new AVRational(); - + q.num(1); q.den(30); System.out.println("Configuring encoder..."); - AVCodecContext encoderCtx = avcodec_alloc_context3(codec); + + System.out.println("codec from input:" +codec.name().getString()); + + +// av_codec_iterate(); + + + AVCodec encoderCodec = avcodec_find_encoder_by_name("libx264"); + +// AVCodec encoderCodec = avcodec_find_encoder_by_name("h264"); + + if(encoderCodec == null) + { + throw new IOException("h264 codec not found"); + } + + AVCodecContext encoderCtx = avcodec_alloc_context3(encoderCodec); System.out.println("Configuring encoder2..."); - System.out.println("codec.id()-->" + codec.id()); + System.out.println("codec.id()-->" + encoderCodec.id()); System.out.println("codec.id()-->" + decoderCtx.width()); System.out.println("codec.id()-->" + decoderCtx.height()); System.out.println("inputStream.time_num()-->" + inputStream.time_base().num()); System.out.println("inputStream.time_den()-->" + inputStream.time_base().den()); System.out.println("av_inv_q(inputStream.time_base())-->" + av_inv_q(inputStream.time_base())); - - - -// Context->width = Config->Width; -// Context->height = Config->Height; -// Context->time_base = (AVRational){1, (int)Config->FramesPerSecond}; -// Context->pix_fmt = Config->PixelFormat; -// Context->framerate = (AVRational){(int)Config->FramesPerSecond, 1}; -// Context->bit_rate = Config->BitRate; -// Context->gop_size = Config->GopSize; -// Context->max_b_frames = Config->MaxBFrames; -// Context->gop_size = Config->GopSize; -// Context->flags |= Config->Flags; - - encoderCtx.codec_id(codec.id()); + + // Context->width = Config->Width; + // Context->height = Config->Height; + // Context->time_base = (AVRational){1, (int)Config->FramesPerSecond}; + // Context->pix_fmt = Config->PixelFormat; + // Context->framerate = (AVRational){(int)Config->FramesPerSecond, 1}; + // Context->bit_rate = Config->BitRate; + // Context->gop_size = Config->GopSize; + // Context->max_b_frames = Config->MaxBFrames; + // Context->gop_size = Config->GopSize; + // Context->flags |= Config->Flags; + + encoderCtx.codec_id(encoderCodec.id()); System.out.println("Configuring encoder3..."); encoderCtx.codec_type(AVMEDIA_TYPE_VIDEO); encoderCtx.pix_fmt(AV_PIX_FMT_YUV420P); System.out.println("Configuring encoder4..."); -// encoderCtx.width(decoderCtx.width()); -// encoderCtx.height(decoderCtx.height()); -// + // encoderCtx.width(decoderCtx.width()); + // encoderCtx.height(decoderCtx.height()); + // encoderCtx.width(640); encoderCtx.height(480); System.out.println("Configuring encoder5..."); -// encoderCtx.time_base(av_inv_q(inputStream.time_base())); + // encoderCtx.time_base(av_inv_q(inputStream.time_base())); encoderCtx.time_base(q); AVRational fr = new AVRational(); - + fr.den(1); fr.num(30); - encoderCtx.framerate(q); + encoderCtx.framerate(fr); System.out.println("Configuring encoder6..."); encoderCtx.bit_rate(400000); encoderCtx.gop_size(30); encoderCtx.max_b_frames(0); + System.out.println("Configuring encoder2..."); + System.out.println("output codec.id()-->" + codec.id()); + System.out.println("output codec.id()-->" + encoderCtx.width()); + System.out.println("output codec.id()-->" + encoderCtx.height()); + System.out.println("output inputStream.time_num()-->" + outputStream.time_base().num()); + System.out.println("output inputStream.time_den()-->" + outputStream.time_base().den()); + System.out.println( + "output av_inv_q(inputStream.time_base())-->" + av_inv_q(inputStream.time_base())); + // if (avcodec_open2(encoderCtx, codec, (PointerPointer) null) < 0) { // throw new IOException("Failed to open encoder"); // } -// if (avcodec_open2(encoderCtx, codec, new PointerPointer()) < 0) { -// throw new IOException("Failed to open encoder"); -// } - - + // if (avcodec_open2(encoderCtx, codec, new PointerPointer()) < 0) { + // throw new IOException("Failed to open encoder"); + // } + AVBufferRef HWAccelDeviceContext = new AVBufferRef(null); - int avRC = av_hwdevice_ctx_create( - HWAccelDeviceContext, - AV_HWDEVICE_TYPE_CUDA, - "", - new AVDictionary(null), - 0); + int avRC = + av_hwdevice_ctx_create( + HWAccelDeviceContext, AV_HWDEVICE_TYPE_CUDA, "0", new AVDictionary(null), 0); + + // if (avRC < 0) { + // ReportAVError("CEncoder::Initialize", "av_hwdevice_ctx_create", avRC, __LINE__); + // } + // else + // { + // /* Select the first supported hardware pixel format */ + // for (int i = 0; Context->codec->hw_configs[i]; i++) { + // const struct AVCodecHWConfig *config = avcodec_get_hw_config(Codec, i); + // if (config->device_type == Config->HWAccelDeviceCfg.DeviceType) + // { + // Context->hw_device_ctx = av_buffer_ref(HWAccelDeviceContext); + // if (!Context->hw_device_ctx) + // { + // ReportError("CDecoder::Initialize", "av_hwdevice_ctx_create", __LINE__, "Unable to enable + // hardware acceleration for the decoder."); + // } + // Context->get_format = MMC_GetHwFormat; + // break; + // } + // } + // } + + // if (avRC < 0) { + // ReportAVError("CEncoder::Initialize", "av_hwdevice_ctx_create", avRC, __LINE__); + // } + // else + { + /* Select the first supported hardware pixel format */ + // for (int i = 0;encoderCtx.codec(). ; i++) { + // const struct AVCodecHWConfig *config = avcodec_get_hw_config(Codec, i); + // if (config->device_type == Config->HWAccelDeviceCfg.DeviceType) + // { + // Context->hw_device_ctx = av_buffer_ref(HWAccelDeviceContext); + // if (!Context->hw_device_ctx) + // { + // ReportError("CDecoder::Initialize", "av_hwdevice_ctx_create", __LINE__, "Unable to + // enable hardware acceleration for the decoder."); + // } + // Context->get_format = MMC_GetHwFormat; + // break; + // } + // } + } + +// if (avcodec_get_hw_config(encoderCodec, 0) == null) +// { +// throw new IOException("avcodec_get_hw_configq failed"); +// } + + if (avcodec_parameters_from_context(outputStream.codecpar(), encoderCtx) < 0) { + throw new IOException("avcodec_parameters_from_context failed"); + } + System.out.println("outputStream.time_num()--> after context params" + outputStream.time_base().num()); + System.out.println("outputStream.time_den()--> after context params" + outputStream.time_base().den()); + System.out.println("av_inv_q(outputStream.time_base())--> after context params" + av_inv_q(outputStream.time_base())); - avcodec_parameters_from_context(outputStream.codecpar(), encoderCtx); - if (avcodec_open2(encoderCtx, codec, new PointerPointer()) < 0) { - throw new IOException("Failed to open encoder"); - } + if (avcodec_open2(encoderCtx, encoderCodec, new PointerPointer()) < 0) { + throw new IOException("Failed to open encoder"); + } System.out.println("Configuring encoder7..."); - System.out.println("Configuring encoder8..."); // if (avio_open2(outputCtx.pb(), outputURL, AVIO_FLAG_WRITE, null, null) < 0) { @@ -258,15 +331,15 @@ private void streamVideoOverRTP() throws IOException { AVIOContext pb = new AVIOContext(null); -// if (avio_open(pb, outputURL, AVIO_FLAG_WRITE) < 0) { -// throw new IOException("Failed to open RTP output"); -// } + // if (avio_open(pb, outputURL, AVIO_FLAG_WRITE) < 0) { + // throw new IOException("Failed to open RTP output"); + // } AVDictionary options = new AVDictionary(null); if (avio_open2(pb, outputURL, AVIO_FLAG_WRITE, null, options) < 0) { - throw new IOException("Failed to open RTP output"); - } - -// avio_open2(pb, filename, AVIO_FLAG_WRITE, null, options) + throw new IOException("Failed to open RTP output"); + } + + // avio_open2(pb, filename, AVIO_FLAG_WRITE, null, options) outputCtx.pb(pb); @@ -301,6 +374,8 @@ private void streamVideoOverRTP() throws IOException { outPacket.stream_index(outputStream.index()); av_write_frame(outputCtx, outPacket); av_packet_unref(outPacket); + + recv_packets = avcodec_receive_packet(encoderCtx, outPacket); } } System.out.println("ret for avcodec_receive_frame1 -->" + ret); diff --git a/src/main/java/com/windhoverlabs/yamcs/archive/ReplayVideoService.java b/src/main/java/com/windhoverlabs/yamcs/archive/ReplayVideoService.java index 53d99e2..27f09a6 100644 --- a/src/main/java/com/windhoverlabs/yamcs/archive/ReplayVideoService.java +++ b/src/main/java/com/windhoverlabs/yamcs/archive/ReplayVideoService.java @@ -1,11 +1,11 @@ package com.windhoverlabs.yamcs.archive; +import com.google.protobuf.util.JsonFormat; import java.io.IOException; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; - import org.yamcs.AbstractProcessorService; import org.yamcs.ConfigurationException; import org.yamcs.InvalidIdentification; @@ -26,16 +26,19 @@ import org.yamcs.cmdhistory.CommandHistoryProvider; import org.yamcs.cmdhistory.CommandHistoryRequestManager; import org.yamcs.commanding.PreparedCommand; +import org.yamcs.mdb.Mdb; +import org.yamcs.mdb.MdbFactory; import org.yamcs.mdb.ParameterTypeProcessor; import org.yamcs.mdb.ProcessingData; import org.yamcs.mdb.Subscription; -import org.yamcs.mdb.MdbFactory; import org.yamcs.mdb.XtceTmProcessor; import org.yamcs.parameter.ParameterProcessor; import org.yamcs.parameter.ParameterProcessorManager; import org.yamcs.parameter.ParameterProvider; import org.yamcs.parameter.ParameterValue; import org.yamcs.parameter.ParameterWithIdRequestHelper; +import org.yamcs.parameter.SystemParametersProducer; +import org.yamcs.parameter.SystemParametersService; import org.yamcs.protobuf.Commanding.CommandHistoryEntry; import org.yamcs.protobuf.Yamcs.CommandHistoryReplayRequest; import org.yamcs.protobuf.Yamcs.EndAction; @@ -53,411 +56,450 @@ import org.yamcs.tctm.ArchiveTmPacketProvider; import org.yamcs.xtce.Parameter; import org.yamcs.xtce.SequenceContainer; -import org.yamcs.mdb.Mdb; import org.yamcs.yarch.protobuf.Db.Event; import org.yamcs.yarch.protobuf.Db.ProtoDataType; -import com.google.protobuf.util.JsonFormat; - -/** - * Provides telemetry packets and processed parameters from the yamcs archive. - * - */ +/** Provides telemetry packets and processed parameters from the yamcs archive. */ public class ReplayVideoService extends AbstractProcessorService - implements ReplayListener, ArchiveTmPacketProvider, ParameterProvider, CommandHistoryProvider { - static final long TIMEOUT = 10000; - - EndAction endAction; - - ReplayOptions originalReplayRequest; - private HashSet subscribedParameters = new HashSet<>(); - private ParameterProcessorManager parameterProcessorManager; - TmProcessor tmProcessor; - Mdb mdb; - - YarchReplay yarchReplay; - // the originalReplayRequest contains possibly only parameters. - // the modified one sent to the ReplayServer contains the raw data required for extracting/processing those - // parameters - ReplayOptions rawDataRequest; - CommandHistoryRequestManager commandHistoryRequestManager; - - private SecurityStore securityStore; - - // this can be set in the config (in processor.yaml) to exclude certain parameter groups from replay - List excludeParameterGroups = null; - - @Override - public void init(Processor proc, YConfiguration args, Object spec) { - super.init(proc, args, spec); - mdb = MdbFactory.getInstance(getYamcsInstance()); - securityStore = YamcsServer.getServer().getSecurityStore(); - if (args.containsKey("excludeParameterGroups")) { - excludeParameterGroups = args.getList("excludeParameterGroups"); - } - this.tmProcessor = proc.getTmProcessor(); - parameterProcessorManager = proc.getParameterProcessorManager(); - proc.setPacketProvider(this); - parameterProcessorManager.addParameterProvider(this); - - if (spec instanceof ReplayOptions) { - originalReplayRequest = (ReplayOptions) spec; - } else if (spec instanceof String) { - ReplayRequest.Builder rrb = ReplayRequest.newBuilder(); - try { - JsonFormat.parser().merge((String) spec, rrb); - } catch (IOException e) { - throw new ConfigurationException("Cannot parse config into a replay request: " + e.getMessage(), e); - } - if (!rrb.hasSpeed()) { - rrb.setSpeed(ReplaySpeed.newBuilder().setType(ReplaySpeedType.REALTIME).setParam(1)); - } - originalReplayRequest = new ReplayOptions(rrb.build()); - } else if (spec == null) { // For example, created by ProcessorCreatorService - originalReplayRequest = new ReplayOptions(); - originalReplayRequest.setSpeed(new SpeedSpec(SpeedSpec.Type.ORIGINAL, 1)); - originalReplayRequest.setEndAction(EndAction.STOP); - originalReplayRequest.setAutostart(false); - } else { - throw new IllegalArgumentException("Unknown spec of type " + spec.getClass()); - } - } - - @Override - public boolean isArchiveReplay() { - return true; + implements ReplayListener, + ArchiveTmPacketProvider, + ParameterProvider, + CommandHistoryProvider, + SystemParametersProducer { + static final long TIMEOUT = 10000; + + EndAction endAction; + + ReplayOptions originalReplayRequest; + private HashSet subscribedParameters = new HashSet<>(); + private ParameterProcessorManager parameterProcessorManager; + TmProcessor tmProcessor; + Mdb mdb; + + YarchReplay yarchReplay; + // the originalReplayRequest contains possibly only parameters. + // the modified one sent to the ReplayServer contains the raw data required for + // extracting/processing those + // parameters + ReplayOptions rawDataRequest; + CommandHistoryRequestManager commandHistoryRequestManager; + + private SecurityStore securityStore; + + // this can be set in the config (in processor.yaml) to exclude certain parameter groups from + // replay + List excludeParameterGroups = null; + + private Parameter spVideoFrameSentCount; + + private SystemParametersService sysParamService; + + @Override + public void init(Processor proc, YConfiguration args, Object spec) { + super.init(proc, args, spec); + mdb = MdbFactory.getInstance(getYamcsInstance()); + securityStore = YamcsServer.getServer().getSecurityStore(); + if (args.containsKey("excludeParameterGroups")) { + excludeParameterGroups = args.getList("excludeParameterGroups"); } - - @Override - public void newData(ProtoDataType type, Object data) { - switch (type) { - case TM_PACKET: - ReplayPacket rp = (ReplayPacket) data; - String qn = rp.getQualifiedName(); - SequenceContainer container = mdb.getSequenceContainer(qn); - if (container == null) { - log.warn("Unknown sequence container '" + qn + "' found when replaying", qn); - } else { - SequenceContainer parent; - while ((parent = container.getBaseContainer()) != null) { - container = parent; - } - - tmProcessor.processPacket(new TmPacket(rp.getReceptionTime(), rp.getGenerationTime(), - rp.getSequenceNumber(), rp.getPacket()), container); - } - -// TODO:Send video frames to client(s) - break; - case PP: - @SuppressWarnings("unchecked") - List pvals = (List) data; - if (!pvals.isEmpty()) { - ProcessingData processingData = ProcessingData.createForTmProcessing(processor.getLastValueCache()); - calibrate(pvals, processingData); - parameterProcessorManager.process(processingData); - } - break; - case CMD_HISTORY: - CommandHistoryEntry che = (CommandHistoryEntry) data; - commandHistoryRequestManager.addCommand(PreparedCommand.fromCommandHistoryEntry(che)); - break; - case EVENT: - Event evt = (Event) data; - break; - default: - log.error("Unexpected data type {} received", type); - } + this.tmProcessor = proc.getTmProcessor(); + parameterProcessorManager = proc.getParameterProcessorManager(); + proc.setPacketProvider(this); + parameterProcessorManager.addParameterProvider(this); + + if (spec instanceof ReplayOptions) { + originalReplayRequest = (ReplayOptions) spec; + } else if (spec instanceof String) { + ReplayRequest.Builder rrb = ReplayRequest.newBuilder(); + try { + JsonFormat.parser().merge((String) spec, rrb); + } catch (IOException e) { + throw new ConfigurationException( + "Cannot parse config into a replay request: " + e.getMessage(), e); + } + if (!rrb.hasSpeed()) { + rrb.setSpeed(ReplaySpeed.newBuilder().setType(ReplaySpeedType.REALTIME).setParam(1)); + } + originalReplayRequest = new ReplayOptions(rrb.build()); + } else if (spec == null) { // For example, created by ProcessorCreatorService + originalReplayRequest = new ReplayOptions(); + originalReplayRequest.setSpeed(new SpeedSpec(SpeedSpec.Type.ORIGINAL, 1)); + originalReplayRequest.setEndAction(EndAction.STOP); + originalReplayRequest.setAutostart(false); + } else { + throw new IllegalArgumentException("Unknown spec of type " + spec.getClass()); } - private void calibrate(List pvlist, ProcessingData processingData) { - ParameterTypeProcessor ptypeProcessor = processor.getProcessorData().getParameterTypeProcessor(); - - for (ParameterValue pv : pvlist) { - if (pv.getEngValue() == null && pv.getRawValue() != null) { - ptypeProcessor.calibrate(processingData, pv); - } - processingData.addTmParam(pv); - } - } - - @Override - public void stateChanged(ReplayStatus rs) { - if (rs.getState() == ReplayState.CLOSED) { - log.debug("End signal received"); - notifyStopped(); - tmProcessor.finished(); + sysParamService = SystemParametersService.getInstance(getYamcsInstance()); + + // spVideoFrameSentCount = sysParamService.createSystemParameter(LINK_NAMESPACE + linkName + + // "/dataOutCount", Type.UINT64, + // "The total number of items (e.g. telecommand packets) that have been sent through + // this link"); + } + + @Override + public boolean isArchiveReplay() { + return true; + } + + @Override + public void newData(ProtoDataType type, Object data) { + switch (type) { + case TM_PACKET: + ReplayPacket rp = (ReplayPacket) data; + String qn = rp.getQualifiedName(); + SequenceContainer container = mdb.getSequenceContainer(qn); + if (container == null) { + log.warn("Unknown sequence container '" + qn + "' found when replaying", qn); } else { - processor.notifyStateChange(); + SequenceContainer parent; + while ((parent = container.getBaseContainer()) != null) { + container = parent; + } + + tmProcessor.processPacket( + new TmPacket( + rp.getReceptionTime(), + rp.getGenerationTime(), + rp.getSequenceNumber(), + rp.getPacket()), + container); } - } - @Override - public void doStop() { - if (yarchReplay != null) { - yarchReplay.quit(); + // TODO:Send video frames to client(s) + + break; + case PP: + @SuppressWarnings("unchecked") + List pvals = (List) data; + if (!pvals.isEmpty()) { + ProcessingData processingData = + ProcessingData.createForTmProcessing(processor.getLastValueCache()); + calibrate(pvals, processingData); + parameterProcessorManager.process(processingData); } - notifyStopped(); + break; + case CMD_HISTORY: + CommandHistoryEntry che = (CommandHistoryEntry) data; + commandHistoryRequestManager.addCommand(PreparedCommand.fromCommandHistoryEntry(che)); + break; + case EVENT: + Event evt = (Event) data; + break; + default: + log.error("Unexpected data type {} received", type); } + } - // Create rawDataRequest from originalReplayRequest by finding out all raw data (TM and PP) required to provide the - // needed parameters. The raw request must not contain parameters but only TM or PP. - // - // in order to do this, the method addPacketsRequiredForParams will subscribe to all parameters part of the original - // request, then check in the tmProcessor subscription which containers are needed and in the subscribedParameters - // which PPs may be required - private void createRawSubscription() throws YamcsException { - - boolean replayAll = originalReplayRequest.isReplayAll(); - - Set ppRecFilter = new HashSet<>(); - if (replayAll) { - rawDataRequest = new ReplayOptions(originalReplayRequest); - rawDataRequest.setPacketRequest(PacketReplayRequest.newBuilder().build()); - rawDataRequest.setEventRequest(EventReplayRequest.newBuilder().build()); - rawDataRequest.setPpRequest(PpReplayRequest.newBuilder().build()); - rawDataRequest.setCommandHistoryRequest(CommandHistoryReplayRequest.newBuilder().build()); - } else { - rawDataRequest = new ReplayOptions(originalReplayRequest); - rawDataRequest.clearParameterRequest(); - addPacketsRequiredForParams(); - - // addPacketsRequiredForParams above has caused the parameter request manager to populate the - // subscribedParameters set; in case we do not have to retrieve all parameters, create a pp filter such that - // only the required pps are replayed - if (!originalReplayRequest.isReplayAllParameters()) { - for (Parameter p : subscribedParameters) { - ppRecFilter.add(p.getRecordingGroup()); - } - } - } - - if (ppRecFilter.isEmpty() && excludeParameterGroups == null) { - log.debug("No additional pp group added or removed to/from the subscription"); - } else { - PpReplayRequest ppreq = originalReplayRequest.getPpRequest(); - PpReplayRequest.Builder pprr = ppreq.toBuilder(); - pprr.addAllGroupNameFilter(ppRecFilter); - if (excludeParameterGroups != null) { - pprr.addAllGroupNameExclude(excludeParameterGroups); - } - rawDataRequest.setPpRequest(pprr.build()); + private void calibrate(List pvlist, ProcessingData processingData) { + ParameterTypeProcessor ptypeProcessor = + processor.getProcessorData().getParameterTypeProcessor(); - } - if (!rawDataRequest.hasPacketRequest() && !rawDataRequest.hasPpRequest()) { - if (originalReplayRequest.hasParameterRequest()) { - throw new YamcsException("Cannot find a replay source for any parmeters from request: " - + originalReplayRequest.getParameterRequest().toString()); - } else { - throw new YamcsException("Refusing to create an empty replay request"); - } - } + for (ParameterValue pv : pvlist) { + if (pv.getEngValue() == null && pv.getRawValue() != null) { + ptypeProcessor.calibrate(processingData, pv); + } + processingData.addTmParam(pv); } - - private void addPacketsRequiredForParams() throws YamcsException { - List plist = originalReplayRequest.getParameterRequest().getNameFilterList(); - if (plist.isEmpty()) { - return; - } - ParameterWithIdRequestHelper pidrm = new ParameterWithIdRequestHelper( - parameterProcessorManager.getParameterRequestManager(), - (subscriptionId, params) -> { - // ignore data, we create this subscription just to get the list of - // dependent containers and PPs - }); - int subscriptionId; - try { - subscriptionId = pidrm.addRequest(plist, securityStore.getSystemUser()); - } catch (InvalidIdentification e) { - NamedObjectList nol = NamedObjectList.newBuilder().addAllList(e.getInvalidParameters()).build(); - throw new YamcsException("InvalidIdentification", "Invalid identification", nol); - } catch (NoPermissionException e) { - throw new IllegalStateException("Unexpected No permission"); - } - - XtceTmProcessor tmproc = processor.getTmProcessor(); - Subscription subscription = tmproc.getSubscription(); - Collection containers = subscription.getContainers(); - - if ((containers == null) || (containers.isEmpty())) { - log.debug("No container required for the parameter subscription"); - } else { - PacketReplayRequest.Builder rawPacketRequest = originalReplayRequest.getPacketRequest().toBuilder(); - - for (SequenceContainer sc : containers) { - rawPacketRequest.addNameFilter(NamedObjectId.newBuilder().setName(sc.getQualifiedName()).build()); - } - log.debug("after TM subscription, the request contains the following packets: " - + rawPacketRequest.getNameFilterList()); - rawDataRequest.setPacketRequest(rawPacketRequest.build()); - } - pidrm.removeRequest(subscriptionId); + } + + @Override + public void stateChanged(ReplayStatus rs) { + if (rs.getState() == ReplayState.CLOSED) { + log.debug("End signal received"); + notifyStopped(); + tmProcessor.finished(); + } else { + processor.notifyStateChange(); } + } - private void createReplay() throws ProcessorException { - ReplayServer replayServer = YamcsServer.getServer().getService(getYamcsInstance(), ReplayServer.class); - if (replayServer == null) { - throw new ProcessorException("ReplayServer not configured for this instance"); - } - try { - yarchReplay = replayServer.createReplay(rawDataRequest, this); - } catch (YamcsException e) { - log.error("Exception creating the replay", e); - throw new ProcessorException("Exception creating the replay: " + e.getMessage(), e); - } + @Override + public void doStop() { + if (yarchReplay != null) { + yarchReplay.quit(); } - - @Override - public void doStart() { - try { - createRawSubscription(); - createReplay(); - } catch (YamcsException e) { - notifyFailed(e); - return; - } - - if (originalReplayRequest.isAutostart()) { - yarchReplay.start(); + notifyStopped(); + } + + // Create rawDataRequest from originalReplayRequest by finding out all raw data (TM and PP) + // required to provide the + // needed parameters. The raw request must not contain parameters but only TM or PP. + // + // in order to do this, the method addPacketsRequiredForParams will subscribe to all parameters + // part of the original + // request, then check in the tmProcessor subscription which containers are needed and in the + // subscribedParameters + // which PPs may be required + private void createRawSubscription() throws YamcsException { + + boolean replayAll = originalReplayRequest.isReplayAll(); + + Set ppRecFilter = new HashSet<>(); + if (replayAll) { + rawDataRequest = new ReplayOptions(originalReplayRequest); + rawDataRequest.setPacketRequest(PacketReplayRequest.newBuilder().build()); + rawDataRequest.setEventRequest(EventReplayRequest.newBuilder().build()); + rawDataRequest.setPpRequest(PpReplayRequest.newBuilder().build()); + rawDataRequest.setCommandHistoryRequest(CommandHistoryReplayRequest.newBuilder().build()); + } else { + rawDataRequest = new ReplayOptions(originalReplayRequest); + rawDataRequest.clearParameterRequest(); + addPacketsRequiredForParams(); + + // addPacketsRequiredForParams above has caused the parameter request manager to populate the + // subscribedParameters set; in case we do not have to retrieve all parameters, create a pp + // filter such that + // only the required pps are replayed + if (!originalReplayRequest.isReplayAllParameters()) { + for (Parameter p : subscribedParameters) { + ppRecFilter.add(p.getRecordingGroup()); } - notifyStarted(); + } } - @Override - public void pause() { - yarchReplay.pause(); + if (ppRecFilter.isEmpty() && excludeParameterGroups == null) { + log.debug("No additional pp group added or removed to/from the subscription"); + } else { + PpReplayRequest ppreq = originalReplayRequest.getPpRequest(); + PpReplayRequest.Builder pprr = ppreq.toBuilder(); + pprr.addAllGroupNameFilter(ppRecFilter); + if (excludeParameterGroups != null) { + pprr.addAllGroupNameExclude(excludeParameterGroups); + } + rawDataRequest.setPpRequest(pprr.build()); } - - @Override - public void resume() { - yarchReplay.start(); + if (!rawDataRequest.hasPacketRequest() && !rawDataRequest.hasPpRequest()) { + if (originalReplayRequest.hasParameterRequest()) { + throw new YamcsException( + "Cannot find a replay source for any parmeters from request: " + + originalReplayRequest.getParameterRequest().toString()); + } else { + throw new YamcsException("Refusing to create an empty replay request"); + } } + } - @Override - public void seek(long time, boolean autostart) { - try { - yarchReplay.seek(time, autostart); - } catch (YamcsException e) { - throw new RuntimeException(e); - } + private void addPacketsRequiredForParams() throws YamcsException { + List plist = originalReplayRequest.getParameterRequest().getNameFilterList(); + if (plist.isEmpty()) { + return; } - - @Override - public void setParameterProcessor(ParameterProcessor ppm) { - this.parameterProcessorManager = (ParameterProcessorManager) ppm; + ParameterWithIdRequestHelper pidrm = + new ParameterWithIdRequestHelper( + parameterProcessorManager.getParameterRequestManager(), + (subscriptionId, params) -> { + // ignore data, we create this subscription just to get the list of + // dependent containers and PPs + }); + int subscriptionId; + try { + subscriptionId = pidrm.addRequest(plist, securityStore.getSystemUser()); + } catch (InvalidIdentification e) { + NamedObjectList nol = + NamedObjectList.newBuilder().addAllList(e.getInvalidParameters()).build(); + throw new YamcsException("InvalidIdentification", "Invalid identification", nol); + } catch (NoPermissionException e) { + throw new IllegalStateException("Unexpected No permission"); } - @Override - public void startProviding(Parameter paramDef) { - // the subscribedParameters is used at the beginning to select the PP parameters which have to be subscribed - synchronized (subscribedParameters) { - subscribedParameters.add(paramDef); - } + XtceTmProcessor tmproc = processor.getTmProcessor(); + Subscription subscription = tmproc.getSubscription(); + Collection containers = subscription.getContainers(); + + if ((containers == null) || (containers.isEmpty())) { + log.debug("No container required for the parameter subscription"); + } else { + PacketReplayRequest.Builder rawPacketRequest = + originalReplayRequest.getPacketRequest().toBuilder(); + + for (SequenceContainer sc : containers) { + rawPacketRequest.addNameFilter( + NamedObjectId.newBuilder().setName(sc.getQualifiedName()).build()); + } + log.debug( + "after TM subscription, the request contains the following packets: " + + rawPacketRequest.getNameFilterList()); + rawDataRequest.setPacketRequest(rawPacketRequest.build()); } - - @Override - public void startProvidingAll() { - // ignore as we always provide all parameters + pidrm.removeRequest(subscriptionId); + } + + private void createReplay() throws ProcessorException { + ReplayServer replayServer = + YamcsServer.getServer().getService(getYamcsInstance(), ReplayServer.class); + if (replayServer == null) { + throw new ProcessorException("ReplayServer not configured for this instance"); } - - @Override - public void stopProviding(Parameter paramDef) { - synchronized (subscribedParameters) { - subscribedParameters.remove(paramDef); - } + try { + yarchReplay = replayServer.createReplay(rawDataRequest, this); + } catch (YamcsException e) { + log.error("Exception creating the replay", e); + throw new ProcessorException("Exception creating the replay: " + e.getMessage(), e); } - - @Override - public boolean canProvide(NamedObjectId id) { - boolean result = false; - Parameter p = mdb.getParameter(id); - if (p != null) { - result = canProvide(p); - } else { // check if it's system parameter - if (Mdb.isSystemParameter(id)) { - result = true; - } - } - return result; + } + + @Override + public void doStart() { + try { + createRawSubscription(); + createReplay(); + } catch (YamcsException e) { + notifyFailed(e); + return; } - @Override - public boolean canProvide(Parameter p) { - boolean result; - if (mdb.getParameterEntries(p) != null) { - result = false; - } else { - result = true; - } - return result; + if (originalReplayRequest.isAutostart()) { + yarchReplay.start(); } - - @Override - public Parameter getParameter(NamedObjectId id) throws InvalidIdentification { - Parameter p = mdb.getParameter(id); - if (p == null) { - throw new InvalidIdentification(); - } else { - return p; - } + notifyStarted(); + } + + @Override + public void pause() { + yarchReplay.pause(); + } + + @Override + public void resume() { + yarchReplay.start(); + } + + @Override + public void seek(long time, boolean autostart) { + try { + yarchReplay.seek(time, autostart); + } catch (YamcsException e) { + throw new RuntimeException(e); } - - @Override - public ReplaySpeed getSpeed() { - return originalReplayRequest.getSpeed().toProtobuf(); + } + + @Override + public void setParameterProcessor(ParameterProcessor ppm) { + this.parameterProcessorManager = (ParameterProcessorManager) ppm; + } + + @Override + public void startProviding(Parameter paramDef) { + // the subscribedParameters is used at the beginning to select the PP parameters which have to + // be subscribed + synchronized (subscribedParameters) { + subscribedParameters.add(paramDef); } + } - @Override - public ReplayRequest getReplayRequest() { - return originalReplayRequest.toProtobuf(); - } + @Override + public void startProvidingAll() { + // ignore as we always provide all parameters + } - @Override - public ReplayRequest getCurrentReplayRequest() { - return yarchReplay != null ? yarchReplay.getCurrentReplayRequest().toProtobuf() : getReplayRequest(); + @Override + public void stopProviding(Parameter paramDef) { + synchronized (subscribedParameters) { + subscribedParameters.remove(paramDef); } - - @Override - public ReplayState getReplayState() { - if (state() == State.NEW) { - return ReplayState.INITIALIZATION; - } else if (state() == State.FAILED) { - return ReplayState.ERROR; - } else { - return yarchReplay.getState(); - } + } + + @Override + public boolean canProvide(NamedObjectId id) { + boolean result = false; + Parameter p = mdb.getParameter(id); + if (p != null) { + result = canProvide(p); + } else { // check if it's system parameter + if (Mdb.isSystemParameter(id)) { + result = true; + } } - - @Override - public long getReplayTime() { - if (yarchReplay != null) { - return yarchReplay.getReplayTime(); - } else { - return originalReplayRequest.getRangeStart(); - } + return result; + } + + @Override + public boolean canProvide(Parameter p) { + boolean result; + if (mdb.getParameterEntries(p) != null) { + result = false; + } else { + result = true; } - - @Override - public void changeSpeed(ReplaySpeed speed) { - yarchReplay.changeSpeed(SpeedSpec.fromProtobuf(speed)); + return result; + } + + @Override + public Parameter getParameter(NamedObjectId id) throws InvalidIdentification { + Parameter p = mdb.getParameter(id); + if (p == null) { + throw new InvalidIdentification(); + } else { + return p; } - - @Override - public void changeEndAction(EndAction endAction) { - yarchReplay.changeEndAction(endAction); + } + + @Override + public ReplaySpeed getSpeed() { + return originalReplayRequest.getSpeed().toProtobuf(); + } + + @Override + public ReplayRequest getReplayRequest() { + return originalReplayRequest.toProtobuf(); + } + + @Override + public ReplayRequest getCurrentReplayRequest() { + return yarchReplay != null + ? yarchReplay.getCurrentReplayRequest().toProtobuf() + : getReplayRequest(); + } + + @Override + public ReplayState getReplayState() { + if (state() == State.NEW) { + return ReplayState.INITIALIZATION; + } else if (state() == State.FAILED) { + return ReplayState.ERROR; + } else { + return yarchReplay.getState(); } - - @Override - public void changeRange(long start, long stop) { - try { - yarchReplay.changeRange(start, stop); - } catch (YamcsException e) { - throw new RuntimeException(e); - } + } + + @Override + public long getReplayTime() { + if (yarchReplay != null) { + return yarchReplay.getReplayTime(); + } else { + return originalReplayRequest.getRangeStart(); } - - @Override - public void setCommandHistoryRequestManager(CommandHistoryRequestManager chrm) { - this.commandHistoryRequestManager = chrm; + } + + @Override + public void changeSpeed(ReplaySpeed speed) { + yarchReplay.changeSpeed(SpeedSpec.fromProtobuf(speed)); + } + + @Override + public void changeEndAction(EndAction endAction) { + yarchReplay.changeEndAction(endAction); + } + + @Override + public void changeRange(long start, long stop) { + try { + yarchReplay.changeRange(start, stop); + } catch (YamcsException e) { + throw new RuntimeException(e); } + } + + @Override + public void setCommandHistoryRequestManager(CommandHistoryRequestManager chrm) { + this.commandHistoryRequestManager = chrm; + } + + @Override + public Collection getSystemParameters(long gentime) { + // TODO Auto-generated method stub + return null; + } }