Source code for callhorizons

"""CALLHORIZONS - a Python interface to access JPL HORIZONS
ephemerides and orbital elements.

This module provides a convenient python interface to the JPL
HORIZONS system by directly accessing and parsing the HORIZONS
website. Ephemerides can be obtained through get_ephemerides,
orbital elements through get_elements. Function
export2pyephem provides an interface to the PyEphem module.

michael.mommert (at) nau.edu, latest version: v1.0.5, 2017-05-05.
This code is inspired by code created by Alex Hagen.

* v1.
* v1.0.5: 15-epoch limit for set_discreteepochs removed
* v1.0.4: improved asteroid and comet name parsing
* v1.0.3: ObsEclLon and ObsEclLat added to get_ephemerides
* v1.0.2: Python 3.5 compatibility implemented
* v1.0.1: get_ephemerides fixed
* v1.0:   bugfixes completed, planets/satellites accessible, too
* v0.9:   first release


"""

from __future__ import (print_function, unicode_literals)

import re
import sys
import time
import numpy as np
import warnings
try:
    # Python 3
    import urllib.request as urllib
except ImportError:
    # Python 2
    import urllib2 as urllib

warnings.filterwarnings('once', category=DeprecationWarning)
warnings.warn(('CALLHORIZONS is not maintained anymore; please use '
               'astroquery.jplhorizons instead (https://github.com/'
               'astropy/astroquery)'),
              DeprecationWarning)


def _char2int(char):
    """ translate characters to integer values (upper and lower case)"""
    if char.isdigit():
        return int(float(char))
    if char.isupper():
        return int(char, 36)
    else:
        return 26 + int(char, 36)


[docs]class query(): # constructor
[docs] def __init__(self, targetname, smallbody=True, cap=True, nofrag=False, comet=False, asteroid=False): """Initialize query to Horizons :param targetname: HORIZONS-readable target number, name, or designation :param smallbody: boolean use ``smallbody=False`` if targetname is a planet or spacecraft (optional, default: `True`); also use `True` if the targetname is exact and should be queried as is :param cap: set to `True` to return the current apparition for comet targets :param nofrag: set to `True` to disable HORIZONS's comet fragment search :param comet: set to `True` if this is a comet (will override automatic targetname parsing) :param asteroid: set to `True` if this is an asteroid (will override automatic targetname parsing) :return: None """ self.targetname = str(targetname) self.not_smallbody = not smallbody self.cap = cap self.nofrag = nofrag self.comet = comet # is this object a comet? self.asteroid = asteroid # is this object an asteroid? self.start_epoch = None self.stop_epoch = None self.step_size = None self.discreteepochs = None self.url = None self.data = None assert not ( self.comet and self.asteroid), 'Only one of comet or asteroid can be `True`.' return None
# small body designation parsing
[docs] def parse_comet(self): """Parse `targetname` as if it were a comet. :return: (string or None, int or None, string or None); The designation, number and prefix, and name of the comet as derived from `self.targetname` are extracted into a tuple; each element that does not exist is set to `None`. Parenthesis in `self.targetname` will be ignored. :example: the following table shows the result of the parsing: +--------------------------------+--------------------------------+ |targetname |(desig, prefixnumber, name) | +================================+================================+ |1P/Halley |(None, '1P', 'Halley') | +--------------------------------+--------------------------------+ |3D/Biela |(None, '3D', 'Biela') | +--------------------------------+--------------------------------+ |9P/Tempel 1 |(None, '9P', 'Tempel 1') | +--------------------------------+--------------------------------+ |73P/Schwassmann Wachmann 3 C |(None, '73P', | | |'Schwassmann Wachmann 3 C') | +--------------------------------+--------------------------------+ |73P-C/Schwassmann Wachmann 3 C |(None, '73P-C', | | |'Schwassmann Wachmann 3 C') | +--------------------------------+--------------------------------+ |73P-BB |(None, '73P-BB', None) | +--------------------------------+--------------------------------+ |322P |(None, '322P', None) | +--------------------------------+--------------------------------+ |X/1106 C1 |('1166 C1', 'X', None) | +--------------------------------+--------------------------------+ |P/1994 N2 (McNaught-Hartley) |('1994 N2', 'P', | | |'McNaught-Hartley') | +--------------------------------+--------------------------------+ |P/2001 YX127 (LINEAR) |('2001 YX127', 'P', 'LINEAR') | +--------------------------------+--------------------------------+ |C/-146 P1 |('-146 P1', 'C', None) | +--------------------------------+--------------------------------+ |C/2001 A2-A (LINEAR) |('2001 A2-A', 'C', 'LINEAR') | +--------------------------------+--------------------------------+ |C/2013 US10 |('2013 US10', 'C', None) | +--------------------------------+--------------------------------+ |C/2015 V2 (Johnson) |('2015 V2', 'C', 'Johnson') | +--------------------------------+--------------------------------+ |C/2016 KA (Catalina) |('2016 KA', 'C', 'Catalina') | +--------------------------------+--------------------------------+ """ import re pat = ('^(([1-9]+[PDCXAI](-[A-Z]{1,2})?)|[PDCXAI]/)' + # prefix [0,1,2] '|([-]?[0-9]{3,4}[ _][A-Z]{1,2}([0-9]{1,3})?(-[1-9A-Z]{0,2})?)' + # designation [3,4] ('|(([A-Z][a-z]?[A-Z]*[a-z]*[ -]?[A-Z]?[1-9]*[a-z]*)' + '( [1-9A-Z]{1,2})*)') # name [5,6] ) m = re.findall(pat, self.targetname.strip()) # print(m) prefixnumber = None desig = None name = None if len(m) > 0: for el in m: # prefix/number if len(el[0]) > 0: prefixnumber = el[0].replace('/', '') # designation if len(el[3]) > 0: desig = el[3].replace('_', ' ') # name if len(el[5]) > 0: if len(el[5]) > 1: name = el[5] return (desig, prefixnumber, name)
[docs] def parse_asteroid(self): """Parse `targetname` as if it were a asteroid. :return: (string or None, int or None, string or None); The designation, number, and name of the asteroid as derived from `self.targetname` are extracted into a tuple; each element that does not exist is set to `None`. Parenthesis in `self.targetname` will be ignored. Packed designations and numbers are unpacked. :example: the following table shows the result of the parsing: +--------------------------------+---------------------------------+ |targetname |(desig, number, name) | +================================+=================================+ |1 |(None, 1, None) | +--------------------------------+---------------------------------+ |2 Pallas |(None, 2, Pallas) | +--------------------------------+---------------------------------+ |\(2001\) Einstein |(None, 2001, Einstein) | +--------------------------------+---------------------------------+ |1714 Sy |(None, 1714, Sy) | +--------------------------------+---------------------------------+ |2014 MU69 |(2014 MU69, None, None) | +--------------------------------+---------------------------------+ |(228195) 6675 P-L |(6675 P-L, 228195, None) | +--------------------------------+---------------------------------+ |4101 T-3 |(4101 T-3, None, None) | +--------------------------------+---------------------------------+ |4015 Wilson-Harrington (1979 VA)|(1979 VA, 4015, Wilson-Harrington| +--------------------------------+---------------------------------+ |J95X00A |(1995 XA, None, None) | +--------------------------------+---------------------------------+ |K07Tf8A |(2007 TA418, None, None) | +--------------------------------+---------------------------------+ |G3693 |(None, 163693, None) | +--------------------------------+---------------------------------+ |2017 U1 |(None, None, None) | +--------------------------------+---------------------------------+ """ pat = ('(([1-2][0-9]{0,3}[ _][A-Z]{2}[0-9]{0,3})' # designation [0,1] '|([1-9][0-9]{3}[ _](P-L|T-[1-3])))' # Palomar-Leiden [0,2,3] '|([IJKL][0-9]{2}[A-Z][0-9a-z][0-9][A-Z])' # packed desig [4] '|([A-Za-z][0-9]{4})' # packed number [5] '|([A-Z][A-Z]*[a-z][a-z]*[^0-9]*' '[ -]?[A-Z]?[a-z]*[^0-9]*)' # name [6] '|([1-9][0-9]*(\b|$))') # number [7,8] # regex patterns that will be ignored as they might cause # confusion non_pat = ('([1-2][0-9]{0,3}[ _][A-Z][0-9]*(\b|$))') # comet desig if sys.version_info > (3, 0): raw = self.targetname.translate(str.maketrans('()', ' ')).strip() else: import string raw = self.targetname.translate(string.maketrans('()', ' ')).strip() # reject non_pat patterns non_m = re.findall(non_pat, raw) # print('reject', raw, non_m) if len(non_m) > 0: for ps in non_m: for p in ps: if p == '': continue raw = raw[:raw.find(p)] + raw[raw.find(p)+len(p):] # match target patterns m = re.findall(pat, raw) # print(raw, m) desig = None number = None name = None if len(m) > 0: for el in m: # designation if len(el[0]) > 0: desig = el[0] # packed designation (unpack here) elif len(el[4]) > 0: ident = el[4] # old designation style, e.g.: 1989AB if (len(ident.strip()) < 7 and ident[:4].isdigit() and ident[4:6].isalpha()): desig = ident[:4]+' '+ident[4:6] # Palomar Survey elif ident.find("PLS") == 0: desig = ident[3:] + " P-L" # Trojan Surveys elif ident.find("T1S") == 0: desig = ident[3:] + " T-1" elif ident.find("T2S") == 0: desig = ident[3:] + " T-2" elif ident.find("T3S") == 0: desig = ident[3:] + " T-3" # insert blank in designations elif (ident[0:4].isdigit() and ident[4:6].isalpha() and ident[4] != ' '): desig = ident[:4]+" "+ident[4:] # MPC packed 7-digit designation elif (ident[0].isalpha() and ident[1:3].isdigit() and ident[-1].isalpha() and ident[-2].isdigit()): yr = str(_char2int(ident[0]))+ident[1:3] let = ident[3]+ident[-1] num = str(_char2int(ident[4]))+ident[5] num = num.lstrip("0") desig = yr+' '+let+num # nothing to do else: desig = ident # packed number (unpack here) elif len(el[5]) > 0: ident = el[5] number = ident = int(str(_char2int(ident[0]))+ident[1:]) # number elif len(el[7]) > 0: if sys.version_info > (3, 0): number = int(float(el[7].translate(str.maketrans('()', ' ')))) else: import string number = int(float(el[7].translate(string.maketrans('()', ' ')))) # name (strip here) elif len(el[6]) > 0: if len(el[6].strip()) > 1: name = el[6].strip() return (desig, number, name)
[docs] def isorbit_record(self): """`True` if `targetname` appears to be a comet orbit record number. NAIF record numbers are 6 digits, begin with a '9' and can change at any time. """ import re test = re.match('^9[0-9]{5}$', self.targetname.strip()) is not None return test
[docs] def iscomet(self): """`True` if `targetname` appears to be a comet. """ # treat this object as comet if there is a prefix/number if self.comet is not None: return self.comet elif self.asteroid is not None: return not self.asteroid else: return (self.parse_comet()[0] is not None or self.parse_comet()[1] is not None)
[docs] def isasteroid(self): """`True` if `targetname` appears to be an asteroid.""" if self.asteroid is not None: return self.asteroid elif self.comet is not None: return not self.comet else: return any(self.parse_asteroid()) is not None
# set epochs
[docs] def set_epochrange(self, start_epoch, stop_epoch, step_size): """Set a range of epochs, all times are UT :param start_epoch: str; start epoch of the format 'YYYY-MM-DD [HH-MM-SS]' :param stop_epoch: str; final epoch of the format 'YYYY-MM-DD [HH-MM-SS]' :param step_size: str; epoch step size, e.g., '1d' for 1 day, '10m' for 10 minutes... :return: None :example: >>> import callhorizons >>> ceres = callhorizons.query('Ceres') >>> ceres.set_epochrange('2016-02-26', '2016-10-25', '1d') Note that dates are mandatory; if no time is given, midnight is assumed. """ self.start_epoch = start_epoch self.stop_epoch = stop_epoch self.step_size = step_size return None
[docs] def set_discreteepochs(self, discreteepochs): """Set a list of discrete epochs, epochs have to be given as Julian Dates :param discreteepochs: array_like list or 1D array of floats or strings :return: None :example: >>> import callhorizons >>> ceres = callhorizons.query('Ceres') >>> ceres.set_discreteepochs([2457446.177083, 2457446.182343]) """ if not isinstance(discreteepochs, (list, np.ndarray)): discreteepochs = [discreteepochs] self.discreteepochs = list(discreteepochs)
# data access functions @property def fields(self): """returns list of available properties for all epochs""" try: return self.data.dtype.names except AttributeError: return []
[docs] def __len__(self): """returns total number of epochs that have been queried""" try: # Cast to int because a long is returned from shape on Windows. return int(self.data.shape[0]) except AttributeError: return 0
@property def dates(self): """returns list of epochs that have been queried (format 'YYYY-MM-DD HH-MM-SS')""" try: return self.data['datetime'] except: return [] @property def query(self): """returns URL that has been used in calling HORIZONS""" try: return self.url except: return [] @property def dates_jd(self): """returns list of epochs that have been queried (Julian Dates)""" try: return self.data['datetime_jd'] except: return []
[docs] def __repr__(self): """returns brief query information""" return "<callhorizons.query object: %s>" % self.targetname
[docs] def __str__(self): """returns information on the current query as string""" output = "targetname: %s\n" % self.targetname if self.discreteepochs is not None: output += "discrete epochs: %s\n" % \ " ".join([str(epoch) for epoch in self.discreteepochs]) if (self.start_epoch is not None and self.stop_epoch is not None and self.step_size is not None): output += "epoch range from %s to %s in steps of %s\n" % \ (self.start_epoch, self.stop_epoch, self.step_size) output += "%d data sets queried with %d different fields" % \ (len(self), len(self.fields)) return output
[docs] def __getitem__(self, key): """provides access to query data :param key: str/int; epoch index or property key :return: query data according to key """ # check if data exist if self.data is None or len(self.data) == 0: print('CALLHORIZONS ERROR: run get_ephemerides or get_elements', 'first') return None return self.data[key]
# call functions
[docs] def get_ephemerides(self, observatory_code, airmass_lessthan=99, solar_elongation=(0, 180), skip_daylight=False): """Call JPL HORIZONS website to obtain ephemerides based on the provided targetname, epochs, and observatory_code. For a list of valid observatory codes, refer to http://minorplanetcenter.net/iau/lists/ObsCodesF.html :param observatory_code: str/int; observer's location code according to Minor Planet Center :param airmass_lessthan: float; maximum airmass (optional, default: 99) :param solar_elongation: tuple; permissible solar elongation range (optional, deg) :param skip_daylight: boolean; crop daylight epoch during query (optional) :result: int; number of epochs queried :example: >>> ceres = callhorizons.query('Ceres') >>> ceres.set_epochrange('2016-02-23 00:00', '2016-02-24 00:00', '1h') >>> print (ceres.get_ephemerides(568), 'epochs queried') The queried properties and their definitions are: +------------------+-----------------------------------------------+ | Property | Definition | +==================+===============================================+ | targetname | official number, name, designation [string] | +------------------+-----------------------------------------------+ | H | absolute magnitude in V band (float, mag) | +------------------+-----------------------------------------------+ | G | photometric slope parameter (float) | +------------------+-----------------------------------------------+ | datetime | epoch date and time (str, YYYY-MM-DD HH:MM:SS)| +------------------+-----------------------------------------------+ | datetime_jd | epoch Julian Date (float) | +------------------+-----------------------------------------------+ | solar_presence | information on Sun's presence (str) | +------------------+-----------------------------------------------+ | lunar_presence | information on Moon's presence (str) | +------------------+-----------------------------------------------+ | RA | target RA (float, J2000.0) | +------------------+-----------------------------------------------+ | DEC | target DEC (float, J2000.0) | +------------------+-----------------------------------------------+ | RA_rate | target rate RA (float, arcsec/s) | +------------------+-----------------------------------------------+ | DEC_rate | target RA (float, arcsec/s, includes cos(DEC))| +------------------+-----------------------------------------------+ | AZ | Azimuth meas East(90) of North(0) (float, deg)| +------------------+-----------------------------------------------+ | EL | Elevation (float, deg) | +------------------+-----------------------------------------------+ | airmass | target optical airmass (float) | +------------------+-----------------------------------------------+ | magextinct | V-mag extinction due airmass (float, mag) | +------------------+-----------------------------------------------+ | V | V magnitude (comets: total mag) (float, mag) | +------------------+-----------------------------------------------+ | illumination | fraction of illuminated disk (float) | +------------------+-----------------------------------------------+ | EclLon | heliocentr. ecl. long. (float, deg, J2000.0) | +------------------+-----------------------------------------------+ | EclLat | heliocentr. ecl. lat. (float, deg, J2000.0) | +------------------+-----------------------------------------------+ | ObsEclLon | obscentr. ecl. long. (float, deg, J2000.0) | +------------------+-----------------------------------------------+ | ObsEclLat | obscentr. ecl. lat. (float, deg, J2000.0) | +------------------+-----------------------------------------------+ | r | heliocentric distance (float, au) | +------------------+-----------------------------------------------+ | r_rate | heliocentric radial rate (float, km/s) | +------------------+-----------------------------------------------+ | delta | distance from the observer (float, au) | +------------------+-----------------------------------------------+ | delta_rate | obs-centric radial rate (float, km/s) | +------------------+-----------------------------------------------+ | lighttime | one-way light time (float, s) | +------------------+-----------------------------------------------+ | elong | solar elongation (float, deg) | +------------------+-----------------------------------------------+ | elongFlag | app. position relative to Sun (str) | +------------------+-----------------------------------------------+ | alpha | solar phase angle (float, deg) | +------------------+-----------------------------------------------+ | sunTargetPA | PA of Sun->target vector (float, deg, EoN) | +------------------+-----------------------------------------------+ | velocityPA | PA of velocity vector (float, deg, EoN) | +------------------+-----------------------------------------------+ | GlxLon | galactic longitude (float, deg) | +------------------+-----------------------------------------------+ | GlxLat | galactic latitude (float, deg) | +------------------+-----------------------------------------------+ | RA_3sigma | 3sigma pos. unc. in RA (float, arcsec) | +------------------+-----------------------------------------------+ | DEC_3sigma | 3sigma pos. unc. in DEC (float, arcsec) | +------------------+-----------------------------------------------+ """ # queried fields (see HORIZONS website for details) # if fields are added here, also update the field identification below quantities = '1,3,4,8,9,10,18,19,20,21,23,24,27,31,33,36' # encode objectname for use in URL objectname = urllib.quote(self.targetname.encode("utf8")) # construct URL for HORIZONS query url = "https://ssd.jpl.nasa.gov/horizons_batch.cgi?batch=l" \ + "&TABLE_TYPE='OBSERVER'" \ + "&QUANTITIES='" + str(quantities) + "'" \ + "&CSV_FORMAT='YES'" \ + "&ANG_FORMAT='DEG'" \ + "&CAL_FORMAT='BOTH'" \ + "&SOLAR_ELONG='" + str(solar_elongation[0]) + "," \ + str(solar_elongation[1]) + "'" \ + "&CENTER='"+str(observatory_code)+"'" if self.not_smallbody: url += "&COMMAND='" + \ urllib.quote(self.targetname.encode("utf8")) + "'" elif self.cap and self.comet: for ident in self.parse_comet(): if ident is not None: break if ident is None: ident = self.targetname url += "&COMMAND='DES=" + \ urllib.quote(ident.encode("utf8")) + "%3B" + \ ("CAP'" if self.cap else "'") elif self.isorbit_record(): # Comet orbit record. Do not use DES, CAP. This test must # occur before asteroid test. url += "&COMMAND='" + \ urllib.quote(self.targetname.encode("utf8")) + "%3B'" elif self.isasteroid() and not self.comet: # for asteroids, use 'DES="designation";' for ident in self.parse_asteroid(): if ident is not None: break if ident is None: ident = self.targetname url += "&COMMAND='" + \ urllib.quote(str(ident).encode("utf8")) + "%3B'" elif self.iscomet() and not self.asteroid: # for comets, potentially append the current apparition # (CAP) parameter, or the fragmentation flag (NOFRAG) for ident in self.parse_comet(): if ident is not None: break if ident is None: ident = self.targetname url += "&COMMAND='DES=" + \ urllib.quote(ident.encode("utf8")) + "%3B" + \ ("NOFRAG%3B" if self.nofrag else "") + \ ("CAP'" if self.cap else "'") # elif (not self.targetname.replace(' ', '').isalpha() and not # self.targetname.isdigit() and not # self.targetname.islower() and not # self.targetname.isupper()): # # lower case + upper case + numbers = pot. case sensitive designation # url += "&COMMAND='DES=" + \ # urllib.quote(self.targetname.encode("utf8")) + "%3B'" else: url += "&COMMAND='" + \ urllib.quote(self.targetname.encode("utf8")) + "%3B'" if self.discreteepochs is not None: url += "&TLIST=" for date in self.discreteepochs: url += "'" + str(date) + "'" elif (self.start_epoch is not None and self.stop_epoch is not None and self.step_size is not None): url += "&START_TIME='" \ + urllib.quote(self.start_epoch.encode("utf8")) + "'" \ + "&STOP_TIME='" \ + urllib.quote(self.stop_epoch.encode("utf8")) + "'" \ + "&STEP_SIZE='" + str(self.step_size) + "'" else: raise IOError('no epoch information given') if airmass_lessthan < 99: url += "&AIRMASS='" + str(airmass_lessthan) + "'" if skip_daylight: url += "&SKIP_DAYLT='YES'" else: url += "&SKIP_DAYLT='NO'" self.url = url # print (url) # call HORIZONS i = 0 # count number of connection tries while True: try: src = urllib.urlopen(url).readlines() break except urllib.URLError: time.sleep(0.1) # in case the HORIZONS website is blocked (due to another query) # wait 0.1 second and try again i += 1 if i > 50: return 0 # website could not be reached # disseminate website source code # identify header line and extract data block (ephemerides data) # also extract targetname, absolute mag. (H), and slope parameter (G) headerline = [] datablock = [] in_datablock = False H, G = np.nan, np.nan for idx, line in enumerate(src): line = line.decode('UTF-8') if "Date__(UT)__HR:MN" in line: headerline = line.split(',') if "$$EOE\n" in line: in_datablock = False if in_datablock: datablock.append(line) if "$$SOE\n" in line: in_datablock = True if "Target body name" in line: targetname = line[18:50].strip() if ("rotational period in hours)" in src[idx].decode('UTF-8')): HGline = src[idx+2].decode('UTF-8').split('=') if 'B-V' in HGline[2] and 'G' in HGline[1]: try: H = float(HGline[1].rstrip('G')) except ValueError: pass try: G = float(HGline[2].rstrip('B-V')) except ValueError: pass if ("Multiple major-bodies match string" in src[idx].decode('UTF-8') or ("Matching small-bodies" in src[idx].decode('UTF-8') and not "No matches found" in src[idx+1].decode('UTF-8'))): raise ValueError('Ambiguous target name; check URL: %s' % url) if ("Matching small-bodies" in src[idx].decode('UTF-8') and "No matches found" in src[idx+1].decode('UTF-8')): raise ValueError('Unknown target; check URL: %s' % url) # field identification for each line ephemerides = [] for line in datablock: line = line.split(',') # ignore line that don't hold any data if len(line) < len(quantities.split(',')): continue this_eph = [] fieldnames = [] datatypes = [] # create a dictionary for each date (each line) for idx, item in enumerate(headerline): if ('Date__(UT)__HR:MN' in item): this_eph.append(line[idx].strip()) fieldnames.append('datetime') datatypes.append(object) if ('Date_________JDUT' in item): this_eph.append(np.float64(line[idx])) fieldnames.append('datetime_jd') datatypes.append(np.float64) # read out and convert solar presence try: this_eph.append({'*': 'daylight', 'C': 'civil twilight', 'N': 'nautical twilight', 'A': 'astronomical twilight', ' ': 'dark', 't': 'transiting'}[line[idx+1]]) except KeyError: this_eph.append('n.a.') fieldnames.append('solar_presence') datatypes.append(object) # read out and convert lunar presence try: this_eph.append({'m': 'moonlight', ' ': 'dark'}[line[idx+2]]) except KeyError: this_eph.append('n.a.') fieldnames.append('lunar_presence') datatypes.append(object) if (item.find('R.A._(ICRF/J2000.0)') > -1): this_eph.append(np.float64(line[idx])) fieldnames.append('RA') datatypes.append(np.float64) if (item.find('DEC_(ICRF/J2000.0)') > -1): this_eph.append(np.float64(line[idx])) fieldnames.append('DEC') datatypes.append(np.float64) if (item.find('dRA*cosD') > -1): try: this_eph.append(np.float64(line[idx])/3600.) # "/s except ValueError: this_eph.append(np.nan) fieldnames.append('RA_rate') datatypes.append(np.float64) if (item.find('d(DEC)/dt') > -1): try: this_eph.append(np.float64(line[idx])/3600.) # "/s except ValueError: this_eph.append(np.nan) fieldnames.append('DEC_rate') datatypes.append(np.float64) if (item.find('Azi_(a-app)') > -1): try: # if AZ not given, e.g. for space telescopes this_eph.append(np.float64(line[idx])) fieldnames.append('AZ') datatypes.append(np.float64) except ValueError: pass if (item.find('Elev_(a-app)') > -1): try: # if EL not given, e.g. for space telescopes this_eph.append(np.float64(line[idx])) fieldnames.append('EL') datatypes.append(np.float64) except ValueError: pass if (item.find('a-mass') > -1): try: # if airmass not given, e.g. for space telescopes this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('airmass') datatypes.append(np.float64) if (item.find('mag_ex') > -1): try: # if mag_ex not given, e.g. for space telescopes this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('magextinct') datatypes.append(np.float64) if (item.find('APmag') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('V') datatypes.append(np.float64) if (item.find('Illu%') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('illumination') datatypes.append(np.float64) if (item.find('hEcl-Lon') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('EclLon') datatypes.append(np.float64) if (item.find('hEcl-Lat') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('EclLat') datatypes.append(np.float64) if (item.find('ObsEcLon') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('ObsEclLon') datatypes.append(np.float64) if (item.find('ObsEcLat') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('ObsEclLat') datatypes.append(np.float64) if (item.find(' r') > -1) and \ (headerline[idx+1].find("rdot") > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('r') datatypes.append(np.float64) if (item.find('rdot') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('r_rate') datatypes.append(np.float64) if (item.find('delta') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('delta') datatypes.append(np.float64) if (item.find('deldot') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('delta_rate') datatypes.append(np.float64) if (item.find('1-way_LT') > -1): try: this_eph.append(np.float64(line[idx])*60.) # seconds except ValueError: this_eph.append(np.nan) fieldnames.append('lighttime') datatypes.append(np.float64) if (item.find('S-O-T') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('elong') datatypes.append(np.float64) # in the case of space telescopes, '/r S-T-O' is used; # ground-based telescopes have both parameters in separate # columns if (item.find('/r S-T-O') > -1): this_eph.append({'/L': 'leading', '/T': 'trailing'} [line[idx].split()[0]]) fieldnames.append('elongFlag') datatypes.append(object) try: this_eph.append(np.float64(line[idx].split()[1])) except ValueError: this_eph.append(np.nan) fieldnames.append('alpha') datatypes.append(np.float64) elif (item.find('S-T-O') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('alpha') datatypes.append(np.float64) elif (item.find('/r') > -1): this_eph.append({'/L': 'leading', '/T': 'trailing', '/?': 'not defined'} [line[idx]]) fieldnames.append('elongFlag') datatypes.append(object) if (item.find('PsAng') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('sunTargetPA') datatypes.append(np.float64) if (item.find('PsAMV') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('velocityPA') datatypes.append(np.float64) if (item.find('GlxLon') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('GlxLon') datatypes.append(np.float64) if (item.find('GlxLat') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('GlxLat') datatypes.append(np.float64) if (item.find('RA_3sigma') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('RA_3sigma') datatypes.append(np.float64) if (item.find('DEC_3sigma') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('DEC_3sigma') datatypes.append(np.float64) # in the case of a comet, use total mag for V if (item.find('T-mag') > -1): try: this_eph.append(np.float64(line[idx])) except ValueError: this_eph.append(np.nan) fieldnames.append('V') datatypes.append(np.float64) # append target name this_eph.append(targetname) fieldnames.append('targetname') datatypes.append(object) # append H this_eph.append(H) fieldnames.append('H') datatypes.append(np.float64) # append G this_eph.append(G) fieldnames.append('G') datatypes.append(np.float64) if len(this_eph) > 0: ephemerides.append(tuple(this_eph)) if len(ephemerides) == 0: return 0 # combine ephemerides with column names and data types into ndarray assert len(ephemerides[0]) == len(fieldnames) == len(datatypes) self.data = np.array(ephemerides, dtype=[(str(fieldnames[i]), datatypes[i]) for i in range(len(fieldnames))]) return len(self)
[docs] def get_elements(self, center='500@10', asteroid=False, comet=False): """Call JPL HORIZONS website to obtain orbital elements based on the provided targetname, epochs, and center code. For valid center codes, please refer to http://ssd.jpl.nasa.gov/horizons.cgi :param center: str; center body (default: 500@10 = Sun) :result: int; number of epochs queried :example: >>> ceres = callhorizons.query('Ceres') >>> ceres.set_epochrange('2016-02-23 00:00', '2016-02-24 00:00', '1h') >>> print (ceres.get_elements(), 'epochs queried') The queried properties and their definitions are: +------------------+-----------------------------------------------+ | Property | Definition | +==================+===============================================+ | targetname | official number, name, designation [string] | +------------------+-----------------------------------------------+ | H | absolute magnitude in V band (float, mag) | +------------------+-----------------------------------------------+ | G | photometric slope parameter (float) | +------------------+-----------------------------------------------+ | datetime_jd | epoch Julian Date (float) | +------------------+-----------------------------------------------+ | e | eccentricity (float) | +------------------+-----------------------------------------------+ | p | periapsis distance (float, au) | +------------------+-----------------------------------------------+ | a | semi-major axis (float, au) | +------------------+-----------------------------------------------+ | incl | inclination (float, deg) | +------------------+-----------------------------------------------+ | node | longitude of Asc. Node (float, deg) | +------------------+-----------------------------------------------+ | argper | argument of the perifocus (float, deg) | +------------------+-----------------------------------------------+ | Tp | time of periapsis (float, Julian Date) | +------------------+-----------------------------------------------+ | meananomaly | mean anomaly (float, deg) | +------------------+-----------------------------------------------+ | trueanomaly | true anomaly (float, deg) | +------------------+-----------------------------------------------+ | period | orbital period (float, Earth yr) | +------------------+-----------------------------------------------+ | Q | apoapsis distance (float, au) | +------------------+-----------------------------------------------+ """ # encode objectname for use in URL objectname = urllib.quote(self.targetname.encode("utf8")) # call Horizons website and extract data url = "https://ssd.jpl.nasa.gov/horizons_batch.cgi?batch=l" \ + "&TABLE_TYPE='ELEMENTS'" \ + "&CSV_FORMAT='YES'" \ + "&CENTER='" + str(center) + "'" \ + "&OUT_UNITS='AU-D'" \ + "&REF_PLANE='ECLIPTIC'" \ + "REF_SYSTEM='J2000'" \ + "&TP_TYPE='ABSOLUTE'" \ + "&ELEM_LABELS='YES'" \ + "CSV_FORMAT='YES'" \ + "&OBJ_DATA='YES'" # check if self.targetname is a designation # lower case + upper case + numbers = pot. case sensitive designation if self.not_smallbody: url += "&COMMAND='" + \ urllib.quote(self.targetname.encode("utf8")) + "'" elif self.isorbit_record(): # Comet orbit record. Do not use DES, CAP. This test must # occur before asteroid test. url += "&COMMAND='" + \ urllib.quote(self.targetname.encode("utf8")) + "%3B'" elif self.isasteroid() and not self.comet: # for asteroids, use 'DES="designation";' for ident in self.parse_asteroid(): if ident is not None: break if ident is None: ident = self.targetname url += "&COMMAND='" + \ urllib.quote(str(ident).encode("utf8")) + "%3B'" elif self.iscomet() and not self.asteroid: # for comets, potentially append the current apparition # (CAP) parameter, or the fragmentation flag (NOFRAG) for ident in self.parse_comet(): if ident is not None: break if ident is None: ident = self.targetname url += "&COMMAND='DES=" + \ urllib.quote(ident.encode("utf8")) + "%3B" + \ ("NOFRAG%3B" if self.nofrag else "") + \ ("CAP'" if self.cap else "'") # elif (not self.targetname.replace(' ', '').isalpha() and not # self.targetname.isdigit() and not # self.targetname.islower() and not # self.targetname.isupper()): # url += "&COMMAND='DES=" + str(objectname) + "%3B'" else: url += "&COMMAND='" + str(objectname) + "%3B'" if self.discreteepochs is not None: url += "&TLIST=" for date in self.discreteepochs: url += "'" + str(date) + "'" elif (self.start_epoch is not None and self.stop_epoch is not None and self.step_size is not None): url += "&START_TIME='" \ + urllib.quote(self.start_epoch.encode("utf8")) + "'" \ + "&STOP_TIME='" \ + urllib.quote(self.stop_epoch.encode("utf8")) + "'" \ + "&STEP_SIZE='" + str(self.step_size) + "'" else: raise IOError('no epoch information given') self.url = url i = 0 # count number of connection tries while True: try: src = urllib.urlopen(url).readlines() break except urllib.URLError: time.sleep(0.1) # in case the HORIZONS website is blocked (due to another query) # wait 1 second and try again i += 1 if i > 50: return 0 # website could not be reached # disseminate website source code # identify header line and extract data block (elements data) # also extract targetname, abs. magnitude (H), and slope parameter (G) headerline = [] datablock = [] in_datablock = False H, G = np.nan, np.nan for idx, line in enumerate(src): line = line.decode('UTF-8') if 'JDTDB,' in line: headerline = line.split(',') if "$$EOE\n" in line: in_datablock = False if in_datablock: datablock.append(line) if "$$SOE\n" in line: in_datablock = True if "Target body name" in line: targetname = line[18:50].strip() if "rotational period in hours)" in src[idx].decode('UTF-8'): HGline = src[idx+2].decode('UTF-8').split('=') if 'B-V' in HGline[2] and 'G' in HGline[1]: try: H = float(HGline[1].rstrip('G')) except ValueError: pass try: G = float(HGline[2].rstrip('B-V')) except ValueError: pass if ("Multiple major-bodies match string" in src[idx].decode('UTF-8') or ("Matching small-bodies" in src[idx].decode('UTF-8') and not "No matches found" in src[idx+1].decode('UTF-8'))): raise ValueError('Ambiguous target name; check URL: %s' % url) if ("Matching small-bodies" in src[idx].decode('UTF-8') and "No matches found" in src[idx+1].decode('UTF-8')): raise ValueError('Unknown target; check URL: %s' % url) # field identification for each line in elements = [] for line in datablock: line = line.split(',') this_el = [] fieldnames = [] datatypes = [] # create a dictionary for each date (each line) for idx, item in enumerate(headerline): if (item.find('JDTDB') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('datetime_jd') datatypes.append(np.float64) if (item.find('EC') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('e') datatypes.append(np.float64) if (item.find('QR') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('p') datatypes.append(np.float64) if (item.find('A') > -1) and len(item.strip()) == 1: this_el.append(np.float64(line[idx])) fieldnames.append('a') datatypes.append(np.float64) if (item.find('IN') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('incl') datatypes.append(np.float64) if (item.find('OM') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('node') datatypes.append(np.float64) if (item.find('W') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('argper') datatypes.append(np.float64) if (item.find('Tp') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('Tp') datatypes.append(np.float64) if (item.find('MA') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('meananomaly') datatypes.append(np.float64) if (item.find('TA') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('trueanomaly') datatypes.append(np.float64) if (item.find('PR') > -1): # Earth years this_el.append(np.float64(line[idx])/(365.256)) fieldnames.append('period') datatypes.append(np.float64) if (item.find('AD') > -1): this_el.append(np.float64(line[idx])) fieldnames.append('Q') datatypes.append(np.float64) # append targetname this_el.append(targetname) fieldnames.append('targetname') datatypes.append(object) # append H this_el.append(H) fieldnames.append('H') datatypes.append(np.float64) # append G this_el.append(G) fieldnames.append('G') datatypes.append(np.float64) if len(this_el) > 0: elements.append(tuple(this_el)) if len(elements) == 0: return 0 # combine elements with column names and data types into ndarray assert len(elements[0]) == len(fieldnames) == len(datatypes) self.data = np.array(elements, dtype=[(str(fieldnames[i]), datatypes[i]) for i in range(len(fieldnames))]) return len(self)
[docs] def export2pyephem(self, center='500@10', equinox=2000.): """Call JPL HORIZONS website to obtain orbital elements based on the provided targetname, epochs, and center code and create a PyEphem (http://rhodesmill.org/pyephem/) object. This function requires PyEphem to be installed. :param center: str; center body (default: 500@10 = Sun) :param equinox: float; equinox (default: 2000.0) :result: list; list of PyEphem objects, one per epoch :example: >>> import callhorizons >>> import numpy >>> import ephem >>> >>> ceres = callhorizons.query('Ceres') >>> ceres.set_epochrange('2016-02-23 00:00', '2016-02-24 00:00', '1h') >>> ceres_pyephem = ceres.export2pyephem() >>> >>> nau = ephem.Observer() # setup observer site >>> nau.lon = -111.653152/180.*numpy.pi >>> nau.lat = 35.184108/180.*numpy.pi >>> nau.elevation = 2100 # m >>> nau.date = '2015/10/5 01:23' # UT >>> print ('next rising: %s' % nau.next_rising(ceres_pyephem[0])) >>> print ('next transit: %s' % nau.next_transit(ceres_pyephem[0])) >>> print ('next setting: %s' % nau.next_setting(ceres_pyephem[0])) """ try: import ephem except ImportError: raise ImportError( 'export2pyephem requires PyEphem to be installed') # obtain orbital elements self.get_elements(center) objects = [] for el in self.data: n = 0.9856076686/np.sqrt(el['a']**3) # mean daily motion epoch_djd = el['datetime_jd']-2415020.0 # Dublin Julian date epoch = ephem.date(epoch_djd) epoch_str = "%d/%f/%d" % (epoch.triple()[1], epoch.triple()[2], epoch.triple()[0]) # export to PyEphem objects.append(ephem.readdb("%s,e,%f,%f,%f,%f,%f,%f,%f,%s,%i,%f,%f" % (el['targetname'], el['incl'], el['node'], el['argper'], el['a'], n, el['e'], el['meananomaly'], epoch_str, equinox, el['H'], el['G']))) return objects