DARC services

Master service

class darc.darc_master.DARCMaster(config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml')

Bases: object

DARC master service that controls all other services and queues

Interact with this service through the ‘darc’ executable

Parameters

config_file (str) – path to DARC configuration file

_get_attribute(service, command)

Get attribute of a service instance

Parameters
  • service (str) – Which service to get an attribute of

  • command (str) – Full service command (“get_attr {attribute}”)

_load_config()

Load configuration file

_load_parset(config_file)

Load parset file and convert to observation config

Parameters

config_file (str) – Path to parset file

Returns

observation configuration

_load_yaml(config_file)

Load yaml file and convert to observation config

Parameters

config_file (str) – Path to yaml file

Returns

observation config dict

_switch_cmd(command)

Check status of LOFAR trigger system / VOEvent generator, or enable/disable them

Parameters

command (str) – command to run

Returns

status, reply

check_status(service)

Check status of a service

Parameters

service (str) – Service to check status of

Returns

status, reply

create_thread(service)

Initialise a service thread

Parameters

service (str) – service to create a new thread for

get_queue(service)

Get control queue corresponding to a service

Parameters

service (str) – Service to get queue for

Returns

queue (Queue)

parse_message(raw_message)

Parse raw received message

Parameters

raw_message (str) – message as single string

Returns

status, reply

process_message(service, command, payload)

Process received message

Parameters
  • service (str) – service to interact with

  • command (str) – command to run

  • payload – payload for command

Returns

status, reply

restart_service(service)

Restart a service

Parameters

service (str) – service to restart

Returns

status, reply

run()

Main loop

Listen for messages on the command socket and process them

start_observation(config_file, service=None)

Start an observation

Parameters
  • config_file (str) – Path to observation config file

  • service (str) – Which service to send start_observation to (default: all)

Returns

status, reply

start_service(service)

Start a service

Parameters

service (str) – service to start

Returns

status, reply

stop()

Stop all services and exit

Returns

status, reply

stop_observation(config_file, abort=False, service=None)

Stop an observation

Parameters
  • config_file (str) – path to observation config file

  • abort (bool) – whether to abort the observation

  • service (str) – Which service to send start_observation to (default: all)

Returns

status, reply message

stop_service(service)

Stop a service

Parameters

service (str) – service to stop

Returns

status, reply

exception darc.darc_master.DARCMasterException

Bases: Exception

darc.darc_master.main()

Run DARC Master

Reading AMBER candidates

class darc.amber_listener.AMBERListener(*args, **kwargs)

Bases: DARCBase

Continuously read AMBER candidate files from disk and put candidates on output queue.

_follow_file(fname, event)

Tail a file an put lines on queue

Parameters
  • fname (str) – file to follow

  • event (Event) – stop event

start_observation(obs_config, reload=True)

Start an observation

Parameters
  • obs_config (dict) – observation config dict

  • reload (bool) – reload service settings (default: True)

stop_observation(*args, **kwargs)

Stop observation

exception darc.amber_listener.AMBERListenerException

Bases: Exception

Clustering of AMBER candidates

class darc.amber_clustering.AMBERClustering(*args, connect_vo=True, connect_lofar=True, **kwargs)

Bases: DARCBase

Trigger IQUV / LOFAR / VOEvent system based on AMBER candidates

  1. Cluster incoming triggers

  2. Apply thresholds (separate for known and new sources, and for IQUV vs LOFAR)

  3. Put IQUV triggers on output queue

  4. Put LOFAR triggers on remote LOFAR trigger queue and on VOEvent queue

Parameters
  • connect_vo (bool) – Whether or not to connect to VOEvent queue on master node

  • connect_lofar (bool) – Whether or not to connect to LOFAR trigger queue on master node

_check_triggers(triggers, sys_params, utc_start, datetimesource, dm_min=0, dm_max=inf, dm_src=None, width_max=inf, snr_min=8, src_type=None, src_name=None, dmgal=0, pointing=None, skip_lofar=False)

Cluster triggers and run IQUV and/or LOFAR triggering

Parameters
  • triggers (list) – Raw triggers

  • sys_params (dict) – System parameters (dt, delta_nu_MHz, nu_GHz)

  • utc_start (str) – start time of observation, in format readable by astropy.time.Time

  • datetimesource (str) – Field name with date and time

  • dm_min (float) – minimum DM (default: 0)

  • dm_max (float) – maximum DM (default: inf)

  • dm_src (float) – DM of known source (default: None)

  • width_max (float) – maximum width (default: inf)

  • snr_min (float) – mininum S/N (default: 8)

  • src_type (str) – Source type (pulsar, frb, None)

  • src_name (str) – Source name (default: None)

  • dmgal (float) – galactic maximum DM

  • pointing (astropy.coordinates.SkyCoord) – Pointing for LOFAR triggering (default: None)

  • skip_lofar (bool) – Skip LOFAR triggering (default: False)

_get_pointing()

Get pointing of this CB from parset

Returns

pointing SkyCoord

_get_source()

Try to get DM for a known source

Returns

DM for known source, else None

_load_parset(obs_config)

Load the observation parset

Parameters

obs_config (dict) – Observation config

Returns

parset as dict

_load_source_list()

Load the list with known source DMs

Returns

source list with dict per category

_process_triggers()

Read thresholds (DM, width, S/N) for clustering

Continuously read AMBER triggers from queue and start processing for known and/or new sources

lofar_connector()

Connect to the LOFAR triggering system on the master node

process_command(command)

Process command received from queue

Parameters

command (dict) – Command to process

start_observation(obs_config, reload=True)

Parse obs config and start listening for amber triggers on queue

Parameters
  • obs_config (dict) – Observation configuration

  • reload (bool) – reload service settings (default: True)

stop_observation(*args, **kwargs)

Stop observation

voevent_connector()

Connect to the VOEvent generator on the master node

exception darc.amber_clustering.AMBERClusteringException

Bases: Exception

IQUV triggering

class darc.dada_trigger.DADATrigger(*args, **kwargs)

Bases: DARCBase

Generate and send dada_dbevent triggers

_load_parset(obs_config)

Load the observation parset

Parameters

obs_config (dict) – Observation config

Returns

parset as dict

cleanup()

Remove all trigger-sending threads

polcal_dumps(obs_config)

Automatically dump IQUV data at regular intervals for polcal calibrator observations

Parameters

obs_config (dict) – Observation config

process_command(command)

Process command received from queue

Parameters

command (dict) – command with arguments

send_event(triggers)

Send trigger to dada_dbevent

Parameters

triggers (list) – list of trigger dictionaries

send_events(event, stokes)

Send stokes I or IQUV events

Parameters
  • event (str) – raw event to send

  • stokes (str) – I or IQUV

Returns

start_observation(obs_config, reload=True)

Start observation: run IQUV dumps automatically if source is polarisation calibrator. Else ensure normal FRB candidate I/IQUV dumps are enabled

Parameters
  • obs_config (dict) – Observation config

  • reload (bool) – reload service settings (default: True)

exception darc.dada_trigger.DADATriggerException

Bases: Exception

LOFAR triggering

class darc.lofar_trigger.LOFARTrigger(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)

Bases: Process

Select brightest trigger from incoming trigger and send to LOFAR for TBB triggering

Parameters
  • source_queue (Queue) – Input queue for controlling this service

  • config_file (str) – Path to config file

  • control_queue (Queue) – Control queue of parent Process

_get_attribute(command)

Get attribute as given in input command

Parameters

command (dict) – Command received over queue

_new_trigger(dm, utc, nu_GHz=1.37, test=None)

Create a LOFAR trigger struct

Parameters
  • dm (float) – Dispersion measure (pc cm**-3)

  • utc (str) – UTC arrival time in ISOT format

  • nu_GHz (float) – Apertif centre frequency (GHz)

  • test (bool) – Whether to send a test event or observation event

static _select_trigger(triggers)

Select trigger with highest S/N from a list of triggers. If there are triggers from both known and new sources, select the known source

Parameters

triggers (list) – one dict per trigger

Returns

trigger with highest S/N and number of unique CBs in trigger list

_switch_command(command)

Check status or enable/disable sending of events

create_and_send(trigger)

Create LOFAR trigger struct

Event is only sent if enabled in config

Parameters

trigger (list/dict) – Trigger event(s). dict if one event, list of dicts if multiple events

run()

Main loop

Read triggers from queue and process them

send_email(trigger)

Send email upon LOFAR trigger

Consider running this method in a try/except block, as sending emails might fail in case of network interrupts

Parameters

trigger (dict) – Trigger as sent to LOFAR

stop()

Stop this service

exception darc.lofar_trigger.LOFARTriggerException

Bases: Exception

class darc.lofar_trigger.LOFARTriggerQueueServer(address=None, authkey=None, serializer='pickle', ctx=None)

Bases: BaseManager

Server for LOFAR Trigger input queue

Sending VOEvents

class darc.voevent_generator.VOEventGenerator(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)

Bases: Process

Convert incoming triggers to VOEvent and send to the VOEvent broker

Parameters
  • source_queue (Queue) – Input queue for controlling this service

  • config_file (str) – Path to config file

  • control_queue (Queue) – Control queue of parent Process

_NewVOEvent(dm, dm_err, width, snr, flux, ra, dec, semiMaj, semiMin, ymw16, name, importance, utc, gl, gb, gain, dt=0.08192, delta_nu_MHz=0.1953125, nu_GHz=1.37, posang=0, test=None)

Create a VOEvent

Parameters
  • dm (float) – Dispersion measure (pc cm**-3)

  • dm_err (float) – Error on DM (pc cm**-3)

  • width (float) – Pulse width (ms)

  • snr (float) – Signal-to-noise ratio

  • flux (float) – flux density (mJy)

  • ra (float) – Right ascension (deg)

  • dec (float) – Declination (deg)

  • semiMaj (float) – Localisation region semi-major axis (arcmin)

  • semiMin (float) – Localisation region semi-minor axis (arcmin)

  • ymw16 (float) – YMW16 DM (pc cm**-3)

  • name (str) – Source name

  • importance (float) – Trigger importance (0-1)

  • utc (str) – UTC arrival time in ISOT format

  • gl (float) – Galactic longitude (deg)

  • gb (float) – Galactic latitude (deg)

  • gain (float) – Telescope gain (K Jy**-1)

  • dt (float) – Telescope time resolution (ms)

  • delta_nu_MHz (float) – Telescope frequency channel width (MHz)

  • nu_GHz (float) – Telescope centre frequency (GHz)

  • posang (float) – Localisation region position angle (deg)

  • test (bool) – Whether to send a test event or observation event

_get_attribute(command)

Get attribute as given in input command

Parameters

command (dict) – Command received over queue

static _select_trigger(triggers)

Select trigger with highest S/N from a list of triggers

Parameters

triggers (list) – one dict per trigger

Returns

trigger with highest S/N and number of unique CBs in trigger list

_switch_command(command)

Check status or enable/disable sending of events

create_and_send(trigger)

Create voevent

Event is only sent if enabled in config

Parameters

trigger (list/dict) – Trigger event(s). dict if one event, list of dicts if multiple events

run()

Main loop

Read triggers from queue and process them

stop()

Stop this service

exception darc.voevent_generator.VOEventGeneratorException

Bases: Exception

class darc.voevent_generator.VOEventQueueServer(address=None, authkey=None, serializer='pickle', ctx=None)

Bases: BaseManager

Server for VOEvent input queue

Offline data processing

class darc.offline_processing.OfflineProcessing(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)

Bases: Process

Full offline processing pipeline:

  1. Candidate clustering

  2. Extraction of filterbank data

  3. ML classification

  4. Email to astronomers

Also includes:

  • Automated pulsar folding

  • Automated run of calibration tools for drift scans

  • Automated run of known FRB candidate extractor

Parameters
  • source_queue (Queue) – Input queue

  • config_file (str) – Path to config file

  • control_queue (Queue) – Control queue of parent Process

_classify(obs_config, input_file)

Run the ML classifier

Parameters
  • obs_config (dict) – Observation config

  • input_file (str) – HDF5 file to process

Returns

prefix of output figure path

_cluster(obs_config, filterbank_name, tab=None, ind=None, sbmin=None, sbmax=None, out=None)

Run triggers.py, which takes care of clustering and extracting filterbank data

Parameters
  • obs_config (dict) – Observation config

  • filterbank_name (str) – Full path to filterbank file (TAB/IAB) or prefix (SB) to use

  • tab (int) – TAB number to process (0 for IAB, absent/None for SB)

  • ind (int) – Index of out array where to store results

  • sbmin (int) – First SB to process (SB mode only)

  • sbmax (int) – Last SB to process (SB mode only)

  • out (np.ndarray) – output array where return value is put at index <ind> (optional)

Returns

number of grouped candidates (put in out array, else returned)

_fold_pulsar(source, obs_config)

Fold pulsar with PRESTO

Parameters
  • source (str) – pulsar name including B or J

  • obs_config (dict) – Observation config

_gather_results(obs_config, **kwargs)

Gather output results and put in central location

Parameters
  • obs_config (dict) – Observation config

  • kwargs (dict) – number of candidates, output prefixes; these are added to obs_config

_get_attribute(command)

Get attribute as given in input command

Parameters

command (dict) – Command received over queue

_get_coordinates(obs_config)

Generate coordinates file from the pointing directions

File contains RA, Dec, gl, gb for each CB used in this observation

Parameters

obs_config (dict) – Observation config

Returns

ra, deg for CB00 (decimal deg) if available, else None

_get_overview(obs_config)

Generate observation overview file

Parameters

obs_config (dict) – Observation config

_load_parset(obs_config)

Load the observation parset

Parameters

obs_config (dict) – Observation config

Returns

parset as dict

_merge_hdf5(obs_config, output_file)

Merge HDF5 files generated by clustering

Parameters
  • obs_config (dict) – Observation config

  • output_file (str) – Filename of merged HDF5 data

Returns

Number of triggers in combined HDF5 file

_merge_plots(obs_config)

Merge classifier output plots into one pdf

Parameters

obs_config (dict) – Observation config

_merge_triggers(obs_config)

Merge AMBER triggers into one file

Parameters

obs_config (dict) – Observation config

_plot_known_frb_cands(obs_config, coord_cb00)

Call external script that plots candidates of known FRBs

Parameters
  • obs_config (dict) – Observation config

  • coord_cb00 (list) – [ra, dec] of CB00 in decimal degrees

_run_calibration_tools(source, obs_config)
Parameters
  • source (str) – Source name (like 3C296, not 3C286drift0107)

  • obs_config (dict) – Observation config

_start_observation_master(obs_config, reload=True)

Start observation on master node

Generated observation summary and send email once all workers are done

Parameters
  • obs_config (dict) – Observation config

  • reload (bool) – reload service settings (default: True)

_start_observation_worker(obs_config, reload=True)

Start observation on worker node:

  1. Candidate clustering

  2. Extraction of filterbank data

  3. ML classification

Parameters
  • obs_config (dict) – Observation config

  • reload (bool) – reload service settings (default: True)

load_config()
run()

Wait for start_observation command, then run offline processing pipeline. Executed commands depend on whether this is a worker node or the master node.

stop()

Stop this service

exception darc.offline_processing.OfflineProcessingException

Bases: Exception

Real-time data processing (worker node)

class darc.processor.Processor(log_queue, *args, **kwargs)

Bases: DARCBase

Real-time processing of candidates

  1. Clustering + thresholding

  2. Extract data from filterbank

  3. Run classifier

  4. Visualize candidates

After observation finishes, results are gathered in a central location to be picked up by the master node

Parameters

log_queue (Queue) – Queue to use for logging

_finish_processing()

Wait for real-time processing to finish and visualize results

_get_timeout()

Get procesing time limit

Returns

time limit in seconds (float) or None if no limit

_join_with_timeout(name, timeout)

Signal a process to stop. Terminate if timeout is reached

Parameters
  • name (str) – name of Process in self.threads dict to join

  • timeout (float) – timeout in seconds (None for no time limit)

_read_amber_triggers()

Read AMBER triggers for reprocessing of an observation. Based on AMBERListener

_read_and_process_data()

Process incoming AMBER triggers

_reorder_clusters()

Reorder clusters ready for data extraction to highest-S/N first. This is used such that bright candidates are prioritized when there is a processing time limit

_store_obs_stats()

Store observation statistics to central result directory

process_command(command)

Process command received from queue

Parameters

command (dict) – Command to process

start_observation(obs_config, reload=True)

Parse obs config and start listening for amber triggers on queue

Parameters
  • obs_config (dict) – Observation configuration

  • reload (bool) – reload service settings (default: True)

stop(abort=None)

Stop this service

Parameters

abort (bool) – Ignored, a stop of the service always equals abort

stop_observation(abort=False)

Stop observation

Parameters

abort (bool) – Whether or not to abort the observation

exception darc.processor.ProcessorException

Bases: Exception

class darc.processor.ProcessorManager(*args, **kwargs)

Bases: DARCBase

Control logic for running several Processor instances, one per observation

_load_parset(obs_config)

Load the observation parset

Parameters

obs_config (dict) – Observation config

Returns

parset as dict

process_command(command)

Forward any data from the input queue to the running observation

processing_status_generator()

At regular interval, create status file for processing website

run()

Main loop. Create thread scavenger, then run parent class run method

start_observation(obs_config, reload=True)

Initialize a Processor and call its start_observation

stop(abort=False)

Stop this service

Parameters

abort (bool) – Ignored; a stop of the manager always equals an abort

stop_observation(obs_config)

Stop observation with task ID as given in parset

Parameters

obs_config (dict) – Observation config

thread_scavenger()

Remove any finished threads at regular intervals

Real-time data processing (master node)

class darc.processor_master.ProcessorMaster(log_queue, *args, **kwargs)

Bases: DARCBase

Combine results from worker node processors

Parameters

log_queue (Queue) – Queue to use for logging

_check_node_online(node)

Check if the processor on a node is still online and processing the current observation

Parameters

node (str) – Hostname of node to check

Returns

status (bool): True if node is online, else False

_generate_info_file()

Generate observation info files

Returns

info (dict), coordinates of each CB (dict)

_get_result_dir()

Get result directory from worker processor config

_process_observation()

Process observation

_process_results(info, coordinates)

Load statistics and plots from the nodes. Copy to website directory and return data to be sent as email

Parameters
  • info (dict) – Observation info summary

  • coordinates (dict) – Coordinates of every CB in the observation

Returns

email (str), attachments (list)

_publish_results(body, files)

Publish email content as local website

_send_email(email, attachments)

Send email with observation results

Parameters
  • email (str) – Email body

  • attachments (list) – Attachments

_send_warning(node)

Send a warning email about a node

Parameters

node (str) – Node to send warning about

_wait_for_workers()

Wait for all worker nodes to finish processing this observation

start_observation(obs_config, reload=True)

Parse obs config and start observation processing after end time has passed

Parameters
  • obs_config (dict) – Observation configuration

  • reload (bool) – reload service settings (default: True)

stop(abort=None)

Stop this service

Parameters

abort (bool) – Ignored, a stop of the service always equals abort

stop_observation(abort=False)

Stop observation

Parameters

abort (bool) – Whether or not to abort the observation

class darc.processor_master.ProcessorMasterManager(*args, **kwargs)

Bases: DARCBase

Control logic for running several ProcessorMaster instances, one per observation

_load_parset(obs_config)

Load the observation parset

Parameters

obs_config (dict) – Observation config

Returns

parset as dict

processing_status_generator()

At regular interval, create status file for processing website

run()

Main loop. Create thread scavenger, then run parent class run method

start_observation(obs_config, reload=True)

Initialize a ProcessorMaster and call its start_observation

stop(abort=False)

Stop this service

Parameters

abort (bool) – Ignored; a stop of the manager always equals an abort

stop_observation(obs_config)

Stop observation with task ID as given in parset

Parameters

obs_config (dict) – Observation config

thread_scavenger()

Remove any finished threads at regular intervals

Monitoring

class darc.status_website.StatusWebsite(source_queue, *args, config_file='/opt/hostedtoolcache/Python/3.10.8/x64/lib/python3.10/site-packages/darc/config.yaml', control_queue=None, **kwargs)

Bases: Process

Generate a HTML page with the status of each service across the ARTS cluster at regular intervals

Parameters
  • source_queue (Queue) – Input queue

  • config_file (str) – Path to config file

  • control_queue (Queue) – Control queue of parent Process

_get_attribute(command)

Get attribute as given in input command

Parameters

command (dict) – Command received over queue

check_command()

Check if this service should execute a command

static get_template()

Return HTML template

Split into header and footer

Returns

header, footer

make_offline_page()

Create page for when status website is offline

publish_status(statuses)

Publish status as simple html webpage

Parameters

statuses (dict) – Status of each service across all nodes

run()

Main loop:

  1. Get status of all services across all nodes

  2. Publish HTML page with statuses

  3. Generate offline page upon exit

stop()

Stop this service

exception darc.status_website.StatusWebsiteException

Bases: Exception