DARC services
Master service
- class darc.darc_master.DARCMaster(config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml')
Bases:
object
DARC master service that controls all other services and queues
Interact with this service through the ‘darc’ executable
- Parameters
config_file (str) – path to DARC configuration file
- _get_attribute(service, command)
Get attribute of a service instance
- Parameters
service (str) – Which service to get an attribute of
command (str) – Full service command (“get_attr {attribute}”)
- _load_config()
Load configuration file
- _load_parset(config_file)
Load parset file and convert to observation config
- Parameters
config_file (str) – Path to parset file
- Returns
observation configuration
- _load_yaml(config_file)
Load yaml file and convert to observation config
- Parameters
config_file (str) – Path to yaml file
- Returns
observation config dict
- _switch_cmd(command)
Check status of LOFAR trigger system / VOEvent generator, or enable/disable them
- Parameters
command (str) – command to run
- Returns
status, reply
- check_status(service)
Check status of a service
- Parameters
service (str) – Service to check status of
- Returns
status, reply
- create_thread(service)
Initialise a service thread
- Parameters
service (str) – service to create a new thread for
- get_queue(service)
Get control queue corresponding to a service
- Parameters
service (str) – Service to get queue for
- Returns
queue (Queue)
- parse_message(raw_message)
Parse raw received message
- Parameters
raw_message (str) – message as single string
- Returns
status, reply
- process_message(service, command, payload)
Process received message
- Parameters
service (str) – service to interact with
command (str) – command to run
payload – payload for command
- Returns
status, reply
- restart_service(service)
Restart a service
- Parameters
service (str) – service to restart
- Returns
status, reply
- run()
Main loop
Listen for messages on the command socket and process them
- start_observation(config_file, service=None)
Start an observation
- Parameters
config_file (str) – Path to observation config file
service (str) – Which service to send start_observation to (default: all)
- Returns
status, reply
- start_service(service)
Start a service
- Parameters
service (str) – service to start
- Returns
status, reply
- stop()
Stop all services and exit
- Returns
status, reply
- stop_observation(config_file, abort=False, service=None)
Stop an observation
- Parameters
config_file (str) – path to observation config file
abort (bool) – whether to abort the observation
service (str) – Which service to send stop_observation to (default: all)
- Returns
status, reply message
- stop_service(service)
Stop a service
- Parameters
service (str) – service to stop
- Returns
status, reply
- exception darc.darc_master.DARCMasterException
Bases:
Exception
- darc.darc_master.main()
Run DARC Master
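The `_get_attribute` method of DARCMaster receives the full service command in the form “get_attr {attribute}”. The block below is a minimal sketch of how such a command could be parsed and answered with a (status, reply) pair. The `DummyService` class, the `'Success'` / `'Error'` status strings and the reply format are assumptions made for illustration; they are not taken from the DARC source.

```python
class DummyService:
    """Hypothetical stand-in for a running service instance."""

    def __init__(self):
        self.interval = 5.0


def get_attribute(service, command):
    """Parse a "get_attr {attribute}" command and look up the attribute.

    Returns a (status, reply) tuple. The status strings are assumptions.
    """
    parts = command.split()
    if len(parts) != 2 or parts[0] != "get_attr":
        return "Error", f"Malformed command: {command!r}"
    name = parts[1]
    try:
        value = getattr(service, name)
    except AttributeError:
        return "Error", f"No such attribute: {name}"
    return "Success", f"{name} = {value}"
```

Returning an error tuple instead of raising keeps the caller's message loop alive when a client sends a bad command.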
Reading AMBER candidates
- class darc.amber_listener.AMBERListener(*args, **kwargs)
Bases:
DARCBase
Continuously read AMBER candidate files from disk and put candidates on output queue.
- _follow_file(fname, event)
Tail a file and put lines on queue
- Parameters
fname (str) – file to follow
event (Event) – stop event
- start_observation(obs_config, reload=True)
Start an observation
- Parameters
obs_config (dict) – observation config dict
reload (bool) – reload service settings (default: True)
- stop_observation(*args, **kwargs)
Stop observation
- exception darc.amber_listener.AMBERListenerException
Bases:
Exception
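`AMBERListener._follow_file` tails a file and puts its lines on a queue until a stop event is set. The following is a minimal sketch of that technique using only the standard library. The function names, the polling interval and the choice to read from the start of the file are assumptions, not the DARC implementation.

```python
import queue
import threading


def follow_file(fname, stop_event, out_queue, poll_interval=0.05):
    """Put each complete line of fname on out_queue until stop_event is set."""
    with open(fname, "r") as f:
        buffer = ""
        while not stop_event.is_set():
            chunk = f.readline()
            if not chunk:
                # No new data yet: wait briefly, but wake up early on stop
                stop_event.wait(poll_interval)
                continue
            buffer += chunk
            # Only emit a line once its trailing newline has been written
            if buffer.endswith("\n"):
                out_queue.put(buffer.rstrip("\n"))
                buffer = ""


def start_follower(fname):
    """Run follow_file in a background thread; return (thread, stop_event, out_queue)."""
    stop_event = threading.Event()
    out_queue = queue.Queue()
    thread = threading.Thread(
        target=follow_file, args=(fname, stop_event, out_queue), daemon=True
    )
    thread.start()
    return thread, stop_event, out_queue
```

Buffering partial lines matters here because the writer may flush a candidate line in more than one piece.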
Clustering of AMBER candidates
- class darc.amber_clustering.AMBERClustering(*args, connect_vo=True, connect_lofar=True, **kwargs)
Bases:
DARCBase
Trigger IQUV / LOFAR / VOEvent system based on AMBER candidates
Cluster incoming triggers
Apply thresholds (separate for known and new sources, and for IQUV vs LOFAR)
Put IQUV triggers on output queue
Put LOFAR triggers on remote LOFAR trigger queue and on VOEvent queue
- Parameters
connect_vo (bool) – Whether or not to connect to VOEvent queue on master node
connect_lofar (bool) – Whether or not to connect to LOFAR trigger queue on master node
- _check_triggers(triggers, sys_params, utc_start, datetimesource, dm_min=0, dm_max=inf, dm_src=None, width_max=inf, snr_min=8, src_type=None, src_name=None, dmgal=0, pointing=None, skip_lofar=False)
Cluster triggers and run IQUV and/or LOFAR triggering
- Parameters
triggers (list) – Raw triggers
sys_params (dict) – System parameters (dt, delta_nu_MHz, nu_GHz)
utc_start (str) – start time of observation, in format readable by astropy.time.Time
datetimesource (str) – Field name with date and time
dm_min (float) – minimum DM (default: 0)
dm_max (float) – maximum DM (default: inf)
dm_src (float) – DM of known source (default: None)
width_max (float) – maximum width (default: inf)
snr_min (float) – minimum S/N (default: 8)
src_type (str) – Source type (pulsar, frb, None)
src_name (str) – Source name (default: None)
dmgal (float) – maximum Galactic DM (default: 0)
pointing (astropy.coordinates.SkyCoord) – Pointing for LOFAR triggering (default: None)
skip_lofar (bool) – Skip LOFAR triggering (default: False)
- _get_pointing()
Get pointing of this CB from parset
- Returns
pointing SkyCoord
- _get_source()
Try to get DM for a known source
- Returns
DM for known source, else None
- _load_parset(obs_config)
Load the observation parset
- Parameters
obs_config (dict) – Observation config
- Returns
parset as dict
- _load_source_list()
Load the list with known source DMs
- Returns
source list with dict per category
- _process_triggers()
Read thresholds (DM, width, S/N) for clustering
Continuously read AMBER triggers from queue and start processing for known and/or new sources
- lofar_connector()
Connect to the LOFAR triggering system on the master node
- process_command(command)
Process command received from queue
- Parameters
command (dict) – Command to process
- start_observation(obs_config, reload=True)
Parse obs config and start listening for amber triggers on queue
- Parameters
obs_config (dict) – Observation configuration
reload (bool) – reload service settings (default: True)
- stop_observation(*args, **kwargs)
Stop observation
- voevent_connector()
Connect to the VOEvent generator on the master node
- exception darc.amber_clustering.AMBERClusteringException
Bases:
Exception
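`AMBERClustering._check_triggers` applies DM, width and S/N thresholds to the clustered triggers. The sketch below shows only that thresholding step, with the same default values as the signature above. The dictionary keys (`dm`, `width`, `snr`) and the inclusive bounds are assumptions; the clustering itself is not shown.

```python
import math


def apply_thresholds(triggers, dm_min=0, dm_max=math.inf,
                     width_max=math.inf, snr_min=8):
    """Keep triggers inside the DM range, below width_max and at or above snr_min."""
    return [
        trig for trig in triggers
        if dm_min <= trig["dm"] <= dm_max
        and trig["width"] <= width_max
        and trig["snr"] >= snr_min
    ]
```

For a known source, the caller could narrow `dm_min` and `dm_max` around the source DM and lower `snr_min`, while keeping stricter values for new sources.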
IQUV triggering
- class darc.dada_trigger.DADATrigger(*args, **kwargs)
Bases:
DARCBase
Generate and send dada_dbevent triggers
- _load_parset(obs_config)
Load the observation parset
- Parameters
obs_config (dict) – Observation config
- Returns
parset as dict
- cleanup()
Remove all trigger-sending threads
- polcal_dumps(obs_config)
Automatically dump IQUV data at regular intervals for polcal calibrator observations
- Parameters
obs_config (dict) – Observation config
- process_command(command)
Process command received from queue
- Parameters
command (dict) – command with arguments
- send_event(triggers)
Send trigger to dada_dbevent
- Parameters
triggers (list) – list of trigger dictionaries
- send_events(event, stokes)
Send stokes I or IQUV events
- Parameters
event (str) – raw event to send
stokes (str) – I or IQUV
- start_observation(obs_config, reload=True)
Start observation: run IQUV dumps automatically if source is polarisation calibrator. Else ensure normal FRB candidate I/IQUV dumps are enabled
- Parameters
obs_config (dict) – Observation config
reload (bool) – reload service settings (default: True)
- exception darc.dada_trigger.DADATriggerException
Bases:
Exception
LOFAR triggering
- class darc.lofar_trigger.LOFARTrigger(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Select brightest trigger from incoming triggers and send to LOFAR for TBB triggering
- Parameters
source_queue (Queue) – Input queue for controlling this service
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _get_attribute(command)
Get attribute as given in input command
- Parameters
command (dict) – Command received over queue
- _new_trigger(dm, utc, nu_GHz=1.37, test=None)
Create a LOFAR trigger struct
- Parameters
dm (float) – Dispersion measure (pc cm**-3)
utc (str) – UTC arrival time in ISOT format
nu_GHz (float) – Apertif centre frequency (GHz)
test (bool) – Whether to send a test event or observation event
- static _select_trigger(triggers)
Select trigger with highest S/N from a list of triggers. If there are triggers from both known and new sources, select the known source
- Parameters
triggers (list) – one dict per trigger
- Returns
trigger with highest S/N and number of unique CBs in trigger list
- _switch_command(command)
Check status or enable/disable sending of events
- create_and_send(trigger)
Create LOFAR trigger struct
Event is only sent if enabled in config
- Parameters
trigger (list/dict) – Trigger event(s). dict if one event, list of dicts if multiple events
- run()
Main loop
Read triggers from queue and process them
- send_email(trigger)
Send email upon LOFAR trigger
Consider running this method in a try/except block, as sending emails might fail in case of network interrupts
- Parameters
trigger (dict) – Trigger as sent to LOFAR
- stop()
Stop this service
- exception darc.lofar_trigger.LOFARTriggerException
Bases:
Exception
- class darc.lofar_trigger.LOFARTriggerQueueServer(address=None, authkey=None, serializer='pickle', ctx=None)
Bases:
BaseManager
Server for LOFAR Trigger input queue
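`LOFARTrigger._select_trigger` picks the trigger with the highest S/N, prefers known sources when both known and new sources are present, and also returns the number of unique CBs. A minimal sketch of that selection follows. The keys `snr`, `cb` and `known` are hypothetical, and counting CBs over the full trigger list (not only the known-source triggers) is an assumption.

```python
def select_trigger(triggers):
    """Return (best_trigger, num_unique_cbs) for a non-empty list of trigger dicts."""
    if not triggers:
        raise ValueError("Need at least one trigger")
    # Prefer known-source triggers if there are any
    known = [trig for trig in triggers if trig.get("known", False)]
    candidates = known if known else triggers
    best = max(candidates, key=lambda trig: trig["snr"])
    # Count the distinct compound beams that contributed a trigger
    num_cbs = len({trig["cb"] for trig in triggers})
    return best, num_cbs
```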
Sending VOEvents
- class darc.voevent_generator.VOEventGenerator(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Convert incoming triggers to VOEvent and send to the VOEvent broker
- Parameters
source_queue (Queue) – Input queue for controlling this service
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _NewVOEvent(dm, dm_err, width, snr, flux, ra, dec, semiMaj, semiMin, ymw16, name, importance, utc, gl, gb, gain, dt=0.08192, delta_nu_MHz=0.1953125, nu_GHz=1.37, posang=0, test=None)
Create a VOEvent
- Parameters
dm (float) – Dispersion measure (pc cm**-3)
dm_err (float) – Error on DM (pc cm**-3)
width (float) – Pulse width (ms)
snr (float) – Signal-to-noise ratio
flux (float) – flux density (mJy)
ra (float) – Right ascension (deg)
dec (float) – Declination (deg)
semiMaj (float) – Localisation region semi-major axis (arcmin)
semiMin (float) – Localisation region semi-minor axis (arcmin)
ymw16 (float) – YMW16 DM (pc cm**-3)
name (str) – Source name
importance (float) – Trigger importance (0-1)
utc (str) – UTC arrival time in ISOT format
gl (float) – Galactic longitude (deg)
gb (float) – Galactic latitude (deg)
gain (float) – Telescope gain (K Jy**-1)
dt (float) – Telescope time resolution (ms)
delta_nu_MHz (float) – Telescope frequency channel width (MHz)
nu_GHz (float) – Telescope centre frequency (GHz)
posang (float) – Localisation region position angle (deg)
test (bool) – Whether to send a test event or observation event
- _get_attribute(command)
Get attribute as given in input command
- Parameters
command (dict) – Command received over queue
- static _select_trigger(triggers)
Select trigger with highest S/N from a list of triggers
- Parameters
triggers (list) – one dict per trigger
- Returns
trigger with highest S/N and number of unique CBs in trigger list
- _switch_command(command)
Check status or enable/disable sending of events
- create_and_send(trigger)
Create VOEvent
Event is only sent if enabled in config
- Parameters
trigger (list/dict) – Trigger event(s). dict if one event, list of dicts if multiple events
- run()
Main loop
Read triggers from queue and process them
- stop()
Stop this service
- exception darc.voevent_generator.VOEventGeneratorException
Bases:
Exception
- class darc.voevent_generator.VOEventQueueServer(address=None, authkey=None, serializer='pickle', ctx=None)
Bases:
BaseManager
Server for VOEvent input queue
Offline data processing
- class darc.offline_processing.OfflineProcessing(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Full offline processing pipeline:
Candidate clustering
Extraction of filterbank data
ML classification
Email to astronomers
Also includes:
Automated pulsar folding
Automated run of calibration tools for drift scans
Automated run of known FRB candidate extractor
- Parameters
source_queue (Queue) – Input queue
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _classify(obs_config, input_file)
Run the ML classifier
- Parameters
obs_config (dict) – Observation config
input_file (str) – HDF5 file to process
- Returns
prefix of output figure path
- _cluster(obs_config, filterbank_name, tab=None, ind=None, sbmin=None, sbmax=None, out=None)
Run triggers.py, which takes care of clustering and extracting filterbank data
- Parameters
obs_config (dict) – Observation config
filterbank_name (str) – Full path to filterbank file (TAB/IAB) or prefix (SB) to use
tab (int) – TAB number to process (0 for IAB, absent/None for SB)
ind (int) – Index of out array where to store results
sbmin (int) – First SB to process (SB mode only)
sbmax (int) – Last SB to process (SB mode only)
out (np.ndarray) – output array where return value is put at index <ind> (optional)
- Returns
number of grouped candidates (put in out array, else returned)
- _fold_pulsar(source, obs_config)
Fold pulsar with PRESTO
- Parameters
source (str) – pulsar name including B or J
obs_config (dict) – Observation config
- _gather_results(obs_config, **kwargs)
Gather output results and put in central location
- Parameters
obs_config (dict) – Observation config
kwargs (dict) – number of candidates, output prefixes; these are added to obs_config
- _get_attribute(command)
Get attribute as given in input command
- Parameters
command (dict) – Command received over queue
- _get_coordinates(obs_config)
Generate coordinates file from the pointing directions
File contains RA, Dec, gl, gb for each CB used in this observation
- Parameters
obs_config (dict) – Observation config
- Returns
ra, dec for CB00 (decimal deg) if available, else None
- _get_overview(obs_config)
Generate observation overview file
- Parameters
obs_config (dict) – Observation config
- _load_parset(obs_config)
Load the observation parset
- Parameters
obs_config (dict) – Observation config
- Returns
parset as dict
- _merge_hdf5(obs_config, output_file)
Merge HDF5 files generated by clustering
- Parameters
obs_config (dict) – Observation config
output_file (str) – Filename of merged HDF5 data
- Returns
Number of triggers in combined HDF5 file
- _merge_plots(obs_config)
Merge classifier output plots into one pdf
- Parameters
obs_config (dict) – Observation config
- _merge_triggers(obs_config)
Merge AMBER triggers into one file
- Parameters
obs_config (dict) – Observation config
- _plot_known_frb_cands(obs_config, coord_cb00)
Call external script that plots candidates of known FRBs
- Parameters
obs_config (dict) – Observation config
coord_cb00 (list) – [ra, dec] of CB00 in decimal degrees
- _run_calibration_tools(source, obs_config)
- Parameters
source (str) – Source name (like 3C286, not 3C286drift0107)
obs_config (dict) – Observation config
- _start_observation_master(obs_config, reload=True)
Start observation on master node
Generate observation summary and send email once all workers are done
- Parameters
obs_config (dict) – Observation config
reload (bool) – reload service settings (default: True)
- _start_observation_worker(obs_config, reload=True)
Start observation on worker node:
Candidate clustering
Extraction of filterbank data
ML classification
- Parameters
obs_config (dict) – Observation config
reload (bool) – reload service settings (default: True)
- load_config()
- run()
Wait for start_observation command, then run offline processing pipeline. Executed commands depend on whether this is a worker node or the master node.
- stop()
Stop this service
- exception darc.offline_processing.OfflineProcessingException
Bases:
Exception
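`OfflineProcessing._cluster` either returns the number of grouped candidates or, when `out` and `ind` are given, stores it at `out[ind]`. That pattern is useful when the function runs in a thread, because a thread's return value is otherwise lost. The sketch below illustrates it with a plain list instead of an np.ndarray; `count_candidates` is a hypothetical stand-in for the real clustering call.

```python
import threading


def count_candidates(candidates, ind=None, out=None):
    """Hypothetical stand-in for a clustering step.

    Stores the result at out[ind] if an output array is given, else returns it.
    """
    ncand = len(candidates)
    if out is not None:
        out[ind] = ncand
        return None
    return ncand


def run_in_threads(batches):
    """Process each batch in its own thread and collect the per-batch counts."""
    out = [0] * len(batches)
    threads = [
        threading.Thread(target=count_candidates, args=(batch,),
                         kwargs={"ind": i, "out": out})
        for i, batch in enumerate(batches)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return out
```

Each thread writes to its own index, so no lock is needed around the output array.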
Real-time data processing (worker node)
- class darc.processor.Processor(log_queue, *args, **kwargs)
Bases:
DARCBase
Real-time processing of candidates
Clustering + thresholding
Extract data from filterbank
Run classifier
Visualize candidates
After observation finishes, results are gathered in a central location to be picked up by the master node
- Parameters
log_queue (Queue) – Queue to use for logging
- _finish_processing()
Wait for real-time processing to finish and visualize results
- _get_timeout()
Get processing time limit
- Returns
time limit in seconds (float) or None if no limit
- _join_with_timeout(name, timeout)
Signal a process to stop. Terminate if timeout is reached
- Parameters
name (str) – name of Process in self.threads dict to join
timeout (float) – timeout in seconds (None for no time limit)
- _read_amber_triggers()
Read AMBER triggers for reprocessing of an observation. Based on AMBERListener
- _read_and_process_data()
Process incoming AMBER triggers
- _reorder_clusters()
Reorder the clusters that are ready for data extraction so that the highest-S/N clusters come first. This ensures that bright candidates are prioritized when there is a processing time limit
- _store_obs_stats()
Store observation statistics to central result directory
- process_command(command)
Process command received from queue
- Parameters
command (dict) – Command to process
- start_observation(obs_config, reload=True)
Parse obs config and start listening for amber triggers on queue
- Parameters
obs_config (dict) – Observation configuration
reload (bool) – reload service settings (default: True)
- stop(abort=None)
Stop this service
- Parameters
abort (bool) – Ignored; a stop of the service always equals an abort
- stop_observation(abort=False)
Stop observation
- Parameters
abort (bool) – Whether or not to abort the observation
- exception darc.processor.ProcessorException
Bases:
Exception
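`Processor._reorder_clusters` sorts the pending clusters so that the brightest are extracted first. A minimal sketch, assuming each cluster is a dict with a hypothetical `snr` key; the `max_items` cut-off is only a stand-in for the processing time limit.

```python
def reorder_clusters(clusters):
    """Return a new list of clusters sorted by S/N, highest first."""
    return sorted(clusters, key=lambda cluster: cluster["snr"], reverse=True)


def process_within_limit(clusters, max_items):
    """Select at most max_items clusters, brightest first."""
    return reorder_clusters(clusters)[:max_items]
```

Because `sorted` returns a new list, the caller's original ordering is left untouched.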
- class darc.processor.ProcessorManager(*args, **kwargs)
Bases:
DARCBase
Control logic for running several Processor instances, one per observation
- _load_parset(obs_config)
Load the observation parset
- Parameters
obs_config (dict) – Observation config
- Returns
parset as dict
- process_command(command)
Forward any data from the input queue to the running observation
- processing_status_generator()
At regular intervals, create status file for processing website
- run()
Main loop. Create thread scavenger, then run parent class run method
- start_observation(obs_config, reload=True)
Initialize a Processor and call its start_observation
- stop(abort=False)
Stop this service
- Parameters
abort (bool) – Ignored; a stop of the manager always equals an abort
- stop_observation(obs_config)
Stop observation with task ID as given in parset
- Parameters
obs_config (dict) – Observation config
- thread_scavenger()
Remove any finished threads at regular intervals
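The `thread_scavenger` method removes finished threads at regular intervals. The sketch below shows one way to do this with the standard library. The dict of threads keyed by name, the lock, the stop event and the default interval are all assumptions for illustration.

```python
import threading


def scavenge_once(threads, lock):
    """Remove finished threads from the dict; return the names that were removed."""
    with lock:
        finished = [name for name, thread in threads.items()
                    if not thread.is_alive()]
        for name in finished:
            del threads[name]
    return finished


def thread_scavenger(threads, lock, stop_event, interval=1.0):
    """Call scavenge_once every `interval` seconds until stop_event is set."""
    # Event.wait returns False on timeout and True once the event is set
    while not stop_event.wait(interval):
        scavenge_once(threads, lock)
```

Note that `is_alive()` is also False for a thread that has not been started yet, so threads should be added to the dict only after `start()` has been called.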
Real-time data processing (master node)
- class darc.processor_master.ProcessorMaster(log_queue, *args, **kwargs)
Bases:
DARCBase
Combine results from worker node processors
- Parameters
log_queue (Queue) – Queue to use for logging
- _check_node_online(node)
Check if the processor on a node is still online and processing the current observation
- Parameters
node (str) – Hostname of node to check
- Returns
status (bool): True if node is online, else False
- _generate_info_file()
Generate observation info files
- Returns
info (dict), coordinates of each CB (dict)
- _get_result_dir()
Get result directory from worker processor config
- _process_observation()
Process observation
- _process_results(info, coordinates)
Load statistics and plots from the nodes. Copy to website directory and return data to be sent as email
- Parameters
info (dict) – Observation info summary
coordinates (dict) – Coordinates of every CB in the observation
- Returns
email (str), attachments (list)
- _publish_results(body, files)
Publish email content as local website
- _send_email(email, attachments)
Send email with observation results
- Parameters
email (str) – Email body
attachments (list) – Attachments
- _send_warning(node)
Send a warning email about a node
- Parameters
node (str) – Node to send warning about
- _wait_for_workers()
Wait for all worker nodes to finish processing this observation
- start_observation(obs_config, reload=True)
Parse obs config and start observation processing after end time has passed
- Parameters
obs_config (dict) – Observation configuration
reload (bool) – reload service settings (default: True)
- stop(abort=None)
Stop this service
- Parameters
abort (bool) – Ignored; a stop of the service always equals an abort
- stop_observation(abort=False)
Stop observation
- Parameters
abort (bool) – Whether or not to abort the observation
- class darc.processor_master.ProcessorMasterManager(*args, **kwargs)
Bases:
DARCBase
Control logic for running several ProcessorMaster instances, one per observation
- _load_parset(obs_config)
Load the observation parset
- Parameters
obs_config (dict) – Observation config
- Returns
parset as dict
- processing_status_generator()
At regular intervals, create status file for processing website
- run()
Main loop. Create thread scavenger, then run parent class run method
- start_observation(obs_config, reload=True)
Initialize a ProcessorMaster and call its start_observation
- stop(abort=False)
Stop this service
- Parameters
abort (bool) – Ignored; a stop of the manager always equals an abort
- stop_observation(obs_config)
Stop observation with task ID as given in parset
- Parameters
obs_config (dict) – Observation config
- thread_scavenger()
Remove any finished threads at regular intervals
Monitoring
- class darc.status_website.StatusWebsite(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)
Bases:
Process
Generate an HTML page with the status of each service across the ARTS cluster at regular intervals
- Parameters
source_queue (Queue) – Input queue
config_file (str) – Path to config file
control_queue (Queue) – Control queue of parent Process
- _get_attribute(command)
Get attribute as given in input command
- Parameters
command (dict) – Command received over queue
- check_command()
Check if this service should execute a command
- static get_template()
Return HTML template
Split into header and footer
- Returns
header, footer
- make_offline_page()
Create page for when status website is offline
- publish_status(statuses)
Publish status as a simple HTML webpage
- Parameters
statuses (dict) – Status of each service across all nodes
- run()
Main loop:
Get status of all services across all nodes
Publish HTML page with statuses
Generate offline page upon exit
- stop()
Stop this service
- exception darc.status_website.StatusWebsiteException
Bases:
Exception
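`StatusWebsite.get_template` returns an HTML header and footer, and `publish_status` fills the page with the status of each service. A minimal sketch of that split follows. The markup, the `{node: {service: status}}` layout of `statuses` and the function name `render_status` are assumptions, not DARC's actual template or data structure.

```python
import html


def get_template():
    """Return a (header, footer) pair; the markup is a placeholder."""
    header = "<html><head><title>Service status</title></head><body><table>"
    footer = "</table></body></html>"
    return header, footer


def render_status(statuses):
    """Render {node: {service: status}} as table rows between header and footer."""
    header, footer = get_template()
    rows = []
    for node, services in sorted(statuses.items()):
        for service, status in sorted(services.items()):
            # Escape values so that status text cannot break the markup
            cells = (html.escape(str(value)) for value in (node, service, status))
            rows.append("<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>")
    return "\n".join([header, *rows, footer])
```

Keeping the header and footer separate from the rows means an offline page can reuse the same template with a single message in place of the table rows.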