opentraffic / reporter

OTv2: distributed service that matches raw GPS probe data to OSMLR segments and sends anonymized speeds to Datastore

License: GNU Lesser General Public License v3.0

Python 37.46% Shell 8.87% Java 53.66%

reporter's People

Contributors

dnesbitt61, drewda, gknisely, heffergm, j05u3, kdiluca, kevinkreiser, stvno

reporter's Issues

collect statistics on match failures

Collect the number of match failures by tile and send to centralized Datastore for aggregation and analysis.

This can help us to identify regions where there may be problems with OSMLR segment coverage, OSM tagging, or the map-matching process itself.

Emission Heuristics to Maximize Ability to Measure Variance in Duration

Currently, if the reporter gets 5 observations for a given segment-next-segment pair, it averages them all together and reports that value when it's time. This means we lose some of the ability to measure variance unless we get more observations for this pair later on in wall time (but for the same point in GPS time). So we don't want to just average all the measurements together: at the point when we go to emit these measurements, we want to group them in such a way that we can still measure variance without skewing the averages.

Say you have 5 observations for a given segment-next-segment pair, and your privacy setting is 2, which means you have enough data to emit these observations in some form. Today we average all of these into one measurement with a count of 5. But to preserve the ability to measure variance we should probably emit 2 measurements, one with a count of 2 and one with a count of 3. We need a heuristic to do that, though. Let's say the 5 observations have durations: 10, 12, 20, 25, 65

How do we group these observations so that we most accurately represent the data?
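
One possible heuristic, sketched below, is to sort the durations and split them at the largest gap that still leaves at least the privacy count on each side. This is only a starting point for discussion, not a decided approach:

def split_for_variance(durations, privacy_k):
    """Split durations into at most two groups at the largest gap, subject to
    each group containing at least privacy_k observations.

    This is only one candidate heuristic, sketched for discussion."""
    values = sorted(durations)
    if len(values) < 2 * privacy_k:
        # not enough data for two privacy-preserving groups; emit one average
        return [values]

    # only consider split points that leave privacy_k observations on each side
    best_split, best_gap = None, -1
    for i in range(privacy_k, len(values) - privacy_k + 1):
        gap = values[i] - values[i - 1]
        if gap > best_gap:
            best_gap, best_split = gap, i
    return [values[:best_split], values[best_split:]]

# example from above: durations 10, 12, 20, 25, 65 with privacy level 2
groups = split_for_variance([10, 12, 20, 25, 65], 2)
# -> [[10, 12], [20, 25, 65]]: averages 11 and ~36.7 with counts 2 and 3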

Separate Reporter Python files

reporter_service.py - Retains the current http service interface. Parses JSON.

matcher.py - Does the work of accumulating GPS points into traces (Redis), performing map-matching on traces, forming statistics for matched segments (including intersection statistics), managing the history within Redis, and outputting stats to the Datastore/Aggregator.

reporter.consumer.py - Interface with Kafka. Message parsing.

The common matcher.py is shared code between the 2 possible interfaces/applications: a Python http service and a Kafka consumer. Note that the Kafka-based Reporter would need to be hosted on premises and run by the TNC / provider. The http-based service could potentially be run within the Open Traffic AWS environment (e.g., as a hosted service).

Output from the Reporter will continue to be http so it can communicate with an external Datastore.
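
To make the split concrete, here is a minimal sketch of the shared entry point; the function name, arguments, and division of labor are assumptions for illustration, not the actual code:

# matcher.py (illustrative sketch; real names will differ)
def report(trace_points, redis_client, datastore_url):
    """Append points to the partial trace in Redis, run map-matching, build
    per-segment statistics, and send them to the Datastore/Aggregator."""
    raise NotImplementedError  # the shared implementation lives here

# reporter_service.py would parse the HTTP/JSON request and call
#     matcher.report(points, redis_client, datastore_url)
# reporter.consumer.py would parse each Kafka message and make the same call,
# so the matching logic is written once and shared by both front ends.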

Use confidence score to improve "pruning" of GPS trace

Valhalla issue valhalla/valhalla#823 allows for returning a confidence score for short traces. Investigate using this score along with elapsed time after the last full matched segment to decide how much of the GPS trace to prune and when to output the last segment (want to be confident that the next segment is correctly matched).

datastore echo server

While we are in the middle of revamping some of the reporter and its outputs, it would make our lives easier not to have to interface with the real datastore (since that's also being rewritten). So in the CI tests we should run an echo server instead of the datastore container. This is very easy; you can get an echo server with a bit of bash:

sudo apt-add-repository -y ppa:valhalla-core/valhalla
sudo apt-get update
sudo apt-get install prime-server0.6.3-bin
./prime_echod tcp://*:8003 1

Update Valhalla Traffic Segment Matcher to Identify Invalid Segments

Currently the traffic segment matcher does not report any "invalid" matches, e.g. edges without OSMLR segments. These will now be returned along with a flag indicating whether they are "internal" edges (e.g. internal intersection edges, turn channels, or roundabouts) that represent a valid path between OSMLR segments, such that a "nextseg" should be reported.

Reporting Interface

After we have returned some statistics about a given segment, we will need to marshal that information into the place where we are storing/working on global traffic statistics (reporting stuff to mission control, i.e. the newsroom). To really complete this, we will need to know what mission control is and how it expects to receive data.

Investigate reporting queue lengths

The approach is to walk forward along GPS points and identify when speed falls below a threshold. The distance from this point to the next intersection becomes the queue length (a minimal sketch of the walk-forward step follows the questions below). Several questions arise:

  • What is the required GPS reporting frequency (time interval between locations)?
  • What if no OSMLR segment is reported at the intersection where the queue begins? May end up reporting a queue length to the end of the OSMLR segment, rather than the actual intersection.
  • Assumption: cannot walk backwards from an intersection and detect when speed exceeds threshold (since once the queue moves the speeds increase).
  • A taxi dropping off or picking up a passenger could trigger a queue length report - does this matter, and is there any way to avoid it?
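
A minimal sketch of the walk-forward step, assuming time-ordered (lat, lon, time) points; the speed threshold is purely illustrative:

from math import radians, sin, cos, asin, sqrt

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two lat/lon points."""
    r = 6371000.0
    dlat, dlon = radians(lat2 - lat1), radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return 2 * r * asin(sqrt(a))

def queue_start(points, speed_threshold_ms=2.0):
    """Walk forward along time-ordered GPS points and return the first point
    whose implied speed drops below the threshold, or None."""
    for (lat1, lon1, t1), (lat2, lon2, t2) in zip(points, points[1:]):
        dt = t2 - t1
        if dt <= 0:
            continue
        speed = haversine_m(lat1, lon1, lat2, lon2) / dt
        if speed < speed_threshold_ms:
            return (lat2, lon2, t2)
    return None

# the queue length would then be the distance from queue_start(...) to the next
# intersection along the matched path (not shown; requires the matched edges)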

Identify Threshold Logic for Reporting Segment + Next Segment

Need to determine when it is proper to output a traffic report. Since the report needs to include the next segment (if any) for intersection transition reporting, we need to make sure the match probability is such that the next segment is truly on the matched path. Might involve some time and distance thresholds that need to be exceeded so that we are fairly sure the matched path will not change based on future GPS locations.

Set default map-matching config to optimal values found in Reporter Quality Testing Rig

Based on @mxndrwgrdnr's analysis optimizing map-matching parameters let's set reasonable default values for the primary map-matching parameters that are baked into the default Docker images:

  • sigma_z
  • beta
  • max_route_time_factor

Any other default parameters worth tweaking?

(Ideally as part of deploying Reporter instances, a data provider will make use of the Reporter Quality Testing Rig to come up with the ideal set of parameters to use given their GPS update frequency, typical GPS accuracy, and local road network density -- but if they don't go through the entire optimization exercise, we should at least give them decent defaults to begin with.)

requests to reporter cause segfault

Interface with libvalhalla Python Bindings

We will need to make sure the docker container can apt-get install libvalhalla so that Python can import its module and make calls into it. We will need to decide what the calls look like for the interface to the bindings. It would be best if we kept as much of the hard work as possible within the bindings and not within the webserver Python.
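
As a rough placeholder for what the calls might look like (the module, class, and method names below are assumptions; the real API is whatever the libvalhalla bindings expose):

import json

import valhalla  # assumed import name provided by the libvalhalla package

# one hypothetical GPS point; real traces come from the webserver/Kafka side
gps_points = [{'lat': 14.60, 'lon': 121.00, 'time': 1500000000}]

valhalla.Configure('/etc/valhalla.json')   # assumed one-time configuration call
matcher = valhalla.SegmentMatcher()        # assumed matcher object
result = json.loads(matcher.Match(json.dumps({'trace': gps_points})))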

Kafka configurable message parsing

Right now the flink job expects one specific format for parsing messages from Kafka or from file; it looks like this:

date_string|id|X|X|X|accuracy|X|X|X|lat|lon|...

We can allow passing the flink job an argument that tells it the format of the messages. The first format to support will be separator-delimited values ("sv"). For that we can specify what the separator is and which columns hold the 5 important pieces of info: date, id, accuracy, lat, lon.

There is the other important aspect of the date format, but maybe we can defer that for the beginning.

After getting our feet wet with this step we should be able to move on to supporting things like JSON messages. That might not be that hard: flink uses generics to tell it what the messages are, and we have a custom type for that. Presumably we could make a few custom ones and throw them in the template.
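
The column-mapping idea, sketched in Python for brevity even though the job itself is Java/flink (the column indices follow the example format above and are otherwise illustrative):

def make_sv_parser(separator, columns):
    """Build a parser for separator-delimited messages.

    columns maps each required field to its zero-based column index."""
    def parse(message):
        fields = message.split(separator)
        return {name: fields[index] for name, index in columns.items()}
    return parse

# indices for the pipe-delimited format shown above
parse = make_sv_parser('|', {'date': 0, 'id': 1, 'accuracy': 5, 'lat': 9, 'lon': 10})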

allow ingest from a Kafka stream

Reporter currently can ingest from HTTP requests or from CSV files (which are parsed and sent in as HTTP requests).

Many data providers use Kafka to stream location data around their own internal infrastructure. We'll add support for Reporter to act as a consumer to such Kafka streams.

Note that data providers may have different serialization formats for the contents of their Kafka messages. And they'll likely have different key names for lat, lon, accuracy, driver/vehicle ID, vehicle mode, etc. So ideally it's straightforward for each provider to write their own mapping function to parse messages into the appropriate values for use within Reporter.

Requires completion of #36
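
A minimal sketch of how a provider-specific mapping function could plug into a consumer; the kafka-python library and the message field names are assumptions for illustration:

import json

from kafka import KafkaConsumer  # kafka-python; the library choice is an assumption

def provider_mapping(raw):
    """Provider-specific mapping from a raw message to the fields the
    Reporter needs. The key names here are hypothetical examples."""
    msg = json.loads(raw)
    return {
        'id': msg['driver_id'],
        'lat': float(msg['latitude']),
        'lon': float(msg['longitude']),
        'accuracy': float(msg.get('horizontal_accuracy', 0)),
        'time': int(msg['timestamp']),
    }

consumer = KafkaConsumer('locations', bootstrap_servers='localhost:9092')
for record in consumer:
    point = provider_mapping(record.value)
    # hand the normalized point to the shared matching code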

Considerations for horizontal scaling of Kafka Reporters

Even if data is time-ordered there may be locking and contention issues with closely spaced GPS position data. It may be desirable to use Flink in between the Kafka records and the map-matching process within Reporter. Flink could be used to "buffer" single locations into traces.

on premise configuration first steps

We need a couple of things with respect to having other TNCs run this software:

  • a means of authentication with the downstream datastore
  • a means of identifying this TNC (currently hardcoded)

For both we can use environment variables, as we do for everything else. The former will be a query parameter passed to the datastore - not secure at all, but more secure than nothing. The latter will simply be a string value taken from an environment variable rather than being hardcoded.

Downstream we'll need to make the datastore validate this authentication query parameter (equality check for now).
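
A minimal sketch of what that looks like (the environment variable names are examples, not settled configuration keys):

import os

# variable names here are illustrative only
DATASTORE_AUTH = os.environ.get('DATASTORE_AUTH_KEY', '')
PROVIDER_ID = os.environ.get('PROVIDER_ID', 'unknown')

# the auth value is appended to Datastore requests as a query parameter,
# e.g. datastore_url + '?auth=' + DATASTORE_AUTH, and checked for equality there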

Accumulate Privatized Segment Pairs by Tile

As we get segment pairs through the stream, we'll need another processor to accumulate them into tiles; this will make the datastore's job easier when it needs to merge tiles together from different reporter instances.

be robust to single point input

Currently the reporter assumes you are going to give it more than one point at a time. At the same time, if some of the end of your trace wasn't used, it will store that information in Redis to use with your next request. We can extend this logic a bit to allow requests down to the single-point-per-trace level. To do so we just need to add a bit of logic already found in the GRAB processing script:

  • append input to partial from redis as normal (no extra implementation needed)
  • if there aren't enough points in the trace or not enough time has passed
    • then put the appended trace back into redis and return
  • else do matching as normal and store partial as normal

Decent values for the if condition above can be taken from the prior processing script. One note here: we expire stuff in Redis after 5 minutes or so. This means that if you give me 3 points and I just keep those in Redis without matching them, and you then wait 5 minutes before giving me the rest of the 50 points needed to do matching, we'll lose those 3 points. This basically ensures that clients are timely when sending us stuff.
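
A minimal sketch of the logic above, with illustrative thresholds standing in for the values from the prior processing script:

import json

import redis

PARTIAL_TTL_SECONDS = 300      # partial traces expire after ~5 minutes
MIN_POINTS = 50                # illustrative thresholds; real values would be
MIN_ELAPSED_SECONDS = 60       # taken from the prior GRAB processing script

r = redis.Redis()

def handle_points(vehicle_id, new_points):
    """Append new points to the partial trace; only match once the trace is
    long enough in both point count and elapsed time."""
    key = 'partial:%s' % vehicle_id
    trace = json.loads(r.get(key) or '[]') + new_points

    enough_points = len(trace) >= MIN_POINTS
    enough_time = bool(trace) and (trace[-1]['time'] - trace[0]['time']) >= MIN_ELAPSED_SECONDS
    if not (enough_points and enough_time):
        # not ready yet: put the appended trace back and wait for more input
        r.setex(key, PARTIAL_TTL_SECONDS, json.dumps(trace))
        return None

    # ready: match as normal, then store the unused tail as the new partial
    # (matching and tail extraction omitted in this sketch)
    r.delete(key)
    return trace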

CircleCI build Kafka Stream Java program

Right now we have flink Kafka Stream code in Java, but it doesn't build in CircleCI, and before we can start using it for windowing in the reporter we need it to do so. We'll want to see if we can use the build cache, because the flink API build takes forever while building our flink job jar is fast.

Note that the readme shows how to get and build flink Kafka; we just need to get it into the circle script.

bulk processing

It would be nice to have some bulk processing to generate large numbers of requests from a large corpus of (probably historical) data. The scripts should take a bucket in S3, get the data, pipe it to something that turns it into requests, and pipe those on to xargs and curl.

cannot connect to container based on latest dockerhub build of reporter

Opening an issue per our conversation today @kevinkreiser.

All of my HTTP requests to a running instance of the newest Reporter image hung, not failing or timing out but never completing.

Similarly, Kevin was unable to docker exec -it into a Reporter instance based on the same latest image.

Reverting back to older versions of the container fixes the problem, so something must be going on with the latest build, aka opentraffic/reporter:4c206e126c040fa98039611d91ccccd0ee618e09 or opentraffic/reporter:latest. Images starting at opentraffic/reporter:c813127d3d41bf5a11b5a2f8fc38653f8454f60a and earlier all seem to work fine.

Create Webserver

There are lots of easy-to-use options for a Python webserver. Twisted is a good one; I wouldn't use Pylons. The built-in one (SimpleHTTPServer or similar) would be good enough to get this up and running. At any rate, we will need to write a chunk of code using one of them and get a command into a docker script to start it.
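
For example, a bare-bones server using only the standard library (the port and the handler behavior are placeholders):

from http.server import BaseHTTPRequestHandler, HTTPServer

class ReportHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = self.rfile.read(int(self.headers.get('Content-Length', 0)))
        # hand `body` off to the matching code here
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'{}')

HTTPServer(('0.0.0.0', 8002), ReportHandler).serve_forever()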

Interface with Redis

We will need some bits of Python to interoperate with the Redis cache. There isn't much to do here beyond making sure we install the Redis module via pip and learning how to use its API.
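
The basics are small (connection details below are examples):

import redis  # installed with `pip install redis`

r = redis.Redis(host='localhost', port=6379)
r.setex('partial:vehicle-123', 300, '[]')   # store a value with a 5 minute TTL
cached = r.get('partial:vehicle-123')       # bytes, or None once the key expires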

Tests

  • Create an IAM user for use by CircleCI, provide permissions to access a specific S3 bucket
  • Create an S3 bucket in the OT org which will house canned data used to test the docker image against. This will include raw Grab data, so the bucket should remain private (hence the above requirement for user/credentials)
  • Generate test data to place in said bucket. This should include the Manila tile data and some raw Grab data
  • Update circle.yml to use the new test data: dependencies should be updated to include aws-cli, which can be used to download the data
  • Update tests to include a run of the Grab data through the reporter

Consider debug/test flag in Python report service

Add a "debug/test" flag to the Reporter Python so that the returned data reflects what is reported to Datastore (and this flag disables sending to the Datastore). This would be a list of the following:

  • OSMLR segment Id
  • segment length
  • next OSMLR segment Id (can be empty or 0)
  • start time
  • end time (time at the start of the next OSMLR segment, or the time at the end of this segment if there is no next)
  • queue length (TBD) - length from the end of the OSMLR segment to where the queue begins

Vehicle type and producer Id will be in the return as well (common over the entire list).

The default returned from the Python report method needs to be information that the Kafka/Java code uses to "trim" portions of the trace data. It will have index (trace index) information that allows association of the segment with the input trace. This information is not needed for the Datastore.

It could be that the "test/debug" mode returns both sets of information.

Ephemeral Cache Storage

We need to figure out what exactly is going to go into the Redis cache, how we update it, and what we need to get back out of it.

Exit if failed to load tile data

Presently, if the reporter fails to load tile data, it continues to run. To facilitate automated deployments, and because of the way the docker container agent works in ECS along with OpsWorks and the need to download planet data to the host, the process should (optionally, if preferred) exit if it fails to load any tile at startup.

push histograms to file or s3

The branch of code for writing output currently supports POSTing and writing to file. Neither of these works directly for AWS S3, which is our target. We need to add a bit of code to set the proper headers and do a PUT to S3. Add that little bit of functionality.
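
A minimal sketch using the AWS SDK; a manually signed raw PUT would also work, and the bucket/key naming here is illustrative:

import boto3

def push_histogram(bucket, key, payload):
    """Upload one histogram object to S3."""
    boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=payload,
                                  ContentType='text/plain')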

Define Kafka Message Format

As a precursor to #35 - define a message format. Check what potential providers currently have and identify required and optional parameters for input.

Allow single GPS point input (lat, lon, vehicle Id, and timestamp are required parameters). Determine if other parameters (e.g., transport mode) are required. Possible optional parameters could include speed (from the GPS sensor), GPS accuracy, etc.

Potentially allow for multiple GPS point inputs (in cases where a provider might batch locations from a single vehicle). Probably need to specify a maximum so that message size is constrained.
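
For discussion, a possible single-point message serialized as JSON; the field names and the choice of JSON are illustrative, not a decided format:

example_message = {
    'id': 'vehicle-123',     # required: vehicle/driver identifier
    'lat': 14.5995,          # required
    'lon': 120.9842,         # required
    'time': 1500000000,      # required: epoch seconds
    'mode': 'auto',          # possibly required: transport mode
    'accuracy': 10.0,        # optional: GPS accuracy in meters
    'speed': 8.3,            # optional: sensor speed in m/s
}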

flink in docker compose

We need to add a flink container from Docker Hub to our compose file. The flink maintainers keep one, so it should be very easy.

POST/GET Request Parser

We will need something to parse the POST/GET requests. For starters, we can assume it's CSV because that is fast and easy. In the long run, we may want to make this functionality modular with a well-defined interface so that other on-premise operators can implement their own in case they have some exotic format.

Changes to Support Intersection Transition Times

Proposal:

Change from sending the prior OSMLR segment to sending the next OSMLR segment. When crossing an internal intersection edge, roundabout, or turn channel (which do not have OSMLR segments at this time), the Reporter will not output until the matched path either enters an OSMLR segment or enters a road without an OSMLR segment (that is not an internal intersection edge, roundabout, or turn channel). null is sent as the next OSMLR segment if the matched path enters a "local" road.

Set the end time as the time the next OSMLR segment is entered. If there is no next OSMLR segment then the end time is the time at the end of the current OSMLR segment.

Send length as the length of the matched segment plus the length along any internal intersection edges, roundabout edges, or turn channels used to get to the next OSMLR segment.
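
A sketch of the proposal, assuming each matched edge carries a segment id (None when there is no OSMLR segment), an internal flag, a length, and start/end times; those field names are assumptions for illustration:

def reports_with_next_segment(matched_edges):
    """Walk matched edges and, for each OSMLR segment, carry internal edges
    forward until the next real road is reached."""
    reports = []
    i = 0
    while i < len(matched_edges):
        edge = matched_edges[i]
        if edge['segment_id'] is None:
            i += 1
            continue
        length, end_time = edge['length'], edge['end_time']
        j = i + 1
        # add the lengths of internal intersection edges, roundabouts and
        # turn channels used to reach the next road
        while j < len(matched_edges) and matched_edges[j]['internal']:
            length += matched_edges[j]['length']
            j += 1
        next_segment = matched_edges[j]['segment_id'] if j < len(matched_edges) else None
        if next_segment is not None:
            end_time = matched_edges[j]['start_time']  # time the next segment is entered
        reports.append({'segment_id': edge['segment_id'],
                        'next_segment_id': next_segment,  # None for a "local" road
                        'length': length,
                        'end_time': end_time})
        i = j
    return reports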

Request for canned or other suitable endpoint to service ELB healthchecks

Grant Heffernan [11:57 AM]
in semi related news, we’re going to want some sort of either canned health check endpoint for these, or I just need something else that will work when things are healthy and break when they aren’t. Want me to just put it in an issue?

Kevin Kreiser [11:58 AM]
issue is good. i think it has to be canned and we can do some special code to handle it

[11:58]
to make sure the system is running etc

[11:58]
without actually making data

Grant Heffernan [11:58 AM]
k

Kevin Kreiser [11:58 AM]
i mean actually

[11:58]
a trace of a single point could be enough

[11:59]
it will never get a non partial match

[11:59]
and it will check redis and all of that stuff

Simulation and Testing Framework

As a basis for testing the reporter, it would be good to measure the impacts of the changes we are making as we make them. Currently our approach to this is spread across libvalhalla as well as the reporter CI. @kpwebb suggested a much more practical and straightforward way for us to get a quantitative measure of this.

The basis of the idea is as follows:

  • plot a route: have a routing engine plot a route returning per edge attribution (what traffic segments and what shape)
  • simulate a trace: use the shape to simulate a sample trace by adding noise to the data. Unevenly sample in space along the shape, offset from the shape's geometry, include fluctuations in horizontal accuracy, and assign timestamps that will allow for validating speed
  • match: submit the simulated trace to the traffic matcher to get back the result
  • validate: check that what we did to the simulated trace affected the output in the desired way and report some statistics. We should see speeds that make sense, within a threshold, based on the times we assigned, and we should see the same sequence of OSMLR segments and non-segments as on the original path

To accomplish this, one would most likely add to the libvalhalla Python bindings the ability to get a path and the attributes noted above (step 1). The reporter could then be modified to do the remaining steps for some number of iterations.
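
A minimal sketch of the trace-simulation step; the noise model and parameters are placeholders for the real heuristics:

from math import radians, sin, cos, asin, sqrt
import random

def haversine_m(a, b):
    """Great-circle distance in meters between two (lat, lon) pairs."""
    (lat1, lon1), (lat2, lon2) = a, b
    dlat, dlon = radians(lat2 - lat1), radians(lon2 - lon1)
    h = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return 2 * 6371000.0 * asin(sqrt(h))

def simulate_trace(shape, speed_ms=10.0, noise_m=7.5):
    """Turn a route shape (list of (lat, lon) pairs) into a noisy trace:
    offset each point, jitter the accuracy, and assign timestamps from the
    cumulative distance at a target speed so speeds can later be validated."""
    deg_per_m = 1.0 / 111320.0   # rough conversion, good enough for a sketch
    trace, travelled = [], 0.0
    for i, (lat, lon) in enumerate(shape):
        if i:
            travelled += haversine_m(shape[i - 1], shape[i])
        trace.append({
            'lat': lat + random.gauss(0, noise_m) * deg_per_m,
            'lon': lon + random.gauss(0, noise_m) * deg_per_m,
            'accuracy': abs(random.gauss(noise_m, 2.5)),
            'time': int(travelled / speed_ms),
        })
    return trace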

Batch Tiles to Datastore

We need a way to move tiles out of a given kv store and into a place where the datastore is expecting them. We'll do this with a configuration parameter. I think we need to support putting them to disk as well as posting them to S3 (possibly with an authentication handshake).

We also need to define the file naming conventions and how the contents of the file will be formatted.

for the file name lets do:

epoch_seconds_start-epoch_seconds_end/tile_id/some_reporter_instance_uuid

for the stuff in the file we can do human readable:

segment_id,next_segment_id,avg_duration,length,avg_queue_length,min_timestamp,max_timestamp

if we find this data is too large in this format we'll entertain other formats
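
A small sketch of the proposed naming and serialization; the helper names are illustrative:

import csv
import io
import uuid

def tile_key(start_epoch, end_epoch, tile_id, instance_uuid=None):
    """Build the proposed object name: epoch range / tile id / reporter instance uuid."""
    return '%d-%d/%s/%s' % (start_epoch, end_epoch, tile_id,
                            instance_uuid or uuid.uuid4())

def tile_body(rows):
    """Serialize (segment_id, next_segment_id, avg_duration, length,
    avg_queue_length, min_timestamp, max_timestamp) tuples in the proposed
    human readable format."""
    out = io.StringIO()
    csv.writer(out).writerows(rows)
    return out.getvalue()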

Need to reject invalid times

Some conditions lead to invalid times (-1) while the length can be valid. This is rare and ultimately the fix should be in Valhalla traffic_segment_matcher, but double check here and do not forward segments with invalid times to Datastore.

Bug: Timestamps with -1

Sometimes the reports coming back from the python reporter have either t0 or t1 with a -1 value. This causes the kafka java reporter code to create a time range for a given segment that is huge and crosses many time bins, which results in an enormous number of tiles that take eons to purge.
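
A minimal guard for both issues above, assuming the reports are dicts carrying t0/t1, to be applied before anything is forwarded to the Datastore:

def valid_reports(reports):
    """Drop any report whose t0 or t1 is invalid (-1)."""
    return [r for r in reports if r.get('t0', -1) >= 0 and r.get('t1', -1) >= 0]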

Python Kafka Producer for test files

Write a Python based Kafka producer to inject archived historical data into the Kafka-based Reporter. This will allow early testing, and may have uses later for importing other historical datasets.
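
A minimal sketch with kafka-python; the topic, broker address, and input file are placeholders:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
with open('historical_probe_data.csv') as f:   # archived historical data
    for line in f:
        producer.send('locations', line.strip().encode('utf-8'))
producer.flush()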
