
intake-bluesky's Introduction

Intake Bluesky

This project has been merged into https://github.com/bluesky/databroker. It was a space for prototyping databroker's integration with intake. It has now been deprecated.

intake-bluesky's People

Contributors

danielballan, gwbischof, jklynch, mrakitin, ronpandolfi, tacaswell


intake-bluesky's Issues

list index out of range

run_id: 25de2b3e-e880-455e-9061-0b326c40eb49


IndexError Traceback (most recent call last)
in
2 print(run)
3 if run not in ['fec131f6-74e9-4309-bfdb-ec500f7c1c33','5789e7e3-0c61-4836-8f77-eeea456741c0']:
----> 4 a[run].primary.read()

~/conda_envs/garrett/lib/python3.7/site-packages/intake_xarray/base.py in read(self)
38 def read(self):
39 """Return a version of the xarray with all the data in memory"""
---> 40 self._load_metadata()
41 return self._ds.load()
42

~/conda_envs/garrett/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
115 """load metadata only if needed"""
116 if self._schema is None:
--> 117 self._schema = self._get_schema()
118 self.datashape = self._schema.datashape
119 self.dtype = self._schema.dtype

~/conda_envs/garrett/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self)
17
18 if self._ds is None:
---> 19 self._open_dataset()
20
21 metadata = {

~/dama/intake-bluesky/intake_bluesky/core.py in _open_dataset(self)
592 get_datum_cursor=self._get_datum_cursor,
593 include=self.include,
--> 594 exclude=self.exclude)
595
596

~/dama/intake-bluesky/intake_bluesky/core.py in documents_to_xarray(start_doc, stop_doc, descriptor_docs, event_docs, filler, get_resource, get_datum, get_datum_cursor, include, exclude)
106 field_metadata = data_keys[key]
107 # Verify the actual ndim by looking at the data.
--> 108 ndim = numpy.asarray(data_table[key][0]).ndim
109 dims = None
110 if 'dims' in field_metadata:

IndexError: list index out of range

Make tests run faster by doing intake_server fixture per module not per function

The tests run slowly right now because each test stands up a fresh intake_server fixture, which involves starting up a Tornado server. A separate server per test has advantages (it ensures that tests don't interfere with one another via state persisted on the server), and it probably makes sense for intake tests to take that approach.

But if we can assume that the intake server itself is working, I think it is safe to set up one server per module rather than one server per test. My guess is that this would reduce the test time by ~100X and make development much nicer.
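A minimal sketch of what a module-scoped fixture could look like, assuming pytest is the test runner. The real fixture starts a Tornado-based intake server; here a standard-library HTTP server stands in for it, and `_InfoHandler`, `running_server`, and `test_info` are hypothetical names used only for illustration. The only part that matters for the proposal is `scope="module"`.

```python
import contextlib
import http.client
import http.server
import threading

import pytest


class _InfoHandler(http.server.BaseHTTPRequestHandler):
    """Hypothetical stand-in for the real server: answers every GET with 200."""

    def do_GET(self):
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep test output quiet


@contextlib.contextmanager
def running_server():
    """Start a throwaway HTTP server on a free port and yield (host, port)."""
    server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), _InfoHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        yield server.server_address
    finally:
        # Stop the serve_forever loop, release the socket, wait for the thread.
        server.shutdown()
        server.server_close()
        thread.join()


@pytest.fixture(scope="module")
def intake_server():
    # scope="module": the server starts once per test module and is shared by
    # every test in it, instead of being restarted for each test function.
    with running_server() as address:
        yield address


def test_info(intake_server):
    host, port = intake_server
    conn = http.client.HTTPConnection(host, port, timeout=5)
    conn.request("GET", "/v1/info")
    assert conn.getresponse().status == 200
    conn.close()
```

The trade-off is the one described above: tests in the same module now share one server, so any state a test leaves on it is visible to the tests that run after it.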

ConnectionError: HTTPConnectionPool failed to establish new connection.


self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>

    def _new_conn(self):
        """ Establish a socket connection and set nodelay settings on it.
    
        :return: New socket connection.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw['source_address'] = self.source_address
    
        if self.socket_options:
            extra_kw['socket_options'] = self.socket_options
    
        try:
            conn = connection.create_connection(
>               (self._dns_host, self.port), self.timeout, **extra_kw)

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/connection.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

address = ('localhost', 7481), timeout = None, source_address = None, socket_options = [(6, 1, 1)]

    def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                          source_address=None, socket_options=None):
        """Connect to *address* and return the socket object.
    
        Convenience function.  Connect to *address* (a 2-tuple ``(host,
        port)``) and return the socket object.  Passing the optional
        *timeout* parameter will set the timeout on the socket instance
        before attempting to connect.  If no *timeout* is supplied, the
        global default timeout setting returned by :func:`getdefaulttimeout`
        is used.  If *source_address* is set it must be a tuple of (host, port)
        for the socket to bind as a source address before making the connection.
        An host of '' or port 0 tells the OS to use the default.
        """
    
        host, port = address
        if host.startswith('['):
            host = host.strip('[]')
        err = None
    
        # Using the value from allowed_gai_family() in the context of getaddrinfo lets
        # us select whether to work with IPv4 DNS records, IPv6 records, or both.
        # The original create_connection function always returns all records.
        family = allowed_gai_family()
    
        for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            sock = None
            try:
                sock = socket.socket(af, socktype, proto)
    
                # If provided, set socket level options before connecting.
                _set_socket_options(sock, socket_options)
    
                if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
                    sock.settimeout(timeout)
                if source_address:
                    sock.bind(source_address)
                sock.connect(sa)
                return sock
    
            except socket.error as e:
                err = e
                if sock is not None:
                    sock.close()
                    sock = None
    
        if err is not None:
>           raise err

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/util/connection.py:80: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

address = ('localhost', 7481), timeout = None, source_address = None, socket_options = [(6, 1, 1)]

    def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                          source_address=None, socket_options=None):
        """Connect to *address* and return the socket object.
    
        Convenience function.  Connect to *address* (a 2-tuple ``(host,
        port)``) and return the socket object.  Passing the optional
        *timeout* parameter will set the timeout on the socket instance
        before attempting to connect.  If no *timeout* is supplied, the
        global default timeout setting returned by :func:`getdefaulttimeout`
        is used.  If *source_address* is set it must be a tuple of (host, port)
        for the socket to bind as a source address before making the connection.
        An host of '' or port 0 tells the OS to use the default.
        """
    
        host, port = address
        if host.startswith('['):
            host = host.strip('[]')
        err = None
    
        # Using the value from allowed_gai_family() in the context of getaddrinfo lets
        # us select whether to work with IPv4 DNS records, IPv6 records, or both.
        # The original create_connection function always returns all records.
        family = allowed_gai_family()
    
        for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            sock = None
            try:
                sock = socket.socket(af, socktype, proto)
    
                # If provided, set socket level options before connecting.
                _set_socket_options(sock, socket_options)
    
                if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
                    sock.settimeout(timeout)
                if source_address:
                    sock.bind(source_address)
>               sock.connect(sa)
E               ConnectionRefusedError: [Errno 111] Connection refused

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/util/connection.py:70: ConnectionRefusedError

During handling of the above exception, another exception occurred:

self = <urllib3.connectionpool.HTTPConnectionPool object at 0x7f617c2455f8>, method = 'GET'
url = '/v1/info', body = None
headers = {'User-Agent': 'python-requests/2.22.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'}
retries = Retry(total=0, connect=None, read=False, redirect=None, status=None), redirect = False
assert_same_host = False, timeout = <urllib3.util.timeout.Timeout object at 0x7f617c245208>
pool_timeout = None, release_conn = False, chunked = False, body_pos = None
response_kw = {'decode_content': False, 'preload_content': False}, conn = None, release_this_conn = True
err = None, clean_exit = False, timeout_obj = <urllib3.util.timeout.Timeout object at 0x7f617c245128>
is_new_proxy_conn = False

    def urlopen(self, method, url, body=None, headers=None, retries=None,
                redirect=True, assert_same_host=True, timeout=_Default,
                pool_timeout=None, release_conn=None, chunked=False,
                body_pos=None, **response_kw):
        """
        Get a connection from the pool and perform an HTTP request. This is the
        lowest level call for making a request, so you'll need to specify all
        the raw details.
    
        .. note::
    
           More commonly, it's appropriate to use a convenience method provided
           by :class:`.RequestMethods`, such as :meth:`request`.
    
        .. note::
    
           `release_conn` will only behave as expected if
           `preload_content=False` because we want to make
           `preload_content=False` the default behaviour someday soon without
           breaking backwards compatibility.
    
        :param method:
            HTTP request method (such as GET, POST, PUT, etc.)
    
        :param body:
            Data to send in the request body (useful for creating
            POST requests, see HTTPConnectionPool.post_url for
            more convenience).
    
        :param headers:
            Dictionary of custom headers to send, such as User-Agent,
            If-None-Match, etc. If None, pool headers are used. If provided,
            these headers completely replace any pool-specific headers.
    
        :param retries:
            Configure the number of retries to allow before raising a
            :class:`~urllib3.exceptions.MaxRetryError` exception.
    
            Pass ``None`` to retry until you receive a response. Pass a
            :class:`~urllib3.util.retry.Retry` object for fine-grained control
            over different types of retries.
            Pass an integer number to retry connection errors that many times,
            but no other types of errors. Pass zero to never retry.
    
            If ``False``, then retries are disabled and any exception is raised
            immediately. Also, instead of raising a MaxRetryError on redirects,
            the redirect response will be returned.
    
        :type retries: :class:`~urllib3.util.retry.Retry`, False, or an int.
    
        :param redirect:
            If True, automatically handle redirects (status codes 301, 302,
            303, 307, 308). Each redirect counts as a retry. Disabling retries
            will disable redirect, too.
    
        :param assert_same_host:
            If ``True``, will make sure that the host of the pool requests is
            consistent else will raise HostChangedError. When False, you can
            use the pool on an HTTP proxy and request foreign hosts.
    
        :param timeout:
            If specified, overrides the default timeout for this one
            request. It may be a float (in seconds) or an instance of
            :class:`urllib3.util.Timeout`.
    
        :param pool_timeout:
            If set and the pool is set to block=True, then this method will
            block for ``pool_timeout`` seconds and raise EmptyPoolError if no
            connection is available within the time period.
    
        :param release_conn:
            If False, then the urlopen call will not release the connection
            back into the pool once a response is received (but will release if
            you read the entire contents of the response such as when
            `preload_content=True`). This is useful if you're not preloading
            the response's content immediately. You will need to call
            ``r.release_conn()`` on the response ``r`` to return the connection
            back into the pool. If None, it takes the value of
            ``response_kw.get('preload_content', True)``.
    
        :param chunked:
            If True, urllib3 will send the body using chunked transfer
            encoding. Otherwise, urllib3 will send the body using the standard
            content-length form. Defaults to False.
    
        :param int body_pos:
            Position to seek to in file-like body in the event of a retry or
            redirect. Typically this won't need to be set because urllib3 will
            auto-populate the value when needed.
    
        :param \\**response_kw:
            Additional parameters are passed to
            :meth:`urllib3.response.HTTPResponse.from_httplib`
        """
        if headers is None:
            headers = self.headers
    
        if not isinstance(retries, Retry):
            retries = Retry.from_int(retries, redirect=redirect, default=self.retries)
    
        if release_conn is None:
            release_conn = response_kw.get('preload_content', True)
    
        # Check host
        if assert_same_host and not self.is_same_host(url):
            raise HostChangedError(self, url, retries)
    
        conn = None
    
        # Track whether `conn` needs to be released before
        # returning/raising/recursing. Update this variable if necessary, and
        # leave `release_conn` constant throughout the function. That way, if
        # the function recurses, the original value of `release_conn` will be
        # passed down into the recursive call, and its value will be respected.
        #
        # See issue #651 [1] for details.
        #
        # [1] <https://github.com/shazow/urllib3/issues/651>
        release_this_conn = release_conn
    
        # Merge the proxy headers. Only do this in HTTP. We have to copy the
        # headers dict so we can safely change it without those changes being
        # reflected in anyone else's copy.
        if self.scheme == 'http':
            headers = headers.copy()
            headers.update(self.proxy_headers)
    
        # Must keep the exception bound to a separate variable or else Python 3
        # complains about UnboundLocalError.
        err = None
    
        # Keep track of whether we cleanly exited the except block. This
        # ensures we do proper cleanup in finally.
        clean_exit = False
    
        # Rewind body position, if needed. Record current position
        # for future rewinds in the event of a redirect/retry.
        body_pos = set_file_position(body, body_pos)
    
        try:
            # Request a connection from the queue.
            timeout_obj = self._get_timeout(timeout)
            conn = self._get_conn(timeout=pool_timeout)
    
            conn.timeout = timeout_obj.connect_timeout
    
            is_new_proxy_conn = self.proxy is not None and not getattr(conn, 'sock', None)
            if is_new_proxy_conn:
                self._prepare_proxy(conn)
    
            # Make the request on the httplib connection object.
            httplib_response = self._make_request(conn, method, url,
                                                  timeout=timeout_obj,
                                                  body=body, headers=headers,
>                                                 chunked=chunked)

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/connectionpool.py:603: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connectionpool.HTTPConnectionPool object at 0x7f617c2455f8>
conn = <urllib3.connection.HTTPConnection object at 0x7f617c245550>, method = 'GET', url = '/v1/info'
timeout = <urllib3.util.timeout.Timeout object at 0x7f617c245128>, chunked = False
httplib_request_kw = {'body': None, 'headers': {'User-Agent': 'python-requests/2.22.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'}}
timeout_obj = <urllib3.util.timeout.Timeout object at 0x7f617c245630>

    def _make_request(self, conn, method, url, timeout=_Default, chunked=False,
                      **httplib_request_kw):
        """
        Perform a request on a given urllib connection object taken from our
        pool.
    
        :param conn:
            a connection from one of our connection pools
    
        :param timeout:
            Socket timeout in seconds for the request. This can be a
            float or integer, which will set the same timeout value for
            the socket connect and the socket read, or an instance of
            :class:`urllib3.util.Timeout`, which gives you more fine-grained
            control over your timeouts.
        """
        self.num_requests += 1
    
        timeout_obj = self._get_timeout(timeout)
        timeout_obj.start_connect()
        conn.timeout = timeout_obj.connect_timeout
    
        # Trigger any extra validation we need to do.
        try:
            self._validate_conn(conn)
        except (SocketTimeout, BaseSSLError) as e:
            # Py2 raises this as a BaseSSLError, Py3 raises it as socket timeout.
            self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
            raise
    
        # conn.request() calls httplib.*.request, not the method in
        # urllib3.request. It also calls makefile (recv) on the socket.
        if chunked:
            conn.request_chunked(method, url, **httplib_request_kw)
        else:
>           conn.request(method, url, **httplib_request_kw)

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/connectionpool.py:355: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>, method = 'GET', url = '/v1/info'
body = None
headers = {'User-Agent': 'python-requests/2.22.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'}

    def request(self, method, url, body=None, headers={}, *,
                encode_chunked=False):
        """Send a complete request to the server."""
>       self._send_request(method, url, body, headers, encode_chunked)

../../../anaconda3/envs/garrett2/lib/python3.7/http/client.py:1229: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>, method = 'GET', url = '/v1/info'
body = None
headers = {'User-Agent': 'python-requests/2.22.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'}
encode_chunked = False

    def _send_request(self, method, url, body, headers, encode_chunked):
        # Honor explicitly requested Host: and Accept-Encoding: headers.
        header_names = frozenset(k.lower() for k in headers)
        skips = {}
        if 'host' in header_names:
            skips['skip_host'] = 1
        if 'accept-encoding' in header_names:
            skips['skip_accept_encoding'] = 1
    
        self.putrequest(method, url, **skips)
    
        # chunked encoding will happen if HTTP/1.1 is used and either
        # the caller passes encode_chunked=True or the following
        # conditions hold:
        # 1. content-length has not been explicitly set
        # 2. the body is a file or iterable, but not a str or bytes-like
        # 3. Transfer-Encoding has NOT been explicitly set by the caller
    
        if 'content-length' not in header_names:
            # only chunk body if not explicitly set for backwards
            # compatibility, assuming the client code is already handling the
            # chunking
            if 'transfer-encoding' not in header_names:
                # if content-length cannot be automatically determined, fall
                # back to chunked encoding
                encode_chunked = False
                content_length = self._get_content_length(body, method)
                if content_length is None:
                    if body is not None:
                        if self.debuglevel > 0:
                            print('Unable to determine size of %r' % body)
                        encode_chunked = True
                        self.putheader('Transfer-Encoding', 'chunked')
                else:
                    self.putheader('Content-Length', str(content_length))
        else:
            encode_chunked = False
    
        for hdr, value in headers.items():
            self.putheader(hdr, value)
        if isinstance(body, str):
            # RFC 2616 Section 3.7.1 says that text default has a
            # default charset of iso-8859-1.
            body = _encode(body, 'body')
>       self.endheaders(body, encode_chunked=encode_chunked)

../../../anaconda3/envs/garrett2/lib/python3.7/http/client.py:1275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>, message_body = None

    def endheaders(self, message_body=None, *, encode_chunked=False):
        """Indicate that the last header line has been sent to the server.
    
        This method sends the request to the server.  The optional message_body
        argument can be used to pass a message body associated with the
        request.
        """
        if self.__state == _CS_REQ_STARTED:
            self.__state = _CS_REQ_SENT
        else:
            raise CannotSendHeader()
>       self._send_output(message_body, encode_chunked=encode_chunked)

../../../anaconda3/envs/garrett2/lib/python3.7/http/client.py:1224: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>, message_body = None
encode_chunked = False

    def _send_output(self, message_body=None, encode_chunked=False):
        """Send the currently buffered request and clear the buffer.
    
        Appends an extra \\r\\n to the buffer.
        A message_body may be specified, to be appended to the request.
        """
        self._buffer.extend((b"", b""))
        msg = b"\r\n".join(self._buffer)
        del self._buffer[:]
>       self.send(msg)

../../../anaconda3/envs/garrett2/lib/python3.7/http/client.py:1016: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>
data = b'GET /v1/info HTTP/1.1\r\nHost: localhost:7481\r\nUser-Agent: python-requests/2.22.0\r\nAccept-Encoding: gzip, deflate\r\nAccept: */*\r\nConnection: keep-alive\r\n\r\n'

    def send(self, data):
        """Send `data' to the server.
        ``data`` can be a string object, a bytes object, an array object, a
        file-like object that supports a .read() method, or an iterable object.
        """
    
        if self.sock is None:
            if self.auto_open:
>               self.connect()

../../../anaconda3/envs/garrett2/lib/python3.7/http/client.py:956: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>

    def connect(self):
>       conn = self._new_conn()

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/connection.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connection.HTTPConnection object at 0x7f617c245550>

    def _new_conn(self):
        """ Establish a socket connection and set nodelay settings on it.
    
        :return: New socket connection.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw['source_address'] = self.source_address
    
        if self.socket_options:
            extra_kw['socket_options'] = self.socket_options
    
        try:
            conn = connection.create_connection(
                (self._dns_host, self.port), self.timeout, **extra_kw)
    
        except SocketTimeout:
            raise ConnectTimeoutError(
                self, "Connection to %s timed out. (connect timeout=%s)" %
                (self.host, self.timeout))
    
        except SocketError as e:
            raise NewConnectionError(
>               self, "Failed to establish a new connection: %s" % e)
E           urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f617c245550>: Failed to establish a new connection: [Errno 111] Connection refused

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/connection.py:169: NewConnectionError

During handling of the above exception, another exception occurred:

self = <requests.adapters.HTTPAdapter object at 0x7f615c5ff2e8>, request = <PreparedRequest [GET]>
stream = False, timeout = <urllib3.util.timeout.Timeout object at 0x7f617c245208>, verify = True
cert = None, proxies = OrderedDict()

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        """Sends PreparedRequest object. Returns Response object.
    
        :param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
        :param stream: (optional) Whether to stream the request content.
        :param timeout: (optional) How long to wait for the server to send
            data before giving up, as a float, or a :ref:`(connect timeout,
            read timeout) <timeouts>` tuple.
        :type timeout: float or tuple or urllib3 Timeout object
        :param verify: (optional) Either a boolean, in which case it controls whether
            we verify the server's TLS certificate, or a string, in which case it
            must be a path to a CA bundle to use
        :param cert: (optional) Any user-provided SSL certificate to be trusted.
        :param proxies: (optional) The proxies dictionary to apply to the request.
        :rtype: requests.Response
        """
    
        try:
            conn = self.get_connection(request.url, proxies)
        except LocationValueError as e:
            raise InvalidURL(e, request=request)
    
        self.cert_verify(conn, request.url, verify, cert)
        url = self.request_url(request, proxies)
        self.add_headers(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies)
    
        chunked = not (request.body is None or 'Content-Length' in request.headers)
    
        if isinstance(timeout, tuple):
            try:
                connect, read = timeout
                timeout = TimeoutSauce(connect=connect, read=read)
            except ValueError as e:
                # this may raise a string formatting error.
                err = ("Invalid timeout {}. Pass a (connect, read) "
                       "timeout tuple, or a single float to set "
                       "both timeouts to the same value".format(timeout))
                raise ValueError(err)
        elif isinstance(timeout, TimeoutSauce):
            pass
        else:
            timeout = TimeoutSauce(connect=timeout, read=timeout)
    
        try:
            if not chunked:
                resp = conn.urlopen(
                    method=request.method,
                    url=url,
                    body=request.body,
                    headers=request.headers,
                    redirect=False,
                    assert_same_host=False,
                    preload_content=False,
                    decode_content=False,
                    retries=self.max_retries,
>                   timeout=timeout
                )

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/requests/adapters.py:449: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <urllib3.connectionpool.HTTPConnectionPool object at 0x7f617c2455f8>, method = 'GET'
url = '/v1/info', body = None
headers = {'User-Agent': 'python-requests/2.22.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive'}
retries = Retry(total=0, connect=None, read=False, redirect=None, status=None), redirect = False
assert_same_host = False, timeout = <urllib3.util.timeout.Timeout object at 0x7f617c245208>
pool_timeout = None, release_conn = False, chunked = False, body_pos = None
response_kw = {'decode_content': False, 'preload_content': False}, conn = None, release_this_conn = True
err = None, clean_exit = False, timeout_obj = <urllib3.util.timeout.Timeout object at 0x7f617c245128>
is_new_proxy_conn = False

    def urlopen(self, method, url, body=None, headers=None, retries=None,
                redirect=True, assert_same_host=True, timeout=_Default,
                pool_timeout=None, release_conn=None, chunked=False,
                body_pos=None, **response_kw):
        """
        Get a connection from the pool and perform an HTTP request. This is the
        lowest level call for making a request, so you'll need to specify all
        the raw details.
    
        .. note::
    
           More commonly, it's appropriate to use a convenience method provided
           by :class:`.RequestMethods`, such as :meth:`request`.
    
        .. note::
    
           `release_conn` will only behave as expected if
           `preload_content=False` because we want to make
           `preload_content=False` the default behaviour someday soon without
           breaking backwards compatibility.
    
        :param method:
            HTTP request method (such as GET, POST, PUT, etc.)
    
        :param body:
            Data to send in the request body (useful for creating
            POST requests, see HTTPConnectionPool.post_url for
            more convenience).
    
        :param headers:
            Dictionary of custom headers to send, such as User-Agent,
            If-None-Match, etc. If None, pool headers are used. If provided,
            these headers completely replace any pool-specific headers.
    
        :param retries:
            Configure the number of retries to allow before raising a
            :class:`~urllib3.exceptions.MaxRetryError` exception.
    
            Pass ``None`` to retry until you receive a response. Pass a
            :class:`~urllib3.util.retry.Retry` object for fine-grained control
            over different types of retries.
            Pass an integer number to retry connection errors that many times,
            but no other types of errors. Pass zero to never retry.
    
            If ``False``, then retries are disabled and any exception is raised
            immediately. Also, instead of raising a MaxRetryError on redirects,
            the redirect response will be returned.
    
        :type retries: :class:`~urllib3.util.retry.Retry`, False, or an int.
    
        :param redirect:
            If True, automatically handle redirects (status codes 301, 302,
            303, 307, 308). Each redirect counts as a retry. Disabling retries
            will disable redirect, too.
    
        :param assert_same_host:
            If ``True``, will make sure that the host of the pool requests is
            consistent else will raise HostChangedError. When False, you can
            use the pool on an HTTP proxy and request foreign hosts.
    
        :param timeout:
            If specified, overrides the default timeout for this one
            request. It may be a float (in seconds) or an instance of
            :class:`urllib3.util.Timeout`.
    
        :param pool_timeout:
            If set and the pool is set to block=True, then this method will
            block for ``pool_timeout`` seconds and raise EmptyPoolError if no
            connection is available within the time period.
    
        :param release_conn:
            If False, then the urlopen call will not release the connection
            back into the pool once a response is received (but will release if
            you read the entire contents of the response such as when
            `preload_content=True`). This is useful if you're not preloading
            the response's content immediately. You will need to call
            ``r.release_conn()`` on the response ``r`` to return the connection
            back into the pool. If None, it takes the value of
            ``response_kw.get('preload_content', True)``.
    
        :param chunked:
            If True, urllib3 will send the body using chunked transfer
            encoding. Otherwise, urllib3 will send the body using the standard
            content-length form. Defaults to False.
    
        :param int body_pos:
            Position to seek to in file-like body in the event of a retry or
            redirect. Typically this won't need to be set because urllib3 will
            auto-populate the value when needed.
    
        :param \\**response_kw:
            Additional parameters are passed to
            :meth:`urllib3.response.HTTPResponse.from_httplib`
        """
        if headers is None:
            headers = self.headers
    
        if not isinstance(retries, Retry):
            retries = Retry.from_int(retries, redirect=redirect, default=self.retries)
    
        if release_conn is None:
            release_conn = response_kw.get('preload_content', True)
    
        # Check host
        if assert_same_host and not self.is_same_host(url):
            raise HostChangedError(self, url, retries)
    
        conn = None
    
        # Track whether `conn` needs to be released before
        # returning/raising/recursing. Update this variable if necessary, and
        # leave `release_conn` constant throughout the function. That way, if
        # the function recurses, the original value of `release_conn` will be
        # passed down into the recursive call, and its value will be respected.
        #
        # See issue #651 [1] for details.
        #
        # [1] <https://github.com/shazow/urllib3/issues/651>
        release_this_conn = release_conn
    
        # Merge the proxy headers. Only do this in HTTP. We have to copy the
        # headers dict so we can safely change it without those changes being
        # reflected in anyone else's copy.
        if self.scheme == 'http':
            headers = headers.copy()
            headers.update(self.proxy_headers)
    
        # Must keep the exception bound to a separate variable or else Python 3
        # complains about UnboundLocalError.
        err = None
    
        # Keep track of whether we cleanly exited the except block. This
        # ensures we do proper cleanup in finally.
        clean_exit = False
    
        # Rewind body position, if needed. Record current position
        # for future rewinds in the event of a redirect/retry.
        body_pos = set_file_position(body, body_pos)
    
        try:
            # Request a connection from the queue.
            timeout_obj = self._get_timeout(timeout)
            conn = self._get_conn(timeout=pool_timeout)
    
            conn.timeout = timeout_obj.connect_timeout
    
            is_new_proxy_conn = self.proxy is not None and not getattr(conn, 'sock', None)
            if is_new_proxy_conn:
                self._prepare_proxy(conn)
    
            # Make the request on the httplib connection object.
            httplib_response = self._make_request(conn, method, url,
                                                  timeout=timeout_obj,
                                                  body=body, headers=headers,
                                                  chunked=chunked)
    
            # If we're going to release the connection in ``finally:``, then
            # the response doesn't need to know about the connection. Otherwise
            # it will also try to release it and we'll have a double-release
            # mess.
            response_conn = conn if not release_conn else None
    
            # Pass method to Response for length checking
            response_kw['request_method'] = method
    
            # Import httplib's response into our own wrapper object
            response = self.ResponseCls.from_httplib(httplib_response,
                                                     pool=self,
                                                     connection=response_conn,
                                                     retries=retries,
                                                     **response_kw)
    
            # Everything went great!
            clean_exit = True
    
        except queue.Empty:
            # Timed out by queue.
            raise EmptyPoolError(self, "No pool connections are available.")
    
        except (TimeoutError, HTTPException, SocketError, ProtocolError,
                BaseSSLError, SSLError, CertificateError) as e:
            # Discard the connection for these exceptions. It will be
            # replaced during the next _get_conn() call.
            clean_exit = False
            if isinstance(e, (BaseSSLError, CertificateError)):
                e = SSLError(e)
            elif isinstance(e, (SocketError, NewConnectionError)) and self.proxy:
                e = ProxyError('Cannot connect to proxy.', e)
            elif isinstance(e, (SocketError, HTTPException)):
                e = ProtocolError('Connection aborted.', e)
    
            retries = retries.increment(method, url, error=e, _pool=self,
>                                       _stacktrace=sys.exc_info()[2])

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/connectionpool.py:641: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Retry(total=0, connect=None, read=False, redirect=None, status=None), method = 'GET'
url = '/v1/info', response = None
error = NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f617c245550>: Failed to establish a new connection: [Errno 111] Connection refused')
_pool = <urllib3.connectionpool.HTTPConnectionPool object at 0x7f617c2455f8>
_stacktrace = <traceback object at 0x7f617c37ea88>

    def increment(self, method=None, url=None, response=None, error=None,
                  _pool=None, _stacktrace=None):
        """ Return a new Retry object with incremented retry counters.
    
        :param response: A response object, or None, if the server did not
            return a response.
        :type response: :class:`~urllib3.response.HTTPResponse`
        :param Exception error: An error encountered during the request, or
            None if the response was received successfully.
    
        :return: A new ``Retry`` object.
        """
        if self.total is False and error:
            # Disabled, indicate to re-raise the error.
            raise six.reraise(type(error), error, _stacktrace)
    
        total = self.total
        if total is not None:
            total -= 1
    
        connect = self.connect
        read = self.read
        redirect = self.redirect
        status_count = self.status
        cause = 'unknown'
        status = None
        redirect_location = None
    
        if error and self._is_connection_error(error):
            # Connect retry?
            if connect is False:
                raise six.reraise(type(error), error, _stacktrace)
            elif connect is not None:
                connect -= 1
    
        elif error and self._is_read_error(error):
            # Read retry?
            if read is False or not self._is_method_retryable(method):
                raise six.reraise(type(error), error, _stacktrace)
            elif read is not None:
                read -= 1
    
        elif response and response.get_redirect_location():
            # Redirect retry?
            if redirect is not None:
                redirect -= 1
            cause = 'too many redirects'
            redirect_location = response.get_redirect_location()
            status = response.status
    
        else:
            # Incrementing because of a server error like a 500 in
            # status_forcelist and a the given method is in the whitelist
            cause = ResponseError.GENERIC_ERROR
            if response and response.status:
                if status_count is not None:
                    status_count -= 1
                cause = ResponseError.SPECIFIC_ERROR.format(
                    status_code=response.status)
                status = response.status
    
        history = self.history + (RequestHistory(method, url, error, status, redirect_location),)
    
        new_retry = self.new(
            total=total,
            connect=connect, read=read, redirect=redirect, status=status_count,
            history=history)
    
        if new_retry.is_exhausted():
>           raise MaxRetryError(_pool, url, error or ResponseError(cause))
E           urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=7481): Max retries exceeded with url: /v1/info (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f617c245550>: Failed to establish a new connection: [Errno 111] Connection refused'))

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/urllib3/util/retry.py:399: MaxRetryError

During handling of the above exception, another exception occurred:

request = <SubRequest 'intake_server' for <Function test_fixture[local-scalar]>>

    @pytest.fixture(scope="module")
    def intake_server(request):
        os.environ['INTAKE_DEBUG'] = 'true'
        # Catalog path comes from the test module
        path = request.module.TEST_CATALOG_PATH
        if isinstance(path, list):
            catalog_path = [p + '/*' for p in path]
        elif isinstance(path, str) and not path.endswith(
                '.yml') and not path.endswith('.yaml'):
            catalog_path = path + '/*'
        else:
            catalog_path = path
        server_conf = getattr(request.module, 'TEST_SERVER_CONF', None)
    
        # Start a catalog server on nonstandard port
    
        env = dict(os.environ)
        env['INTAKE_TEST'] = 'server'
        if server_conf is not None:
            env['INTAKE_CONF_FILE'] = server_conf
        port = pick_port()
        cmd = [ex, '-m', 'intake.cli.server', '--sys-exit-on-sigterm',
               '--port', str(port)]
        if isinstance(catalog_path, list):
            cmd.extend(catalog_path)
        else:
            cmd.append(catalog_path)
        try:
            p = subprocess.Popen(cmd, env=env)
            url = 'http://localhost:%d/v1/info' % port
    
            # wait for server to finish initalizing, but let the exception through
            # on last retry
            retries = 300
            try:
>               while not ping_server(url, swallow_exception=(retries > 1)):

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/intake/conftest.py:102: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/intake/conftest.py:51: in ping_server
    raise e
../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/intake/conftest.py:46: in ping_server
    r = requests.get(url)
../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/requests/api.py:75: in get
    return request('get', url, params=params, **kwargs)
../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/requests/api.py:60: in request
    return session.request(method=method, url=url, **kwargs)
../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/requests/sessions.py:533: in request
    resp = self.send(prep, **send_kwargs)
../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/requests/sessions.py:646: in send
    r = adapter.send(request, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <requests.adapters.HTTPAdapter object at 0x7f615c5ff2e8>, request = <PreparedRequest [GET]>
stream = False, timeout = <urllib3.util.timeout.Timeout object at 0x7f617c245208>, verify = True
cert = None, proxies = OrderedDict()

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        """Sends PreparedRequest object. Returns Response object.
    
        :param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
        :param stream: (optional) Whether to stream the request content.
        :param timeout: (optional) How long to wait for the server to send
            data before giving up, as a float, or a :ref:`(connect timeout,
            read timeout) <timeouts>` tuple.
        :type timeout: float or tuple or urllib3 Timeout object
        :param verify: (optional) Either a boolean, in which case it controls whether
            we verify the server's TLS certificate, or a string, in which case it
            must be a path to a CA bundle to use
        :param cert: (optional) Any user-provided SSL certificate to be trusted.
        :param proxies: (optional) The proxies dictionary to apply to the request.
        :rtype: requests.Response
        """
    
        try:
            conn = self.get_connection(request.url, proxies)
        except LocationValueError as e:
            raise InvalidURL(e, request=request)
    
        self.cert_verify(conn, request.url, verify, cert)
        url = self.request_url(request, proxies)
        self.add_headers(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies)
    
        chunked = not (request.body is None or 'Content-Length' in request.headers)
    
        if isinstance(timeout, tuple):
            try:
                connect, read = timeout
                timeout = TimeoutSauce(connect=connect, read=read)
            except ValueError as e:
                # this may raise a string formatting error.
                err = ("Invalid timeout {}. Pass a (connect, read) "
                       "timeout tuple, or a single float to set "
                       "both timeouts to the same value".format(timeout))
                raise ValueError(err)
        elif isinstance(timeout, TimeoutSauce):
            pass
        else:
            timeout = TimeoutSauce(connect=timeout, read=timeout)
    
        try:
            if not chunked:
                resp = conn.urlopen(
                    method=request.method,
                    url=url,
                    body=request.body,
                    headers=request.headers,
                    redirect=False,
                    assert_same_host=False,
                    preload_content=False,
                    decode_content=False,
                    retries=self.max_retries,
                    timeout=timeout
                )
    
            # Send the request.
            else:
                if hasattr(conn, 'proxy_pool'):
                    conn = conn.proxy_pool
    
                low_conn = conn._get_conn(timeout=DEFAULT_POOL_TIMEOUT)
    
                try:
                    low_conn.putrequest(request.method,
                                        url,
                                        skip_accept_encoding=True)
    
                    for header, value in request.headers.items():
                        low_conn.putheader(header, value)
    
                    low_conn.endheaders()
    
                    for i in request.body:
                        low_conn.send(hex(len(i))[2:].encode('utf-8'))
                        low_conn.send(b'\r\n')
                        low_conn.send(i)
                        low_conn.send(b'\r\n')
                    low_conn.send(b'0\r\n\r\n')
    
                    # Receive the response from the server
                    try:
                        # For Python 2.7, use buffering of HTTP responses
                        r = low_conn.getresponse(buffering=True)
                    except TypeError:
                        # For compatibility with Python 3.3+
                        r = low_conn.getresponse()
    
                    resp = HTTPResponse.from_httplib(
                        r,
                        pool=conn,
                        connection=low_conn,
                        preload_content=False,
                        decode_content=False
                    )
                except:
                    # If we hit any problems here, clean up the connection.
                    # Then, reraise so that we can handle the actual exception.
                    low_conn.close()
                    raise
    
        except (ProtocolError, socket.error) as err:
            raise ConnectionError(err, request=request)
    
        except MaxRetryError as e:
            if isinstance(e.reason, ConnectTimeoutError):
                # TODO: Remove this in 3.0.0: see #2811
                if not isinstance(e.reason, NewConnectionError):
                    raise ConnectTimeout(e, request=request)
    
            if isinstance(e.reason, ResponseError):
                raise RetryError(e, request=request)
    
            if isinstance(e.reason, _ProxyError):
                raise ProxyError(e, request=request)
    
            if isinstance(e.reason, _SSLError):
                # This branch is for urllib3 v1.22 and later.
                raise SSLError(e, request=request)
    
>           raise ConnectionError(e, request=request)
E           requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=7481): Max retries exceeded with url: /v1/info (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f617c245550>: Failed to establish a new connection: [Errno 111] Connection refused'))

../../../anaconda3/envs/garrett2/lib/python3.7/site-packages/requests/adapters.py:516: ConnectionError

Resolve issue with intake 0.5.0+

We are currently pinned to intake 0.4.4 because intake 0.5.5 changed some behavior that causes our remote tests to fail. We will identify the problem and then either submit a patch upstream or adjust our usage, depending on the nature of the problem.

Dictionary changed size during iteration / catalog._run_starts

When trying the bluesky-browser:

Traceback (most recent call last):
  File "/Users/klauer/docs/Repos/bluesky-browser/bluesky_browser/search.py", line 164, in show_results
    for uid, entry in itertools.islice(self._results_catalog.items(), MAX_SEARCH_RESULTS):
  File "/Users/klauer/mc/envs/bluesky-browser/lib/python3.7/site-packages/intake_bluesky/jsonl.py", line 199, in items
    for uid, run_start_doc in catalog._run_starts.items():
RuntimeError: dictionary changed size during iteration
Abort trap: 6

Feel free to move this to the other project if it belongs there - I'm not entirely sure.
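
A common workaround for this class of error is to iterate over a snapshot of the dictionary rather than the live object. The sketch below is not intake-bluesky code: `iterate_live` and `iterate_snapshot` are hypothetical names, and the insertion inside the loop only simulates a new run arriving while the results are being listed. If the dictionary is mutated from another thread, a lock around both the copy and the mutation may also be needed.

```python
def iterate_live(run_starts):
    """Iterate the dict directly while it grows; reproduces the error."""
    seen = []
    for uid, doc in run_starts.items():
        seen.append(uid)
        # Simulate a new run arriving mid-iteration (hypothetical).
        run_starts[uid + "-new"] = {"uid": uid + "-new"}
    return seen


def iterate_snapshot(run_starts):
    """Iterate over a copy of the items, so later insertions are harmless."""
    seen = []
    for uid, doc in list(run_starts.items()):
        seen.append(uid)
        # Same simulated insertion; the snapshot is unaffected.
        run_starts[uid + "-new"] = {"uid": uid + "-new"}
    return seen
```

The snapshot costs one extra list of references per call, which is usually small next to the cost of rendering search results.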

problem converting run to xarray

run_uid: fec131f6-74e9-4309-bfdb-ec500f7c1c33


ValueError Traceback (most recent call last)
in
1 for run in latest_runs:
2 print(run)
----> 3 a[run].primary.read()

~/conda_envs/garrett/lib/python3.7/site-packages/intake_xarray/base.py in read(self)
38 def read(self):
39 """Return a version of the xarray with all the data in memory"""
---> 40 self._load_metadata()
41 return self._ds.load()
42

~/conda_envs/garrett/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
115 """load metadata only if needed"""
116 if self._schema is None:
--> 117 self._schema = self._get_schema()
118 self.datashape = self._schema.datashape
119 self.dtype = self._schema.dtype

~/conda_envs/garrett/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self)
17
18 if self._ds is None:
---> 19 self._open_dataset()
20
21 metadata = {

~/dama/intake-bluesky/intake_bluesky/core.py in _open_dataset(self)
592 get_datum_cursor=self._get_datum_cursor,
593 include=self.include,
--> 594 exclude=self.exclude)
595
596

~/dama/intake-bluesky/intake_bluesky/core.py in documents_to_xarray(start_doc, stop_doc, descriptor_docs, event_docs, filler, get_resource, get_datum, get_datum_cursor, include, exclude)
179 name='uid')
180
--> 181 datasets.append(xarray.Dataset(data_vars=data_arrays))
182 # Merge Datasets from all Event Descriptors into one representing the
183 # whole stream. (In the future we may simplify to one Event Descriptor

~/conda_envs/garrett/lib/python3.7/site-packages/xarray/core/dataset.py in __init__(self, data_vars, coords, attrs, compat)
381 coords = {}
382 if data_vars is not None or coords is not None:
--> 383 self._set_init_vars_and_dims(data_vars, coords, compat)
384
385 # TODO(shoyer): expose indexes as a public argument in __init__

~/conda_envs/garrett/lib/python3.7/site-packages/xarray/core/dataset.py in _set_init_vars_and_dims(self, data_vars, coords, compat)
403
404 variables, coord_names, dims = merge_data_and_coords(
--> 405 data_vars, coords, compat=compat)
406
407 self._variables = variables

~/conda_envs/garrett/lib/python3.7/site-packages/xarray/core/merge.py in merge_data_and_coords(data, coords, compat, join)
375 indexes = dict(extract_indexes(coords))
376 return merge_core(objs, compat, join, explicit_coords=explicit_coords,
--> 377 indexes=indexes)
378
379

~/conda_envs/garrett/lib/python3.7/site-packages/xarray/core/merge.py in merge_core(objs, compat, join, priority_arg, explicit_coords, indexes)
443
444 coerced = coerce_pandas_values(objs)
--> 445 aligned = deep_align(coerced, join=join, copy=False, indexes=indexes)
446 expanded = expand_variable_dicts(aligned)
447

~/conda_envs/garrett/lib/python3.7/site-packages/xarray/core/alignment.py in deep_align(objects, join, copy, indexes, exclude, raise_on_invalid)
215
216 aligned = align(*targets, join=join, copy=copy, indexes=indexes,
--> 217 exclude=exclude)
218
219 for position, key, aligned_obj in zip(positions, keys, aligned):

~/conda_envs/garrett/lib/python3.7/site-packages/xarray/core/alignment.py in align(*objects, **kwargs)
153 'arguments without labels along dimension %r cannot be '
154 'aligned because they have different dimension sizes: %r'
--> 155 % (dim, sizes))
156
157 result = []

ValueError: arguments without labels along dimension 'dim_0' cannot be aligned because they have different dimension sizes: {0, 1, 2, 3, 4, 5, 7, 40, 10, 12, 20}

Reduce effort needed to produce intake-bluesky catalog.

To build an intake-bluesky Catalog, you must supply the arguments that RunCatalog expects. Currently these are several callables, which take some effort to implement. Could this be simplified?

One proposal was to have a second variant (RunCatalogFiles) that accepted collections of documents rather than callables that returned collections of documents, as in some cases that would be simpler and more direct. But in conversation with @ronpandolfi and Dylan McReynolds [GH handle?] today we realized that RunCatalog needs a way to reload its contents for cases where new documents/files are being added. Callables provide a way to reload; collections would not. Thus, this proposal might not be workable.

A second proposal is to reduce the number of callables. We could reduce the API to a single callable, get_documents(), but I think this would make it complicated to specify slices, which will be important when the number of Event and Datum documents is large. It might be better to reduce it to three callables: get_header_documents(), which returns RunStart, RunStop, EventDescriptors, and Resources in one bundle; get_event_cursor(); and get_datum_cursor().
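
A minimal sketch of what the three-callable variant could look like for an in-memory backend. Only the three callable names come from the proposal above. The class name `InMemoryRunSource`, the `skip`/`limit` parameters, and the shape of the returned bundle are assumptions made for illustration.

```python
from itertools import islice


class InMemoryRunSource:
    """Hypothetical backend exposing the three proposed callables."""

    def __init__(self, start, stop, descriptors, resources, events, datums):
        self._header = {
            "start": start,
            "stop": stop,
            "descriptors": list(descriptors),
            "resources": list(resources),
        }
        self._events = list(events)
        self._datums = list(datums)

    def get_header_documents(self):
        # One bundle holding the small, per-run documents.
        return dict(self._header)

    def get_event_cursor(self, skip=0, limit=None):
        # Lazily yield a slice of Event documents.
        end = None if limit is None else skip + limit
        return islice(iter(self._events), skip, end)

    def get_datum_cursor(self, resource_uid, skip=0, limit=None):
        # Lazily yield a slice of the Datum documents for one Resource.
        matching = (d for d in self._datums if d["resource"] == resource_uid)
        end = None if limit is None else skip + limit
        return islice(matching, skip, end)
```

Because each method re-reads the backing collections on every call, calling it again after new documents arrive would pick them up, which is the reload behavior discussed above.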

Access modes

Summary of ways to request data from a Run:

  1. .<stream_name>.read() -> xarray of numpy arrays
  2. .<stream_name>.to_dask() -> xarray of dask arrays
  3. .read_canonical() -> generator of "filled" documents (no Resource/Datum docs)
  4. .read_canonical_dask() -> generator of data with dask objects as proxies for "filled" data (no Resource/Datum docs)
  5. .read_raw() -> generator of raw ("unfilled") documents---includes Resource/Datum

(1) and (2) are aimed at interactive use. (3) is for piping saved data into streaming-friendly pipelines that also operate on "online" data from the RunEngine. (4) is the lazy version of (3), where the heavy data may be pulled later, incrementally, or never. (5) supports copying a database from one node to another in full detail.
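
To make the difference between (3) and (5) concrete, here is a toy sketch of turning a raw stream into a "filled" one. It is not the library's implementation. The function name `read_canonical_sketch` and the `load_datum` callable are hypothetical, and the sketch assumes that documents are `(name, doc)` pairs, that each Datum arrives before the Event that references it, and that an Event's `filled` mapping marks externally stored keys with `False`.

```python
def read_canonical_sketch(raw_documents, load_datum):
    """Yield "filled" (name, doc) pairs from a raw document stream.

    raw_documents: iterable of (name, doc) pairs that may include
        'resource' and 'datum' documents.
    load_datum: hypothetical callable mapping a datum document to its data.
    """
    datums = {}
    for name, doc in raw_documents:
        if name == "resource":
            continue  # Resource documents are not emitted in the filled stream.
        if name == "datum":
            datums[doc["datum_id"]] = doc  # Remember it for later Events.
            continue
        if name == "event":
            # Copy before modifying so the raw documents stay untouched.
            doc = dict(doc, data=dict(doc["data"]),
                       filled=dict(doc.get("filled", {})))
            for key, is_filled in doc["filled"].items():
                if not is_filled:
                    datum = datums[doc["data"][key]]
                    doc["data"][key] = load_datum(datum)
                    doc["filled"][key] = True
        yield name, doc
```

A lazy variant in the spirit of (4) could store a deferred object in place of the `load_datum(datum)` result, so the heavy data is only read when asked for.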
