DARC services
Master service
- class darc.darc_master.DARCMaster(config_file='/opt/hostedtoolcache/Python/3.12.12/x64/lib/python3.12/site-packages/darc/config.yaml')
Bases:
object
DARC master service that controls all other services and queues
Interact with this service through the ‘darc’ executable
- Parameters:
config_file (str) – path to DARC configuration file
- _get_attribute(service, command)
Get attribute of a service instance
- Parameters:
service (str) – Which service to get an attribute of
command (str) – Full service command (“get_attr {attribute}”)
- _load_config()
Load configuration file
- _load_parset(config_file)
Load parset file and convert to observation config
- Parameters:
config_file (str) – Path to parset file
- Returns:
observation configuration
- _load_yaml(config_file)
Load yaml file and convert to observation config
- Parameters:
config_file (str) – Path to yaml file
- Returns:
observation config dict
- _switch_cmd(command)
Check status of LOFAR trigger system / VOEvent generator, or enable/disable them
- Parameters:
command (str) – command to run
- Returns:
status, reply
- check_status(service)
Check status of a service
- Parameters:
service (str) – Service to check status of
- Returns:
status, reply
- create_thread(service)
Initialise a service thread
- Parameters:
service (str) – service to create a new thread for
- get_queue(service)
Get control queue corresponding to a service
- Parameters:
service (str) – Service to get queue for
- Returns:
queue (Queue)
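The master routes commands to per-service control queues. A minimal sketch of such a service-to-queue registry (class and service names are hypothetical, not the actual DARC internals):

```python
from queue import Queue

# Hypothetical sketch of a service -> control-queue registry as get_queue
# implies; the real DARC master's bookkeeping may differ.
class QueueRegistry:
    def __init__(self, services):
        # one control queue per service
        self._queues = {service: Queue() for service in services}

    def get_queue(self, service):
        try:
            return self._queues[service]
        except KeyError:
            raise ValueError(f"Unknown service: {service}")

registry = QueueRegistry(["amber_listener", "amber_clustering"])
registry.get_queue("amber_listener").put({"command": "status"})
```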
- parse_message(raw_message)
Parse raw received message
- Parameters:
raw_message (str) – message as single string
- Returns:
status, reply
- process_message(service, command, payload)
Process received message
- Parameters:
service (str) – service to interact with
command (str) – command to run
payload – payload for command
- Returns:
status, reply
- restart_service(service)
Restart a service
- Parameters:
service (str) – service to restart
- Returns:
status, reply
- run()
Main loop
Listen for messages on the command socket and process them
- start_observation(config_file, service=None)
Start an observation
- Parameters:
config_file (str) – Path to observation config file
service (str) – Which service to send start_observation to (default: all)
- Returns:
status, reply
- start_service(service)
Start a service
- Parameters:
service (str) – service to start
- Returns:
status, reply
- stop()
Stop all services and exit
- Returns:
status, reply
- stop_observation(config_file, abort=False, service=None)
Stop an observation
- Parameters:
config_file (str) – path to observation config file
abort (bool) – whether to abort the observation
service (str) – Which service to send stop_observation to (default: all)
- Returns:
status, reply message
- stop_service(service)
Stop a service
- Parameters:
service (str) – service to stop
- Returns:
status, reply
- exception darc.darc_master.DARCMasterException
Bases:
Exception
- darc.darc_master.main()
Run DARC Master
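parse_message splits a raw string received on the command socket into a service, command, and optional payload. A hedged sketch, assuming a whitespace-separated "service command [payload]" format (the actual DARC wire format may differ):

```python
# Hedged sketch of splitting a raw control message; the real parse_message
# returns a (status, reply) pair and its message format is an assumption here.
def parse_message(raw_message):
    parts = raw_message.strip().split(maxsplit=2)
    if len(parts) < 2:
        raise ValueError(f"Could not parse message: {raw_message!r}")
    service, command = parts[:2]
    payload = parts[2] if len(parts) == 3 else None
    return service, command, payload
```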
Reading AMBER candidates
- class darc.amber_listener.AMBERListener(*args, **kwargs)
Bases:
DARCBase
Continuously read AMBER candidate files from disk and put candidates on output queue.
- _follow_file(fname, event)
Tail a file and put lines on a queue
- Parameters:
fname (str) – file to follow
event (Event) – stop event
- start_observation(obs_config, reload=True)
Start an observation
- Parameters:
obs_config (dict) – observation config dict
reload (bool) – reload service settings (default: True)
- stop_observation(*args, **kwargs)
Stop observation
- exception darc.amber_listener.AMBERListenerException
Bases:
Exception
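The _follow_file method tails a growing candidate file until a stop event fires. A minimal sketch of that pattern (not the actual DARC implementation):

```python
from queue import Queue
from threading import Event, Thread

# Minimal sketch of the tail-a-file pattern behind _follow_file:
# read available lines, put them on a queue, poll until the stop event is set.
def follow_file(fname, queue, stop_event, poll_interval=0.1):
    with open(fname) as f:
        while not stop_event.is_set():
            line = f.readline()
            if line:
                queue.put(line.rstrip("\n"))
            else:
                # no new data yet; wait before polling again
                stop_event.wait(poll_interval)
```

In DARC this runs in a thread per file, with the stop event set by stop_observation.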
Clustering of AMBER candidates
- class darc.amber_clustering.AMBERClustering(*args, connect_vo=True, connect_lofar=True, **kwargs)
Bases:
DARCBase
Trigger IQUV / LOFAR / VOEvent system based on AMBER candidates:
- Cluster incoming triggers
- Apply thresholds (separate for known and new sources, and for IQUV vs LOFAR)
- Put IQUV triggers on output queue
- Put LOFAR triggers on remote LOFAR trigger queue and on VOEvent queue
- Parameters:
connect_vo (bool) – Whether or not to connect to VOEvent queue on master node
connect_lofar (bool) – Whether or not to connect to LOFAR trigger queue on master node
- _check_triggers(triggers, sys_params, utc_start, datetimesource, dm_min=0, dm_max=inf, dm_src=None, width_max=inf, snr_min=8, src_type=None, src_name=None, dmgal=0, pointing=None, skip_lofar=False)
Cluster triggers and run IQUV and/or LOFAR triggering
- Parameters:
triggers (list) – Raw triggers
sys_params (dict) – System parameters (dt, delta_nu_MHz, nu_GHz)
utc_start (str) – start time of observation, in format readable by astropy.time.Time
datetimesource (str) – Field name with date and time
dm_min (float) – minimum DM (default: 0)
dm_max (float) – maximum DM (default: inf)
dm_src (float) – DM of known source (default: None)
width_max (float) – maximum width (default: inf)
snr_min (float) – minimum S/N (default: 8)
src_type (str) – Source type (pulsar, frb, None)
src_name (str) – Source name (default: None)
dmgal (float) – galactic maximum DM
pointing (astropy.coordinates.SkyCoord) – Pointing for LOFAR triggering (default: None)
skip_lofar (bool) – Skip LOFAR triggering (default: False)
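The thresholding step of _check_triggers can be sketched as a simple filter over trigger dictionaries (the field names dm, snr, width are assumptions about the trigger format, and this omits the clustering itself):

```python
import math

# Sketch of the per-source threshold step inside trigger checking;
# trigger dict keys are assumptions, not DARC's actual field names.
def apply_thresholds(triggers, dm_min=0, dm_max=math.inf,
                     snr_min=8, width_max=math.inf):
    return [t for t in triggers
            if dm_min <= t["dm"] <= dm_max
            and t["snr"] >= snr_min
            and t["width"] <= width_max]
```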
- _get_pointing()
Get pointing of this CB from parset
- Returns:
pointing SkyCoord
- _get_source()
Try to get DM for a known source
- Returns:
DM for known source, else None
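A sketch of the known-source DM lookup implied by _get_source and _load_source_list, assuming a dict-per-category structure (the category names are hypothetical; the DM values shown are published values for these sources):

```python
# Hypothetical source list structure: one dict of name -> DM per category.
source_list = {
    "pulsars": {"B0531+21": 56.77},   # Crab pulsar
    "frbs": {"FRB20180916B": 348.8},  # repeating FRB
}

def get_source_dm(name):
    """Return DM (pc cm**-3) for a known source, else None."""
    for category in source_list.values():
        if name in category:
            return category[name]
    return None
```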
- _load_parset(obs_config)
Load the observation parset
- Parameters:
obs_config (dict) – Observation config
- Returns:
parset as dict
- _load_source_list()
Load the list with known source DMs
- Returns:
source list with dict per category
- _process_triggers()
Read thresholds (DM, width, S/N) for clustering
Continuously read AMBER triggers from queue and start processing for known and/or new sources
- lofar_connector()
Connect to the LOFAR triggering system on the master node
- process_command(command)
Process command received from queue
- Parameters:
command (dict) – Command to process
- start_observation(obs_config, reload=True)
Parse obs config and start listening for amber triggers on queue
- Parameters:
obs_config (dict) – Observation configuration
reload (bool) – reload service settings (default: True)
- stop_observation(*args, **kwargs)
Stop observation
- voevent_connector()
Connect to the VOEvent generator on the master node
- exception darc.amber_clustering.AMBERClusteringException
Bases:
Exception
IQUV triggering
- class darc.dada_trigger.DADATrigger(*args, **kwargs)
Bases:
DARCBase
Generate and send dada_dbevent triggers
- _load_parset(obs_config)
Load the observation parset
- Parameters:
obs_config (dict) – Observation config
- Returns:
parset as dict
- cleanup()
Remove all trigger-sending threads
- polcal_dumps(obs_config)
Automatically dump IQUV data at regular intervals for polcal calibrator observations
- Parameters:
obs_config (dict) – Observation config
- process_command(command)
Process command received from queue
- Parameters:
command (dict) – command with arguments
- send_event(triggers)
Send trigger to dada_dbevent
- Parameters:
triggers (list) – list of trigger dictionaries
- send_events(event, stokes)
Send stokes I or IQUV events
- Parameters:
event (str) – raw event to send
stokes (str) – I or IQUV
- start_observation(obs_config, reload=True)
Start observation: run IQUV dumps automatically if source is polarisation calibrator. Else ensure normal FRB candidate I/IQUV dumps are enabled
- Parameters:
obs_config (dict) – Observation config
reload (bool) – reload service settings (default: True)
- exception darc.dada_trigger.DADATriggerException
Bases:
Exception
LOFAR triggering
- class darc.lofar_trigger.LOFARTrigger(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.12.12/x64/lib/python3.12/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Select brightest trigger from incoming triggers and send to LOFAR for TBB triggering
- Parameters:
source_queue (Queue) – Input queue for controlling this service
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _get_attribute(command)
Get attribute as given in input command
- Parameters:
command (dict) – Command received over queue
- _new_trigger(dm, utc, nu_GHz=1.37, test=None)
Create a LOFAR trigger struct
- Parameters:
dm (float) – Dispersion measure (pc cm**-3)
utc (str) – UTC arrival time in ISOT format
nu_GHz (float) – Apertif centre frequency (GHz)
test (bool) – Whether to send a test event or observation event
- static _select_trigger(triggers)
Select trigger with highest S/N from a list of triggers. If there are triggers from both known and new sources, select the known source
- Parameters:
triggers (list) – one dict per trigger
- Returns:
trigger with highest S/N and number of unique CBs in trigger list
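The selection rule of _select_trigger, sketched under the assumption that each trigger dict carries snr, cb, and src_name keys (the exact field names are an assumption):

```python
# Sketch of _select_trigger's rule: prefer known sources over new ones,
# then take the highest S/N; also count unique CBs across all triggers.
def select_trigger(triggers):
    known = [t for t in triggers if t.get("src_name") is not None]
    pool = known if known else triggers
    best = max(pool, key=lambda t: t["snr"])
    num_cb = len({t["cb"] for t in triggers})
    return best, num_cb
```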
- _switch_command(command)
Check status or enable/disable sending of events
- create_and_send(trigger)
Create LOFAR trigger struct
Event is only sent if enabled in config
- Parameters:
trigger (list/dict) – Trigger event(s). dict if one event, list of dicts if multiple events
- run()
Main loop
Read triggers from queue and process them
- send_email(trigger)
Send email upon LOFAR trigger
Consider running this method in a try/except block, as sending emails might fail in case of network interrupts
- Parameters:
trigger (dict) – Trigger as sent to LOFAR
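As the docstring advises, email sending should be wrapped in try/except so a network interruption cannot crash the trigger service. A minimal sketch of that pattern (host, port, and addresses are placeholders, not DARC's configuration):

```python
import smtplib
from email.message import EmailMessage

# Sketch of fault-tolerant email sending; all addresses are placeholders.
def send_trigger_email(trigger, host="localhost", port=25):
    msg = EmailMessage()
    msg["Subject"] = f"LOFAR trigger, DM={trigger['dm']}"
    msg["From"] = "darc@example.com"
    msg["To"] = "astronomers@example.com"
    msg.set_content(str(trigger))
    try:
        with smtplib.SMTP(host, port, timeout=5) as server:
            server.send_message(msg)
        return True
    except (OSError, smtplib.SMTPException):
        # network interrupt or SMTP failure: log and carry on
        return False
```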
- stop()
Stop this service
- exception darc.lofar_trigger.LOFARTriggerException
Bases:
Exception
- class darc.lofar_trigger.LOFARTriggerQueueServer(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
Bases:
BaseManager
Server for LOFAR Trigger input queue
Sending VOEvents
- class darc.voevent_generator.VOEventGenerator(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.12.12/x64/lib/python3.12/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Convert incoming triggers to VOEvent and send to the VOEvent broker
- Parameters:
source_queue (Queue) – Input queue for controlling this service
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _NewVOEvent(dm, dm_err, width, snr, flux, ra, dec, semiMaj, semiMin, ymw16, name, importance, utc, gl, gb, gain, dt=np.float64(0.08192), delta_nu_MHz=np.float64(0.1953125), nu_GHz=1.37, posang=0, test=None)
Create a VOEvent
- Parameters:
dm (float) – Dispersion measure (pc cm**-3)
dm_err (float) – Error on DM (pc cm**-3)
width (float) – Pulse width (ms)
snr (float) – Signal-to-noise ratio
flux (float) – flux density (mJy)
ra (float) – Right ascension (deg)
dec (float) – Declination (deg)
semiMaj (float) – Localisation region semi-major axis (arcmin)
semiMin (float) – Localisation region semi-minor axis (arcmin)
ymw16 (float) – YMW16 DM (pc cm**-3)
name (str) – Source name
importance (float) – Trigger importance (0-1)
utc (str) – UTC arrival time in ISOT format
gl (float) – Galactic longitude (deg)
gb (float) – Galactic latitude (deg)
gain (float) – Telescope gain (K Jy**-1)
dt (float) – Telescope time resolution (ms)
delta_nu_MHz (float) – Telescope frequency channel width (MHz)
nu_GHz (float) – Telescope centre frequency (GHz)
posang (float) – Localisation region position angle (deg)
test (bool) – Whether to send a test event or observation event
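The dm, dt, delta_nu_MHz, and nu_GHz parameters feed the standard cold-plasma dispersion relation. As an illustration (not DARC's exact code), the delay a DM introduces between two frequencies is Δt = 4.148808 ms × DM × (ν_lo⁻² − ν_hi⁻²), with ν in GHz:

```python
# Standard dispersion delay between two observing frequencies.
K_DM_MS = 4.148808  # dispersion constant, ms GHz^2 cm^3 pc^-1

def dispersion_delay_ms(dm, nu_lo_ghz, nu_hi_ghz):
    """Arrival-time delay (ms) of nu_lo relative to nu_hi for a given DM."""
    return K_DM_MS * dm * (nu_lo_ghz ** -2 - nu_hi_ghz ** -2)

# e.g. DM = 349 pc cm^-3 across the Apertif band (1.22 - 1.52 GHz)
delay = dispersion_delay_ms(349, 1.22, 1.52)  # roughly 346 ms
```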
- _get_attribute(command)
Get attribute as given in input command
- Parameters:
command (dict) – Command received over queue
- static _select_trigger(triggers)
Select trigger with highest S/N from a list of triggers
- Parameters:
triggers (list) – one dict per trigger
- Returns:
trigger with highest S/N and number of unique CBs in trigger list
- _switch_command(command)
Check status or enable/disable sending of events
- create_and_send(trigger)
Create voevent
Event is only sent if enabled in config
- Parameters:
trigger (list/dict) – Trigger event(s). dict if one event, list of dicts if multiple events
- run()
Main loop
Read triggers from queue and process them
- stop()
Stop this service
- exception darc.voevent_generator.VOEventGeneratorException
Bases:
Exception
- class darc.voevent_generator.VOEventQueueServer(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
Bases:
BaseManager
Server for VOEvent input queue
Offline data processing
- class darc.offline_processing.OfflineProcessing(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.12.12/x64/lib/python3.12/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Full offline processing pipeline:
- Candidate clustering
- Extraction of filterbank data
- ML classification
- Email to astronomers
Also includes:
- Automated pulsar folding
- Automated run of calibration tools for drift scans
- Automated run of known FRB candidate extractor
- Parameters:
source_queue (Queue) – Input queue
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _classify(obs_config, input_file)
Run the ML classifier
- Parameters:
obs_config (dict) – Observation config
input_file (str) – HDF5 file to process
- Returns:
prefix of output figure path
- _cluster(obs_config, filterbank_name, tab=None, ind=None, sbmin=None, sbmax=None, out=None)
Run triggers.py, which takes care of clustering and extracting filterbank data
- Parameters:
obs_config (dict) – Observation config
filterbank_name (str) – Full path to filterbank file (TAB/IAB) or prefix (SB) to use
tab (int) – TAB number to process (0 for IAB, absent/None for SB)
ind (int) – Index of out array where to store results
sbmin (int) – First SB to process (SB mode only)
sbmax (int) – Last SB to process (SB mode only)
out (np.ndarray) – output array where return value is put at index <ind> (optional)
- Returns:
number of grouped candidates (put in out array if given, else returned)
- _fold_pulsar(source, obs_config)
Fold pulsar with PRESTO
- Parameters:
source (str) – pulsar name including B or J
obs_config (dict) – Observation config
- _gather_results(obs_config, **kwargs)
Gather output results and put in central location
- Parameters:
obs_config (dict) – Observation config
kwargs (dict) – number of candidates, output prefixes; these are added to obs_config
- _get_attribute(command)
Get attribute as given in input command
- Parameters:
command (dict) – Command received over queue
- _get_coordinates(obs_config)
Generate coordinates file from the pointing directions
File contains RA, Dec, gl, gb for each CB used in this observation
- Parameters:
obs_config (dict) – Observation config
- Returns:
ra, dec of CB00 (decimal deg) if available, else None
- _get_overview(obs_config)
Generate observation overview file
- Parameters:
obs_config (dict) – Observation config
- _load_parset(obs_config)
Load the observation parset
- Parameters:
obs_config (dict) – Observation config
- Returns:
parset as dict
- _merge_hdf5(obs_config, output_file)
Merge HDF5 files generated by clustering
- Parameters:
obs_config (dict) – Observation config
output_file (str) – Filename of merged HDF5 data
- Returns:
Number of triggers in combined HDF5 file
- _merge_plots(obs_config)
Merge classifier output plots into one pdf
- Parameters:
obs_config (dict) – Observation config
- _merge_triggers(obs_config)
Merge AMBER triggers into one file
- Parameters:
obs_config (dict) – Observation config
- _plot_known_frb_cands(obs_config, coord_cb00)
Call external script that plots candidates of known FRBs
- Parameters:
obs_config (dict) – Observation config
coord_cb00 (list) – [ra, dec] of CB00 in decimal degrees
- _run_calibration_tools(source, obs_config)
- Parameters:
source (str) – Source name (like 3C296, not 3C286drift0107)
obs_config (dict) – Observation config
- _start_observation_master(obs_config, reload=True)
Start observation on master node
Generate observation summary and send email once all workers are done
- Parameters:
obs_config (dict) – Observation config
reload (bool) – reload service settings (default: True)
- _start_observation_worker(obs_config, reload=True)
Start observation on worker node:
- Candidate clustering
- Extraction of filterbank data
- ML classification
- Parameters:
obs_config (dict) – Observation config
reload (bool) – reload service settings (default: True)
- load_config()
- run()
Wait for start_observation command, then run offline processing pipeline. Executed commands depend on whether this is a worker node or the master node.
- stop()
Stop this service
- exception darc.offline_processing.OfflineProcessingException
Bases:
Exception
Real-time data processing (worker node)
- class darc.processor.Processor(log_queue, *args, **kwargs)
Bases:
DARCBase
Real-time processing of candidates:
- Clustering + thresholding
- Extract data from filterbank
- Run classifier
- Visualize candidates
After the observation finishes, results are gathered in a central location to be picked up by the master node
- Parameters:
log_queue (Queue) – Queue to use for logging
- _finish_processing()
Wait for real-time processing to finish and visualize results
- _get_timeout()
Get processing time limit
- Returns:
time limit in seconds (float) or None if no limit
- _join_with_timeout(name, timeout)
Signal a process to stop. Terminate if timeout is reached
- Parameters:
name (str) – name of Process in self.threads dict to join
timeout (float) – timeout in seconds (None for no time limit)
- _read_amber_triggers()
Read AMBER triggers for reprocessing of an observation. Based on AMBERListener
- _read_and_process_data()
Process incoming AMBER triggers
- _reorder_clusters()
Reorder clusters ready for data extraction to highest-S/N first. This is used such that bright candidates are prioritized when there is a processing time limit
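The prioritisation behind _reorder_clusters is a descending S/N sort, so that the brightest candidates are extracted first when a processing time limit applies (the snr key is an assumption about the cluster records):

```python
# Sketch of the prioritisation step: highest-S/N clusters first, so bright
# candidates survive a processing time limit. Cluster keys are assumptions.
def reorder_clusters(clusters):
    return sorted(clusters, key=lambda c: c["snr"], reverse=True)
```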
- _store_obs_stats()
Store observation statistics to central result directory
- process_command(command)
Process command received from queue
- Parameters:
command (dict) – Command to process
- start_observation(obs_config, reload=True)
Parse obs config and start listening for amber triggers on queue
- Parameters:
obs_config (dict) – Observation configuration
reload (bool) – reload service settings (default: True)
- stop(abort=None)
Stop this service
- Parameters:
abort (bool) – Ignored, a stop of the service always equals abort
- stop_observation(abort=False)
Stop observation
- Parameters:
abort (bool) – Whether or not to abort the observation
- exception darc.processor.ProcessorException
Bases:
Exception
- class darc.processor.ProcessorManager(*args, **kwargs)
Bases:
DARCBase
Control logic for running several Processor instances, one per observation
- _load_parset(obs_config)
Load the observation parset
- Parameters:
obs_config (dict) – Observation config
- Returns:
parset as dict
- process_command(command)
Forward any data from the input queue to the running observation
- processing_status_generator()
At regular interval, create status file for processing website
- run()
Main loop. Create thread scavenger, then run parent class run method
- start_observation(obs_config, reload=True)
Initialize a Processor and call its start_observation
- stop(abort=False)
Stop this service
- Parameters:
abort (bool) – Ignored; a stop of the manager always equals an abort
- stop_observation(obs_config)
Stop observation with task ID as given in parset
- Parameters:
obs_config (dict) – Observation config
- thread_scavenger()
Remove any finished threads at regular intervals
Real-time data processing (master node)
- class darc.processor_master.ProcessorMaster(log_queue, *args, **kwargs)
Bases:
DARCBase
Combine results from worker node processors
- Parameters:
log_queue (Queue) – Queue to use for logging
- _check_node_online(node)
Check if the processor on a node is still online and processing the current observation
- Parameters:
node (str) – Hostname of node to check
- Returns:
status (bool): True if node is online, else False
- _generate_info_file()
Generate observation info files
- Returns:
info (dict), coordinates of each CB (dict)
- _get_result_dir()
Get result directory from worker processor config
- _process_observation()
Process observation
- _process_results(info, coordinates)
Load statistics and plots from the nodes. Copy to website directory and return data to be sent as email
- Parameters:
info (dict) – Observation info summary
coordinates (dict) – Coordinates of every CB in the observation
- Returns:
email (str), attachments (list)
- _publish_results(body, files)
Publish email content as local website
- _send_email(email, attachments)
Send email with observation results
- Parameters:
email (str) – Email body
attachments (list) – Attachments
- _send_warning(node)
Send a warning email about a node
- Parameters:
node (str) – Node to send warning about
- _wait_for_workers()
Wait for all worker nodes to finish processing this observation
- start_observation(obs_config, reload=True)
Parse obs config and start observation processing after end time has passed
- Parameters:
obs_config (dict) – Observation configuration
reload (bool) – reload service settings (default: True)
- stop(abort=None)
Stop this service
- Parameters:
abort (bool) – Ignored, a stop of the service always equals abort
- stop_observation(abort=False)
Stop observation
- Parameters:
abort (bool) – Whether or not to abort the observation
- class darc.processor_master.ProcessorMasterManager(*args, **kwargs)
Bases:
DARCBase
Control logic for running several ProcessorMaster instances, one per observation
- _load_parset(obs_config)
Load the observation parset
- Parameters:
obs_config (dict) – Observation config
- Returns:
parset as dict
- processing_status_generator()
At regular interval, create status file for processing website
- run()
Main loop. Create thread scavenger, then run parent class run method
- start_observation(obs_config, reload=True)
Initialize a ProcessorMaster and call its start_observation
- stop(abort=False)
Stop this service
- Parameters:
abort (bool) – Ignored; a stop of the manager always equals an abort
- stop_observation(obs_config)
Stop observation with task ID as given in parset
- Parameters:
obs_config (dict) – Observation config
- thread_scavenger()
Remove any finished threads at regular intervals
Monitoring
- class darc.status_website.StatusWebsite(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.12.12/x64/lib/python3.12/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Generate an HTML page with the status of each service across the ARTS cluster at regular intervals
- Parameters:
source_queue (Queue) – Input queue
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _get_attribute(command)
Get attribute as given in input command
- Parameters:
command (dict) – Command received over queue
- check_command()
Check if this service should execute a command
- static get_template()
Return HTML template
Split into header and footer
- Returns:
header, footer
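The header/footer template split lets publish_status insert one row per service between fixed HTML fragments. A minimal sketch (the real DARC template is more elaborate):

```python
# Sketch of the header/footer pattern used by get_template/publish_status;
# the markup here is a placeholder, not the actual DARC template.
HEADER = "<html><head><title>DARC status</title></head><body><table>"
FOOTER = "</table></body></html>"

def publish_status(statuses):
    """Render a node -> status dict as a simple HTML table."""
    rows = "".join(
        f"<tr><td>{node}</td><td>{status}</td></tr>"
        for node, status in sorted(statuses.items())
    )
    return HEADER + rows + FOOTER
```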
- make_offline_page()
Create page for when status website is offline
- publish_status(statuses)
Publish status as simple html webpage
- Parameters:
statuses (dict) – Status of each service across all nodes
- run()
Main loop:
- Get status of all services across all nodes
- Publish HTML page with statuses
- Generate offline page upon exit
- stop()
Stop this service
- exception darc.status_website.StatusWebsiteException
Bases:
Exception