caproto / caproto
a bring-your-own-IO implementation of the EPICS Channel Access protocol
Home Page: https://caproto.github.io/caproto/
License: Other
Notes to self. Implementation in progress.
[...] PV instance for this? If so, return that. If not, allocate a cid and instantiate a corresponding PV object.
[...] PV objects just created above, send [(name, cid), (name2, cid2)] to the Broadcaster to search.
[...] cid -> PV mapping and return a list of PV objects, some or all of which may not be connected yet.
[...] (address, (cid1, cid2, ...)) to the Context. If not, send a SearchRequest using the cid provided by the Context.
[...] (address, (cid1, cid2, ...)) to the Context as updates come in. (I think that all Contexts are currently notified, which is a problem because they have different cids. This could be solved using a queue per context, as we do in ChannelData.)
[...] (address, (cid1, cid2, ...)) from the Broadcaster in its channel creation queue.
[...] cid in its cache to find the PV, which knows the priority. Checks: do I have a VirtualCircuitManager for this (address, priority) yet? If not, create a socket and a VirtualCircuitManager.
[...] caproto.ClientChannel. (There is no need for a special Channel class.)
[...] caproto.ClientChannel and to the VirtualCircuitManager onto the PV.
[...] VirtualCircuitManager.send.
[...] cid on the VirtualCircuitManager. Tells the PV it is connected via updating a connected attribute and possibly setting an Event, to make it possible to wait on PV connection.
If so, how does that work?
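The connection workflow in the notes to self above might be sketched roughly as follows. This is only an illustration under stated assumptions: the PV and Context classes here are hypothetical stand-ins, not caproto's actual classes, the Broadcaster is reduced to a list of pending searches, and no network traffic is involved.

```python
import itertools
import threading


class PV:
    """Hypothetical stand-in for the PV object in the notes."""

    def __init__(self, name, cid, priority=0):
        self.name = name
        self.cid = cid
        self.priority = priority
        self.connected = False
        self._connected_event = threading.Event()

    def mark_connected(self):
        self.connected = True
        self._connected_event.set()

    def wait_for_connection(self, timeout=None):
        # Returns True once connected, False if the timeout expires first.
        return self._connected_event.wait(timeout)


class Context:
    """Hypothetical Context: caches PVs and groups them by circuit."""

    def __init__(self):
        self._cids = itertools.count()
        self.pvs_by_name = {}
        self.pvs_by_cid = {}
        self.circuits = {}          # (address, priority) -> {cid: PV}
        self.pending_searches = []  # stands in for the Broadcaster queue

    def get_pvs(self, *names):
        pvs = []
        for name in names:
            pv = self.pvs_by_name.get(name)
            if pv is None:
                # No PV yet for this name: allocate a cid and create one.
                pv = PV(name, next(self._cids))
                self.pvs_by_name[name] = pv
                self.pvs_by_cid[pv.cid] = pv
                self.pending_searches.append((name, pv.cid))
            pvs.append(pv)
        return pvs

    def on_search_result(self, address, cids):
        for cid in cids:
            pv = self.pvs_by_cid[cid]
            circuit = self.circuits.setdefault((address, pv.priority), {})
            circuit[cid] = pv
            # A real client would send CreateChanRequest and wait for the
            # response before doing this; the sketch skips the network.
            pv.mark_connected()
```

Keying the circuits on (address, priority) mirrors the check described in the notes; waiting on the Event is one way to block until a PV reports itself connected.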
There are a couple of layers of obfuscation here, but it looks like it is possible to read from a channel before it has finished all of its handshaking.
___________________________ test_context_disconnect ____________________________
cntx = <caproto.threading.client.Context object at 0x7ffb5c85e938>
    def test_context_disconnect(cntx):
        str_pv = 'Py:ao1.DESC'

        def bootstrap():
            cntx.search(str_pv)
            chan = cntx.create_channel(str_pv)
            assert chan.connected
            assert chan.circuit.connected
            return chan

        def is_happy(chan, cntx):
            chan.read()
            assert chan.connected
            assert chan.circuit.connected
            assert cntx.registered
            assert cntx.circuits
            assert cntx.search_results

>       chan = bootstrap()
tests/test_threading_client.py:130:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    def bootstrap():
        cntx.search(str_pv)
        chan = cntx.create_channel(str_pv)
        assert chan.connected
>       assert chan.circuit.connected
E assert False
E + where False = <caproto.threading.client.VirtualCircuit object at 0x7ffb5c85e0b0>.connected
E + where <caproto.threading.client.VirtualCircuit object at 0x7ffb5c85e0b0> = <caproto.threading.client.Channel object at 0x7ffb5c70d228>.circuit
tests/test_threading_client.py:119: AssertionError
---------------------------- Captured stdout setup -----------------------------
sending to ('172.17.255.255', 5065) b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x7f\x00\x00\x01'
(repeater) RepeaterRegisterRequest(client_address='127.0.0.1') ('172.17.0.1', 33655)
new remote with addr ('172.17.0.1', 33655)
sent confirmation to ('172.17.0.1', 33655)
Registered with repeater
---------------------------- Captured stderr setup -----------------------------
DEBUG:caproto.Broadcaster:Serializing 1 commands into one datagram
DEBUG:caproto.Broadcaster:1 of 1 RepeaterRegisterRequest(client_address='127.0.0.1')
DEBUG:caproto.Broadcaster:Parsed 1 commands from first datagram in queue.
DEBUG:caproto.Broadcaster:0 of 1: RepeaterRegisterRequest(client_address='127.0.0.1')
DEBUG:caproto.Broadcaster:Processing 1/1 commands left in datagram. 0 more datagrams are cached.
DEBUG:caproto.Broadcaster:Serializing 1 commands into one datagram
DEBUG:caproto.Broadcaster:1 of 1 RepeaterConfirmResponse(repeater_address='172.17.0.1')
DEBUG:caproto.Broadcaster:Parsed 1 commands from first datagram in queue.
DEBUG:caproto.Broadcaster:0 of 1: RepeaterConfirmResponse(repeater_address='172.17.0.1')
DEBUG:caproto.Broadcaster:Processing 1/1 commands left in datagram. 0 more datagrams are cached.
----------------------------- Captured stdout call -----------------------------
sending to ('172.17.255.255', 5064) b'\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x10\x00\x05\x00\r\x00\x00\x00\x00\x00\x00\x00\x00Py:ao1.DESC\x00\x00\x00\x00\x00'
<CircuitState states={CLIENT: AWAIT_VERSION_RESPONSE, SERVER: SEND_VERSION_RESPONSE}> <socket.socket fd=23, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('172.17.0.1', 47552), raddr=('172.17.0.3', 5064)> <caproto.threading.client.SocketThread object at 0x7ffb5c6e4438>
----------------------------- Captured stderr call -----------------------------
DEBUG:caproto.Broadcaster:Serializing 2 commands into one datagram
DEBUG:caproto.Broadcaster:1 of 2 VersionRequest(priority=0, version=13)
DEBUG:caproto.Broadcaster:2 of 2 SearchRequest(name=b'Py:ao1.DESC', cid=0, version=13)
DEBUG:caproto.Broadcaster:Parsed 2 commands from first datagram in queue.
DEBUG:caproto.Broadcaster:0 of 2: VersionResponse(version=13)
DEBUG:caproto.Broadcaster:1 of 2: SearchResponse(port=5064, ip='255.255.255.255', cid=0, version=13)
DEBUG:caproto.Broadcaster:Processing 1/2 commands left in datagram. 0 more datagrams are cached.
DEBUG:caproto.Broadcaster:Processing 1/1 commands left in datagram. 0 more datagrams are cached.
DEBUG:caproto.VC:Received 16 bytes.
DEBUG:caproto.VC:Parsed 16/16 cached bytes into VersionResponse(version=13).
DEBUG:caproto.VC:0 bytes are cached. Need more bytes to parse next command.
DEBUG:caproto.VC:Serializing VersionRequest(priority=0, version=13)
DEBUG:caproto.VC:Serializing HostNameRequest(name=b'testing-gce-5886ec59-be3e-490f-88aa-753')
DEBUG:caproto.VC:Serializing ClientNameRequest(name=b'travis')
DEBUG:caproto.VC:Serializing CreateChanRequest(name=b'Py:ao1.DESC', cid=0, version=13)
DEBUG:caproto.VC:Received 32 bytes.
DEBUG:caproto.VC:Parsed 16/32 cached bytes into AccessRightsResponse(cid=0, access_rights=3).
DEBUG:caproto.VC:Parsed 16/16 cached bytes into CreateChanResponse(data_type=0, data_count=1, cid=0, sid=82).
DEBUG:caproto.VC:0 bytes are cached. Need more bytes to parse next command.
https://travis-ci.org/danielballan/caproto/builds/226633298 <- original
https://travis-ci.org/danielballan/caproto/jobs/226634673 <- this log, with more prints and an earlier assert that failed
Upon unintentional disconnection, PV classes should periodically search and try to reconnect
Keeping track of them here, expect more to follow:
=================================== FAILURES ===================================
___________________________ PV_Tests.test_DoubleVal ____________________________
self = <test_thread_pv.PV_Tests testMethod=test_DoubleVal>
    def test_DoubleVal(self):
        pvn = pvnames.double_pv
        pv = PV(pvn)
        pv.get()
        cdict = pv.get_ctrlvars()
        write( 'Testing CTRL Values for a Double (%s)\n' % (pvn))
        self.failUnless('severity' in cdict)
        self.failUnless(len(pv.host) > 1)
        self.assertEqual(pv.count,1)
>       self.assertEqual(pv.precision, pvnames.double_pv_prec)
E AssertionError: 0 != 4
tests/test_thread_pv.py:490: AssertionError
----------------------------- Captured stdout call -----------------------------
Testing CTRL Values for a Double (Py:ao1)
----------------------------- Captured stderr call -----------------------------
DEBUG:[cli circuit]:Serializing CreateChanRequest(name=b'Py:ao1', cid=3, version=13)
DEBUG:[cli circuit]:Received 32 bytes.
DEBUG:[cli circuit]:16 bytes -> AccessRightsResponse(cid=3, access_rights=<AccessRights.READ_WRITE: 3>)
DEBUG:[cli circuit]:16 bytes -> CreateChanResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, cid=3, sid=5)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=<ChannelType.TIME_DOUBLE: 20>, data_count=0, sid=5, ioid=7)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 1.]), data_type=<ChannelType.TIME_DOUBLE: 20>, data_count=1, status=1, ioid=7, metadata=DBR_TIME_DOUBLE(status=<AlarmStatus.UDF: 17>, severity=<AlarmSeverity.INVALID_ALARM: 3>, timestamp=631152000.0))
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=<ChannelType.CTRL_DOUBLE: 34>, data_count=0, sid=5, ioid=8)
DEBUG:[cli circuit]:Received 104 bytes.
DEBUG:[cli circuit]:104 bytes -> ReadNotifyResponse(data=array([ 1.]), data_type=<ChannelType.CTRL_DOUBLE: 34>, data_count=1, status=1, ioid=8, metadata=DBR_CTRL_DOUBLE(status=<AlarmStatus.UDF: 17>, severity=<AlarmSeverity.INVALID_ALARM: 3>, upper_disp_limit=0.0, lower_disp_limit=0.0, upper_alarm_limit=nan, upper_warning_limit=nan, lower_warning_limit=nan, lower_alarm_limit=nan, upper_ctrl_limit=0.0, lower_ctrl_limit=0.0, precision=0, units=b''))
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
__________________________ PV_Tests.test_get_callback __________________________
self = <test_thread_pv.PV_Tests testMethod=test_get_callback>
    def test_get_callback(self):
        write("Callback test: changing PV must be updated\n")
        global NEWVALS
        mypv = PV(pvnames.updating_pv1)
        NEWVALS = []
        def onChanges(pvname=None, value=None, char_value=None, **kw):
            write( 'PV %s %s, %s Changed!\n' % (pvname, repr(value), char_value))
            NEWVALS.append( repr(value))
        mypv.add_callback(onChanges)
        write('Added a callback. Now wait for changes...\n')
        t0 = time.time()
        while time.time() - t0 < 3:
            time.sleep(1.e-4)
        write(' saw %i changes.\n' % len(NEWVALS))
>       self.failUnless(len(NEWVALS) > 3)
E AssertionError: False is not true
tests/test_thread_pv.py:327: AssertionError
----------------------------- Captured stdout call -----------------------------
Callback test: changing PV must be updated
PV Py:ao1 array([ 1.]), None Changed!
Added a callback. Now wait for changes...
saw 1 changes.
______________________ PV_Tests.test_get_string_waveform _______________________
self = <test_thread_pv.PV_Tests testMethod=test_get_string_waveform>
    def test_get_string_waveform(self):
        write('String Array: \n')
        with no_simulator_updates():
            pv = PV(pvnames.string_arr_pv)
            val = pv.get()
>           self.failUnless(len(val) > 10)
E AssertionError: False is not true
tests/test_thread_pv.py:217: AssertionError
----------------------------- Captured stdout call -----------------------------
String Array:
=============== 3 failed, 364 passed, 2 skipped in 45.76 seconds ===============
Reported in other issues: test_put_complete in #104
DBE_ALARM, DBE_VALUE, DBE_LOG, etc. are not handled correctly with the curio server.
Multiple invocations of caproto-repeater happily start multiple repeaters running at once. These should fail internally when the second repeater tries to bind to ('0.0.0.0', 5065).
Note the sets of functions that need to be mutually locked at a higher layer (mutating the internal state or the internal queues).
Seen with the threading client, but it looks like its root is in _hub and _commands.
In [7]: ct = Context()
In [8]: ct.register()
DEBUG:caproto.Broadcaster:Serializing 1 commands into one datagram
DEBUG:caproto.Broadcaster:1 of 1 RepeaterRegisterRequest(client_address='127.0.0.1')
sending to ('172.17.255.255', 5065) b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x7f\x00\x00\x01'
DEBUG:caproto.Broadcaster:Parsed 1 commands from first datagram in queue.
DEBUG:caproto.Broadcaster:0 of 1: RepeaterConfirmResponse(repeater_address='172.17.0.1')
DEBUG:caproto.Broadcaster:Processing 1/1 commands left in datagram. 0 more datagrams are cached.
Registered with repeater
In [11]: ct.register()
DEBUG:caproto.Broadcaster:Serializing 1 commands into one datagram
DEBUG:caproto.Broadcaster:1 of 1 RepeaterRegisterRequest(client_address='127.0.0.1')
sending to ('172.17.255.255', 5065) b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x7f\x00\x00\x01'
Registered with repeater
In [12]: Exception in thread Thread-3:
Traceback (most recent call last):
File "/home/tcaswell/.virtualenvs/dd36/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/tcaswell/.virtualenvs/dd36/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/home/tcaswell/source/bnl_source/caproto/caproto/threading/client.py", line 41, in __call__
self.target_obj.next_command(bytes_recv, address)
File "/home/tcaswell/source/bnl_source/caproto/caproto/threading/client.py", line 158, in next_command
command = self.broadcaster.next_command()
File "/home/tcaswell/source/bnl_source/caproto/caproto/_hub.py", line 445, in next_command
commands = read_datagram(byteslike, address, self.their_role)
File "/home/tcaswell/source/bnl_source/caproto/caproto/_commands.py", line 262, in read_datagram
_class = get_command_class(role, header)
File "/home/tcaswell/source/bnl_source/caproto/caproto/_commands.py", line 248, in get_command_class
_class = Commands[role][header.command]
KeyError: 24
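As a quick check on where the 24 in that KeyError comes from, the 16 bytes shown in the "sending to" lines above can be unpacked by hand. This sketch assumes the usual 16-byte Channel Access header layout (command, payload size, data type, data count, then two 32-bit parameters, all big-endian); unpack_header is a made-up helper name for illustration, not part of caproto.

```python
import socket
import struct

# Bytes copied from the "sending to ('172.17.255.255', 5065)" lines above.
datagram = b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x7f\x00\x00\x01'


def unpack_header(data):
    """Split a 16-byte CA header into its six big-endian fields."""
    return struct.unpack('>HHHHII', data[:16])


command, payload_size, data_type, data_count, param1, param2 = \
    unpack_header(datagram)
print(command)                                      # 24
print(socket.inet_ntoa(struct.pack('>I', param2)))  # 127.0.0.1
```

So command code 24 is the one carried by the RepeaterRegisterRequest datagram in the log, which is the key the lookup in get_command_class failed on.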
It looks like os.sysconf could let us see this cutoff coming: https://docs.python.org/3/library/socket.html#socket.socket.sendmsg
As implemented currently, the RegistrationRequest is intercepted and not broadcast, but any subsequent messages are similarly not broadcast.
Fix by removing RegistrationRequest from the message list and broadcasting the remaining messages to clients. E.g., [Version(), RegistrationRequest(), SearchRequest()] -> intercept registration, rebroadcast [Version(), SearchRequest()]
Reference https://github.com/epics-base/epics-base/blob/3.15/src/ca/client/repeater.cpp#L555
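A minimal sketch of the proposed fix, assuming the repeater already has the datagram parsed into a list of command objects. The three classes below are empty placeholders standing in for the real command classes, and split_registration is a hypothetical helper name.

```python
class VersionRequest:
    """Placeholder for the real command class."""


class RepeaterRegisterRequest:
    """Placeholder for the real command class."""


class SearchRequest:
    """Placeholder for the real command class."""


def split_registration(commands):
    """Separate registration requests from everything else, keeping order."""
    registrations = []
    to_rebroadcast = []
    for command in commands:
        if isinstance(command, RepeaterRegisterRequest):
            registrations.append(command)
        else:
            to_rebroadcast.append(command)
    return registrations, to_rebroadcast


commands = [VersionRequest(), RepeaterRegisterRequest(), SearchRequest()]
registrations, to_rebroadcast = split_registration(commands)
# registrations would be handled locally (confirm to the new client);
# to_rebroadcast would be serialized and sent on to the known clients.
print([type(c).__name__ for c in to_rebroadcast])
```

The point is only that interception and rebroadcast are decided per command rather than per datagram.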
I cloned the master branch just now. Should the simple.py example run properly on Windows yet?
Although I also just realized I don't need to run this in Windows for the application I'm working on...
C:\Users\jsinsheimer\AppData\Local\Continuum\Anaconda3\envs\python=3.6>python "C:\Users\jsinsheimer\AppData\Local\Continuum\Anaconda3\envs\python=3.6\Lib\site-packages\caproto\ioc_examples\simple.py"
PVs: ['simple:A', 'simple:B']
Exception OSError(10022, 'An invalid argument was supplied', None, 10022, None) from selector_select ignored
Traceback (most recent call last):
File "C:\Users\jsinsheimer\AppData\Local\Continuum\Anaconda3\envs\python=3.6\lib\site-packages\curio\kernel.py", line 739, in _run_coro
events = selector_select(timeout)
File "C:\Users\jsinsheimer\AppData\Local\Continuum\Anaconda3\envs\python=3.6\lib\selectors.py", line 323, in select
r, w, _ = self._select(self._readers, self._writers, [], timeout)
File "C:\Users\jsinsheimer\AppData\Local\Continuum\Anaconda3\envs\python=3.6\lib\selectors.py", line 314, in _select
r, w, x = select.select(r, w, w, timeout)
OSError: [WinError 10022] An invalid argument was supplied
For more information: https://epics.anl.gov/base/R3-15/5-docs/filters.html
(Thanks @hhslepicka for pointing these out!)
"Are we using the right classifiers in setup.py?" - @tacaswell
#161 stores the entire history of subscription readings until the next callback is added. Unless I am missing something, that will lead to out-of-memory errors for long-running clients. I think we only need to cache the single most recent reading and feed that to any new callbacks.
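One way the "cache only the most recent reading" idea could look is sketched below. The Subscription class and its method names are hypothetical and do not reflect the code in #161.

```python
class Subscription:
    """Hypothetical subscription that remembers only the latest reading."""

    def __init__(self):
        self.callbacks = []
        self.most_recent = None
        self._has_reading = False

    def add_callback(self, func):
        self.callbacks.append(func)
        # Replay just the latest reading so a late subscriber starts
        # with a current value instead of the whole history.
        if self._has_reading:
            func(self.most_recent)

    def process(self, reading):
        # Overwrite rather than append: memory use stays constant.
        self.most_recent = reading
        self._has_reading = True
        for func in self.callbacks:
            func(reading)
```

A separate flag is used instead of testing most_recent against None so that a reading that happens to be None would still be replayed.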
This difference in naming is inherited from CAproto.html (which we scraped). Both of these are integer status codes, so I think there is no useful distinction here.
I discovered that the servers have a hard ophyd dependency, which I think was not intentional.
Is everyone cool with using pip's optional dependency scheme? Maybe
pip install caproto # the threading client and the sync client will work, but no servers
pip install caproto[async] # picks up trio, curio
pip install caproto[all] # picks up numpy, ophyd, asks and whatever else
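In setuptools terms, that scheme maps onto the extras_require argument of setup(). The fragment below is a sketch only: the package lists simply mirror the comments above, and folding the async packages into "all" is an assumption.

```python
# Hypothetical fragment of setup.py.
async_requires = ['trio', 'curio']

extras_require = {
    'async': async_requires,
    # Assumption: "all" is a superset of the async extra.
    'all': sorted(set(async_requires + ['numpy', 'ophyd', 'asks'])),
}

# In setup.py this dict would be passed along as:
#     setup(..., extras_require=extras_require)
print(extras_require['all'])
```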
Almost all of the substantial features are already done and merged in #136, but there are a handful of fiddly formatting-related arguments such as:
Floating point type format:
Default: Use %g format
-e <nr>: Use %e format, with a precision of <nr> digits
-f <nr>: Use %f format, with a precision of <nr> digits
-g <nr>: Use %g format, with a precision of <nr> digits
-s: Get value as string (honors server-side precision)
-lx: Round to long integer and print as hex number
-lo: Round to long integer and print as octal number
-lb: Round to long integer and print as binary number
Integer number format:
Default: Print as decimal number
-0x: Print as hex number
-0o: Print as octal number
-0b: Print as binary number
Alternate output field separator:
-F <ofs>: Use <ofs> as an alternate output field separator
Most of these can be implemented in terms of the existing --format argument and are somewhat obviated by it, but for the sake of completeness they should be implemented so that it is really safe to do alias caget=caproto-get.
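To illustrate how the flags listed above could reduce to ordinary Python formatting, here is a rough sketch. format_value is a hypothetical helper, not an existing caproto function, and the exact text that the epics-base caget prints for the hex, octal and binary cases may differ from Python's 0x/0o/0b prefixes.

```python
def format_value(value, flag=None, precision=None):
    """Format one value according to a caget-style flag (without the dash)."""
    if flag is None:
        # Defaults from the help text: %g for floats, decimal for integers.
        return '%g' % value if isinstance(value, float) else '%d' % value
    if flag in ('e', 'f', 'g'):
        # '%.*f' takes the precision from the argument tuple.
        return ('%.*' + flag) % (precision, value)
    if flag in ('lx', 'lo', 'lb', '0x', '0o', '0b'):
        # Round to an integer first, then print in the requested base.
        return format(int(round(value)), '#' + flag[1])
    raise ValueError('unsupported flag: {!r}'.format(flag))


print(format_value(3.14159, 'f', 8))
print(format_value(254.6, 'lx'))
print(format_value(10, '0b'))
```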
Additionally, put-completion needs to be made configurable from the CLI.
The full help strings from the epics-base CLIs are reproduced here:
$ caget -h
Usage: caget [options] <PV name> ...
-h: Help: Print this message
Channel Access options:
-w <sec>: Wait time, specifies CA timeout, default is 1.000000 second(s)
-c: Asynchronous get (use ca_get_callback and wait for completion)
-p <prio>: CA priority (0-99, default 0=lowest)
Format options:
Default output format is "name value"
-t: Terse mode - print only value, without name
-a: Wide mode "name timestamp value stat sevr" (read PVs as DBR_TIME_xxx)
-d <type>: Request specific dbr type; use string (DBR_ prefix may be omitted)
or number of one of the following types:
DBR_STRING      0  DBR_STS_FLOAT    9  DBR_TIME_LONG   19  DBR_CTRL_SHORT    29
DBR_INT         1  DBR_STS_ENUM    10  DBR_TIME_DOUBLE 20  DBR_CTRL_INT      29
DBR_SHORT       1  DBR_STS_CHAR    11  DBR_GR_STRING   21  DBR_CTRL_FLOAT    30
DBR_FLOAT       2  DBR_STS_LONG    12  DBR_GR_SHORT    22  DBR_CTRL_ENUM     31
DBR_ENUM        3  DBR_STS_DOUBLE  13  DBR_GR_INT      22  DBR_CTRL_CHAR     32
DBR_CHAR        4  DBR_TIME_STRING 14  DBR_GR_FLOAT    23  DBR_CTRL_LONG     33
DBR_LONG        5  DBR_TIME_INT    15  DBR_GR_ENUM     24  DBR_CTRL_DOUBLE   34
DBR_DOUBLE      6  DBR_TIME_SHORT  15  DBR_GR_CHAR     25  DBR_STSACK_STRING 37
DBR_STS_STRING  7  DBR_TIME_FLOAT  16  DBR_GR_LONG     26  DBR_CLASS_NAME    38
DBR_STS_SHORT   8  DBR_TIME_ENUM   17  DBR_GR_DOUBLE   27
DBR_STS_INT     8  DBR_TIME_CHAR   18  DBR_CTRL_STRING 28
Enum format:
-n: Print DBF_ENUM value as number (default is enum string)
Arrays: Value format: print number of requested values, then list of values
Default: Print all values
-# <count>: Print first <count> elements of an array
-S: Print array of char as a string (long string)
Floating point type format:
Default: Use %g format
-e <nr>: Use %e format, with a precision of <nr> digits
-f <nr>: Use %f format, with a precision of <nr> digits
-g <nr>: Use %g format, with a precision of <nr> digits
-s: Get value as string (honors server-side precision)
-lx: Round to long integer and print as hex number
-lo: Round to long integer and print as octal number
-lb: Round to long integer and print as binary number
Integer number format:
Default: Print as decimal number
-0x: Print as hex number
-0o: Print as octal number
-0b: Print as binary number
Alternate output field separator:
-F <ofs>: Use <ofs> as an alternate output field separator
Example: caget -a -f8 my_channel another_channel
(uses wide output format, doubles are printed as %f with precision of 8)
dallan@xf23id1-srv1:~$ camonitor -h
Usage: camonitor [options] <PV name> ...
-h: Help; Print this message
Channel Access options:
-w <sec>: Wait time, specifies CA timeout, default is 1.000000 second(s)
-m <msk>: Specify CA event mask to use. <msk> is any combination of
'v' (value), 'a' (alarm), 'l' (log/archive), 'p' (property).
Default event mask is 'va'
-p <pri>: CA priority (0-99, default 0=lowest)
Timestamps:
Default: Print absolute timestamps (as reported by CA server)
-t <key>: Specify timestamp source(s) and type, with <key> containing
's' = CA server (remote) timestamps
'c' = CA client (local) timestamps (shown in '()'s)
'n' = no timestamps
'r' = relative timestamps (time elapsed since start of program)
'i' = incremental timestamps (time elapsed since last update)
'I' = incremental timestamps (time since last update, by channel)
'r', 'i' or 'I' require 's' or 'c' to select the time source
Enum format:
-n: Print DBF_ENUM values as number (default is enum string)
Array values: Print number of elements, then list of values
Default: Request and print all elements (dynamic arrays supported)
-# <num>: Request and print up to <num> elements
-S: Print arrays of char as a string (long string)
Floating point format:
Default: Use %g format
-e <num>: Use %e format, with a precision of <num> digits
-f <num>: Use %f format, with a precision of <num> digits
-g <num>: Use %g format, with a precision of <num> digits
-s: Get value as string (honors server-side precision)
-lx: Round to long integer and print as hex number
-lo: Round to long integer and print as octal number
-lb: Round to long integer and print as binary number
Integer number format:
Default: Print as decimal number
-0x: Print as hex number
-0o: Print as octal number
-0b: Print as binary number
Alternate output field separator:
-F <ofs>: Use <ofs> to separate fields in output
Example: camonitor -f8 my_channel another_channel
(doubles are printed as %f with precision of 8)
dallan@xf23id1-srv1:~$ caput -h
Usage: caput [options] <PV name> <PV value>
caput -a [options] <PV name> <no of values> <PV value> ...
-h: Help: Print this message
Channel Access options:
-w <sec>: Wait time, specifies CA timeout, default is 1.000000 second(s)
-c: Asynchronous put (use ca_put_callback and wait for completion)
-p <prio>: CA priority (0-99, default 0=lowest)
Format options:
-t: Terse mode - print only sucessfully written value, without name
-l: Long mode "name timestamp value stat sevr" (read PVs as DBR_TIME_xxx)
Enum format:
Default: Auto - try value as ENUM string, then as index number
-n: Force interpretation of values as numbers
-s: Force interpretation of values as strings
Arrays:
-a: Put array
Value format: number of requested values, then list of values
-S: Put string as an array of char (long string)
Example: caput my_channel 1.2
(puts 1.2 to my_channel)
It claims to significantly speed up import times, particularly relevant for CLIs like ours.
https://twitter.com/VictorStinner/status/995418047933571073?s=20
Reuse pattern similar to https://github.com/klauer/recordwhat/blob/master/recordwhat/parsers/db_parsimonious.py#L26
There's interesting potential for performance gains here.
Just curious, seems like a nice approach...
The caget-scraping tests break if we set __len__ to max_length, as @tacaswell tried to do in #87. What's the correct length? Do we need to fix the scraping tests? Might need @klauer's rescue, as I have helpfully illustrated with Photoshop:
[...] caproto._utils.spawn_daemon) but forking does weird things from inside IPython.
[...] Popen, which works fine but assumes caproto-repeater is on the PATH. It should be, but calling the function directly smells better to me.
Both of these approaches always spawn a repeater and let that repeater die if it can't bind to the repeater port. This seems wasteful, but I can't think of a better way. The only way to check whether the port is open is to bind to it, and binding to it causes it to become temporarily unavailable (until reclaimed by the OS). So this process seems problematic:
If I understand correctly, (2) can fail depending on race conditions. Better to do: bind and start a repeater or fail to bind and die. But this is admittedly wasteful and I am open to better suggestions! It could be that my understanding of how binding works is not quite right.
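The "bind and start a repeater, or fail to bind and die" approach can be sketched with the standard socket module. try_bind is a hypothetical helper; the demonstration uses an ephemeral loopback port rather than the real repeater port, and no SO_REUSEADDR option is set, so a second bind to the same port is expected to be refused.

```python
import socket


def try_bind(port, host='127.0.0.1'):
    """Return a bound UDP socket, or None if the port is already taken."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind((host, port))
    except OSError:
        # Someone else (presumably a running repeater) owns the port.
        sock.close()
        return None
    return sock


first = try_bind(0)               # port 0 lets the OS pick a free port
port = first.getsockname()[1]
second = try_bind(port)           # same port again: expected to fail
print(second is None)
first.close()
```

Because the socket that succeeded in binding is the one the repeater goes on to use, there is no window between checking the port and claiming it.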
I saw two examples like this, one for 'enum' and one for 'pi' -- probably not related to the specific channels. My guess is that the server isn't quite done starting up by the time we're hitting it with caget. I know that the caget binary times out very quickly.
        if not lines:
>           raise RuntimeError('caget failed: {}'.format(stderr))
E RuntimeError: caget failed: Channel connect timed out: 'enum' not found.
E CA Client Library: Ignored duplicate create channel response from CA server?
tests/epics_test_utils.py:98: RuntimeError
This may be related to restarting my network a few times / putting the laptop to sleep /
ERROR:asyncio:Exception in callback _SelectorDatagramTransport._read_ready()
handle: <Handle _SelectorDatagramTransport._read_ready()>
Traceback (most recent call last):
File "/home/tcaswell/mc3/envs/dd36/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "/home/tcaswell/mc3/envs/dd36/lib/python3.6/asyncio/selector_events.py", line 1078, in _read_ready
self._protocol.datagram_received(data, addr)
File "caproto/examples/repeater.py", line 21, in datagram_received
command = self.broadcaster.next_command()
File "/home/tcaswell/src/bnl/epics/caproto/caproto/_hub.py", line 427, in next_command
command = self._parsed_commands.popleft()
IndexError: pop from an empty deque
We should add a Travis-CI build without numpy to ensure that it falls back on array.array successfully. There is at least one known issue, currently, where the sans-numpy behavior is only stubbed out but not written.
Failing this, we should at least remove the claim (on the index page of the documentation) that numpy is optional.
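For reference, the general shape of an optional-numpy fallback is shown below. This is a generic sketch, not caproto's actual code; make_doubles is a made-up function name.

```python
import array

try:
    import numpy
except ImportError:
    # numpy is optional: fall back to the standard library.
    numpy = None


def make_doubles(values):
    """Pack floats into a numpy array if available, else an array.array."""
    if numpy is not None:
        return numpy.asarray(values, dtype='float64')
    return array.array('d', values)


data = make_doubles([1.0, 2.5])
print(type(data).__name__, list(data))
```

A CI job that runs the test suite in an environment without numpy would exercise the second branch.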
Now that the client and server implementations are almost feature complete, we should revisit the public API and potentially cull it so that caproto.<TAB> is a bit less crowded.
from caproto.threading.client import Context
ct = Context()
ct.register()
ct.search('sim:mtr1.RBV')
ct.search('sim:mtr1')
R = ct.create_channel('sim:mtr1.RBV')
W = ct.create_channel('sim:mtr1')
R.subscribe()
In [91]: R.channel.subscriptions
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-104-7ed4da539a84> in <module>()
----> 1 R.channel.subscriptions
/home/tcaswell/src/bnl/epics/caproto/caproto/_hub.py in subscriptions(self)
548 Get cached EventAdd commands for this channel's active subscriptions.
549 """
--> 550 return {k: v for k, v in self.circuit.event_add_commands.items()
551 if v.cid == self.cid}
552
/home/tcaswell/src/bnl/epics/caproto/caproto/_hub.py in <dictcomp>(.0)
549 """
550 return {k: v for k, v in self.circuit.event_add_commands.items()
--> 551 if v.cid == self.cid}
552
553 def kill(self):
AttributeError: 'EventAddRequest' object has no attribute 'cid'
Scenario:
[...] ClearChannelRequest
[...] MUST_CLOSE on both sides
[...] MUST_CLOSE state
[...] EventAddResponse
I haven't seen this random failure before. I'm not sure following up needs to be a high priority, but I'll paste the log here just in case:
=================================== FAILURES ===================================
______________________________ test_put_complete _______________________________
cntx = <caproto.threading.client.PVContext object at 0x7f78ef7c3368>
    def test_put_complete(cntx):
        pv = cntx.get_pv('Py:ao3')
        mutable = []

        # start in a known initial state
        pv.put(0.0, wait=True)
        pv.get()

        def cb(a):
            mutable.append(a)

        # put and wait
        old_value = pv.get()
        result = pv.put(0.1, wait=True)
        assert result == old_value

        # put and wait with callback (not interesting use of callback)
        old_value = pv.get()
        result = pv.put(0.2, wait=True, callback=cb, callback_data=('T2',))
        assert result == old_value
        assert 'T2' in mutable

        # put and do not wait
        ret_val = pv.put(0.3, wait=False)
        assert ret_val is None
        result = pv.get()
        print('last_reading', pv.chid.last_reading)
        assert result == 0.3

        # put and do not wait with callback
        ret_val = pv.put(0.4, wait=False, callback=cb, callback_data=('T4',))
        assert ret_val is None
        result = pv.get()
>       assert result == 0.4
E assert array([ 284.52237334]) == 0.4
tests/test_threading_pv.py:95: AssertionError
----------------------------- Captured stdout call -----------------------------
last_reading ReadNotifyResponse(data=array([ 0.3]), data_type=20, data_count=1, status=1, ioid=11, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7cdf28>)
----------------------------- Captured stderr call -----------------------------
DEBUG:[cli circuit]:Serializing VersionRequest(priority=0, version=13)
DEBUG:[cli circuit]:Received 16 bytes.
DEBUG:[cli circuit]:Serializing HostNameRequest(name=b'testing-docker-f2409b1d-0751-444a-9922-253e57dc9408')
DEBUG:[cli circuit]:16 bytes -> VersionResponse(version=13)
DEBUG:[cli circuit]:Serializing ClientNameRequest(name=b'travis')
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing CreateChanRequest(name=b'Py:ao3', cid=0, version=13)
DEBUG:[cli circuit]:Received 32 bytes.
DEBUG:[cli circuit]:16 bytes -> AccessRightsResponse(cid=0, access_rights=3)
DEBUG:[cli circuit]:16 bytes -> CreateChanResponse(data_type=6, data_count=1, cid=0, sid=92)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=0)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 287.75644536]), data_type=20, data_count=1, status=1, ioid=0, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7cdf28>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing WriteNotifyRequest(data=array([ 0.]), data_type=6, data_count=1, sid=92, ioid=1, metadata=None)
DEBUG:[cli circuit]:Received 16 bytes.
DEBUG:[cli circuit]:16 bytes -> WriteNotifyResponse(data_type=6, data_count=1, status=1, ioid=1)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=2)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.]), data_type=20, data_count=1, status=1, ioid=2, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d4620>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=3)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.]), data_type=20, data_count=1, status=1, ioid=3, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d4488>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=4)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.]), data_type=20, data_count=1, status=1, ioid=4, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d4268>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing WriteNotifyRequest(data=array([ 0.1]), data_type=6, data_count=1, sid=92, ioid=5, metadata=None)
DEBUG:[cli circuit]:Received 16 bytes.
DEBUG:[cli circuit]:16 bytes -> WriteNotifyResponse(data_type=6, data_count=1, status=1, ioid=5)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=6)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.1]), data_type=20, data_count=1, status=1, ioid=6, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d4730>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=7)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.1]), data_type=20, data_count=1, status=1, ioid=7, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d4488>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing WriteNotifyRequest(data=array([ 0.2]), data_type=6, data_count=1, sid=92, ioid=8, metadata=None)
DEBUG:[cli circuit]:Received 16 bytes.
DEBUG:[cli circuit]:16 bytes -> WriteNotifyResponse(data_type=6, data_count=1, status=1, ioid=8)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=9)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.2]), data_type=20, data_count=1, status=1, ioid=9, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d47b8>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing WriteNotifyRequest(data=array([ 0.3]), data_type=6, data_count=1, sid=92, ioid=10, metadata=None)
DEBUG:[cli circuit]:Received 16 bytes.
DEBUG:[cli circuit]:16 bytes -> WriteNotifyResponse(data_type=6, data_count=1, status=1, ioid=10)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=11)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.3]), data_type=20, data_count=1, status=1, ioid=11, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d4268>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=12)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 0.3]), data_type=20, data_count=1, status=1, ioid=12, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d4378>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing WriteNotifyRequest(data=array([ 0.4]), data_type=6, data_count=1, sid=92, ioid=13, metadata=None)
DEBUG:[cli circuit]:Received 16 bytes.
DEBUG:[cli circuit]:16 bytes -> WriteNotifyResponse(data_type=6, data_count=1, status=1, ioid=13)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Serializing ReadNotifyRequest(data_type=20, data_count=0, sid=92, ioid=14)
DEBUG:[cli circuit]:Received 40 bytes.
DEBUG:[cli circuit]:40 bytes -> ReadNotifyResponse(data=array([ 284.52237334]), data_type=20, data_count=1, status=1, ioid=14, metadata=<caproto._dbr.DBR_TIME_DOUBLE object at 0x7f78ef7d41e0>)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
--------------------------- Captured stdout teardown ---------------------------
disconnected!
Killing repeater process
Waiting
OK
--------------------------- Captured stderr teardown ---------------------------
DEBUG:[cli circuit]:Serializing ClearChannelRequest(sid=92, cid=0)
DEBUG:[cli circuit]:Received 16 bytes.
DEBUG:[cli circuit]:16 bytes -> ClearChannelResponse(sid=92, cid=0)
DEBUG:[cli circuit]:0 bytes are cached. Need more bytes to parse next command.
DEBUG:[cli circuit]:Zero-length recv; sending disconnect notification
=============== 1 failed, 366 passed, 2 skipped in 45.71 seconds ===============
This should be trivial: the UNIX-specific component of the CLI 'scripts' is extremely thin, and so the Windows one should be simple.
Following up on #107, it would be nice to replace the status property on commands, which is currently a plain int, with the corresponding CaStatusCode object that includes an informative message. I'm not clear on whether the int in status is the code or the code_with_severity. I recall that @klauer dropped all of _status.py from heaven. Any guidance, Ken?
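For reference while this is being decided, here is a minimal sketch of how the two candidate integers relate. It assumes the packing convention used in EPICS's caerr.h (severity in the low 3 bits, message number in the remaining bits). The helper names split_status and join_status are hypothetical and are not part of caproto. Under that assumption, the status=1 seen on successful responses in debug logs would decode to code 0 with severity 1, which suggests the wire value is the packed form, but this should be confirmed against _status.py.

```python
# Assumption: packed status integers follow the EPICS caerr.h layout,
# i.e. (message_number << 3) | severity. These helpers are illustrative
# only and do not exist in caproto.
SEVERITY_MASK = 0x7
CODE_SHIFT = 3


def split_status(code_with_severity):
    """Return (code, severity) for a packed status integer."""
    code = code_with_severity >> CODE_SHIFT
    severity = code_with_severity & SEVERITY_MASK
    return code, severity


def join_status(code, severity):
    """Pack a message number and a severity into one integer."""
    return (code << CODE_SHIFT) | (severity & SEVERITY_MASK)
```

If the int on commands turns out to be the packed value, a lookup keyed on that value could return the richer status object directly.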
Several entries in the parameterized caput tests are commented out because the data does not round-trip (due to str vs byte issues, etc.)
Using Python 3.6.0 and numpy 1.11.2, bytes(numpy.array([1, 2, 3])) fails, and thus several tests that use that path fail. The nominally equivalent method, numpy.array([1, 2, 3]).tobytes(), works. This bug was reproduced on Travis here.
TODO
A client may receive redundant responses from the same server (via multiple network interfaces) or different servers that have a PV of the same name. Both cases should be tested.
Review of discussion:
Currently, the default number of workers in the ThreadPoolExecutor that processes user callbacks is 1. If we increase that number, there is the chance that a worker will start processing "Update #2" before another worker is finished processing "Update #1". This could have significantly bad consequences if the updates relate to control flow, such as motors signaling the beginning and end of their movements or acquire bits of detectors going high and low.
How to solve this? If we isolate each PV to a separate queue or bind it to a specific worker, requiring that updates for a given PV be processed serially, with Update #2 only beginning after Update #1 is complete, sibling PVs on a given device might grow wildly out of sync.
But it does seem desirable to find a way to allow some parallelization because user callbacks commonly do I/O and even sleep.
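To make the per-PV option concrete, here is a minimal sketch using only the standard library. It is not how caproto dispatches callbacks; PerPVDispatcher and the PV names are hypothetical. Each PV name gets its own single-worker ThreadPoolExecutor, so updates for one PV run strictly in submission order while different PVs run in parallel. It also exhibits the drawback described above: nothing keeps sibling PVs in step with each other.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PerPVDispatcher:
    """Hypothetical helper: one single-worker executor per PV name."""

    def __init__(self):
        self._executors = {}
        self._lock = threading.Lock()

    def submit(self, pv_name, callback, *args):
        # Create the executor for this PV lazily, guarded by a lock so
        # that two threads cannot create two executors for the same PV.
        with self._lock:
            executor = self._executors.get(pv_name)
            if executor is None:
                executor = ThreadPoolExecutor(max_workers=1)
                self._executors[pv_name] = executor
        # A single worker drains its queue in FIFO order, so Update #2
        # for this PV cannot start before Update #1 has finished.
        return executor.submit(callback, *args)

    def shutdown(self):
        with self._lock:
            executors = list(self._executors.values())
        for executor in executors:
            executor.shutdown(wait=True)


def demo():
    """Submit interleaved updates for two PVs and record completion order."""
    seen = []
    seen_lock = threading.Lock()

    def callback(pv_name, update_number):
        time.sleep(0.01)  # stand-in for user I/O
        with seen_lock:
            seen.append((pv_name, update_number))

    dispatcher = PerPVDispatcher()
    for update_number in range(3):
        for pv_name in ("motor.RBV", "motor.DMOV"):
            dispatcher.submit(pv_name, callback, pv_name, update_number)
    dispatcher.shutdown()
    return seen
```

One thread per PV will not scale to thousands of PVs; hashing PV names onto a fixed pool of single-worker executors would bound the thread count at the cost of some parallelism.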
We have caproto-repeater. Add caproto-put, caproto-get, caproto-monitor.
Currently, we run a caget to make it easy to tell if the IOCs are up, but the build proceeds either way. We don't do anything to check the simulator.
Output dumps from failed tests indicate that something is leaving many threads running (a fixture? the threading client?) past when they should be cleaned up. This could be an issue with the client itself, not just a test issue.
Perhaps this could be investigated by asserting that threading.active_count() is 1 right before tests exit.
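A minimal sketch of such a check, using only the standard library. The function names are hypothetical. Instead of comparing threading.active_count() to 1, it lists every live thread other than the main thread, so a failure message names the offenders.

```python
import threading


def leaked_threads():
    """Return all live threads other than the main thread."""
    main = threading.main_thread()
    return [t for t in threading.enumerate()
            if t is not main and t.is_alive()]


def assert_no_leaked_threads():
    """Raise AssertionError naming any thread that is still running."""
    leaked = leaked_threads()
    names = ", ".join(t.name for t in leaked)
    assert not leaked, "threads still running at exit: " + names
```

Calling assert_no_leaked_threads() from a teardown hook or at the end of a test session would show whether the stray threads belong to a fixture or to the threading client itself.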
Some ideas:
An umbrella issue copied from #205:
scripts/ on the Windows PATH. Seems like the current setup.py configuration doesn't do the trick; unclear if we should be using entry_points or what. (#215)
caproto.threading.client.SelectorThread for Windows. (The current implementation uses a built-in Python module only available on UNIX.) (#215)
Possibly of interest to @johnsinsheimer.