opentraffic / reporter
OTv2: distributed service that matches raw GPS probe data to OSMLR segments and sends anonymized speeds to Datastore.
License: GNU Lesser General Public License v3.0
Collect the number of match failures by tile and send to centralized Datastore for aggregation and analysis.
This can help us to identify regions where there may be problems with OSMLR segment coverage, OSM tagging, or the map-matching process itself.
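A minimal sketch of the accumulation side, assuming a simple lat/lon-based tile key (the real Reporter would presumably use Valhalla's tile ids instead):

```python
from collections import Counter

def tile_id(lat, lon, tile_size=0.25):
    # Map a lat/lon to a coarse tile key (hypothetical tiling scheme).
    return (int(lat // tile_size), int(lon // tile_size))

class FailureCounter:
    """Accumulate map-match failures per tile for later submission."""
    def __init__(self):
        self.counts = Counter()

    def record_failure(self, lat, lon):
        self.counts[tile_id(lat, lon)] += 1

    def snapshot(self):
        # What a periodic job could POST to the Datastore for aggregation.
        return dict(self.counts)

fc = FailureCounter()
fc.record_failure(33.6646, -117.7455)
fc.record_failure(33.6648, -117.7459)
```

The snapshot dict, keyed by tile, is the shape of data that would be sent to the centralized Datastore.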
The Reporter will get another stream processor that accumulates segment pairs to preserve privacy. The number of observations to accumulate before emission will be based on a configuration parameter.
Currently, if the Reporter gets 5 observations for a given segment-next-segment pair, it averages them all together and reports that single value when it is time to emit. This means we lose some of the ability to measure variance unless we get more observations for this pair later in wall time (but for the same point in GPS time). So rather than simply averaging all the measurements together, when we go to emit we should group them in a way that preserves the ability to measure variance while not skewing the averages.
Say you have 5 observations for a given segment-next-segment pair and your privacy setting is 2, which means you have enough data to emit these observations in some form. Today we average all of them into one measurement with a count of 5. But to preserve the ability to measure variance we should probably emit 2 measurements, one with a count of 2 and one with a count of 3. We need a heuristic to do that, though. Let's say the 5 observations have durations: 10, 12, 20, 25, 65.
How do we group these observations so that we most accurately represent the data?
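One possible heuristic, sketched below: sort the durations and split at the largest gap, subject to each group meeting the privacy threshold k. This is an illustration of the idea, not a settled design:

```python
def group_observations(durations, k):
    """Split sorted durations into two groups at the largest gap,
    subject to each group holding at least k observations.
    Returns a list of (mean, count) measurements. Hypothetical heuristic."""
    obs = sorted(durations)
    n = len(obs)
    if n < 2 * k:
        # Not enough data for two groups; emit a single measurement.
        return [(sum(obs) / n, n)]
    # Candidate split indices that leave >= k observations on each side;
    # pick the one sitting at the largest gap between adjacent durations.
    best = max(range(k, n - k + 1), key=lambda i: obs[i] - obs[i - 1])
    groups = [obs[:best], obs[best:]]
    return [(sum(g) / len(g), len(g)) for g in groups]

# The example from above: privacy setting 2, durations 10, 12, 20, 25, 65.
measurements = group_observations([10, 12, 20, 25, 65], 2)
```

For the example this yields one measurement with count 2 (mean 11.0) and one with count 3, so the spread between the fast and slow observations is retained rather than collapsed into a single mean.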
reporter_service.py - Retains the current http service interface. Parses JSON.
matcher.py - Does the work of accumulating GPS points into traces (Redis), performing map-matching on traces, forming statistics for matched segments (including intersection statistics), managing the history within Redis, and outputting stats to the Datastore/Aggregator.
reporter.consumer.py - Interface with Kafka. Message parsing.
The common matcher.py is shared code between the 2 possible interfaces/applications: a Python http service and a Kafka consumer. Note that the Kafka based Reporter would need to be hosted on premises and run by the TNC / Provider. The http based service could potentially be run within the Open Traffic AWS environment (e.g., a hosted service).
Output from the Reporter will continue to be http so it can communicate with an external Datastore.
Valhalla issue valhalla/valhalla#823 allows for returning a confidence score for short traces. Investigate using this score along with elapsed time after the last full matched segment to decide how much of the GPS trace to prune and when to output the last segment (want to be confident that the next segment is correctly matched).
Need special logic since the current segment and next segment are "partial" (length = -1).
While we are in the middle of revamping the Reporter and its outputs, it would make our lives easier not to have to interface with the real Datastore (since that is also being rewritten). So in the CI tests we should run an echo server instead of the Datastore container. This is very easy; you can get an echo server with a bit of bash:
sudo apt-add-repository -y ppa:valhalla-core/valhalla
sudo apt-get update
sudo apt-get install prime-server0.6.3-bin
./prime_echod tcp://*:8003 1
Currently the traffic segment matcher does not report any "invalid" matches, e.g. edges without OSMLR segments. These will now be returned along with a flag indicating whether the edges are "internal" (internal intersection edges, turn channels, or roundabouts), i.e. edges that represent a valid path between OSMLR segments such that a "nextseg" should be reported.
After we have returned some statistics about a given segment, we will need to marshal that information into the place where we are storing/working on global traffic statistics (reporting stuff to mission control, i.e. the newsroom). To really complete this, we will need to know what mission control is and how it expects to receive data.
Approach is to walk forward along GPS points and identify when speed falls below a threshold. The distance from this point to the next intersection becomes the queue length. Several questions arise:
Need to determine when it is proper to output a traffic report. Since the report needs to include the next segment (if any) for intersection transition reporting, we need to make sure the match probability is such that the next segment is truly on the matched path. Might involve some time and distance thresholds that need to be exceeded so that we are fairly sure the matched path will not change based on future GPS locations.
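The walk-forward queue-length step described above can be sketched as follows (hypothetical inputs: per-point distance along the matched path in meters and speed in m/s, with the intersection position precomputed):

```python
def queue_length(points, intersection_dist, speed_threshold=2.0):
    """Walk forward along GPS points, given as (distance_along, speed)
    pairs, and return the queue length: the distance from the first
    below-threshold point to the intersection. Returns 0.0 if speed
    never drops below the threshold. Illustrative helper only; real
    points would come from the matched trace."""
    for dist_along, speed in points:
        if speed < speed_threshold:
            return max(intersection_dist - dist_along, 0.0)
    return 0.0

# A vehicle slowing to a crawl 40 m before an intersection at 100 m.
pts = [(0.0, 10.0), (30.0, 8.0), (60.0, 1.5), (80.0, 0.5)]
q = queue_length(pts, intersection_dist=100.0)
```

The speed threshold and the choice of "first slow point" are exactly the open questions; this just fixes the shape of the computation.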
In the future, let's consider versioning releases in:
Based on @mxndrwgrdnr's analysis optimizing map-matching parameters let's set reasonable default values for the primary map-matching parameters that are baked into the default Docker images:
Any other default parameters worth tweaking?
(Ideally as part of deploying Reporter instances, a data provider will make use of the Reporter Quality Testing Rig to come up with the ideal set of parameters to use given their GPS update frequency, typical GPS accuracy, and local road network density -- but if they don't go through the entire optimization exercise, we should at least give them decent defaults to begin with.)
I'm getting a bunch of segfaults with the latest reporter version. Here is one request that consistently causes the reporter to crash:
```
http://reporter:8003/report?json=%7B%22match_options%22%3A%7B%22turn_penalty_factor%22%3A500%2C%22sigma_z%22%3A4.07%2C%22breakage_distance%22%3A2000%2C%22beta%22%3A3%2C%22gps_accuracy%22%3A1.64%2C%22mode%22%3A%22auto%22%7D%2C%22trace%22%3A%5B%7B%22lat%22%3A33.664699%2C%22lon%22%3A-117.745583%2C%22time%22%3A1504213755%7D%2C%7B%22lat%22%3A33.664804%2C%22lon%22%3A-117.745945%2C%22time%22%3A1504213775%7D%2C%7B%22lat%22%3A33.663643%2C%22lon%22%3A-117.744612%2C%22time%22%3A1504213795%7D%2C%7B%22lat%22%3A33.661609%2C%22lon%22%3A-117.743775%2C%22time%22%3A1504213815%7D%2C%7B%22lat%22%3A33.659555%2C%22lon%22%3A-117.742884%2C%22time%22%3A1504213835%7D%2C%7B%22lat%22%3A33.658358%2C%22lon%22%3A-117.740829%2C%22time%22%3A1504213855%7D%2C%7B%22lat%22%3A33.656478%2C%22lon%22%3A-117.739594%2C%22time%22%3A1504213875%7D%2C%7B%22lat%22%3A33.654319%2C%22lon%22%3A-117.739838%2C%22time%22%3A1504213895%7D%2C%7B%22lat%22%3A33.652383%2C%22lon%22%3A-117.738692%2C%22time%22%3A1504213915%7D%2C%7B%22lat%22%3A33.650433%2C%22lon%22%3A-117.737305%2C%22time%22%3A1504213935%7D%2C%7B%22lat%22%3A33.649665%2C%22lon%22%3A-117.736886%2C%22time%22%3A1504213944%7D%5D%2C%22shape_match%22%3A%22map_snap%22%2C%22uuid%22%3A%22999999%22%7D
```
We will need to make sure docker container can apt-get install libvalhalla so that python can import its module and make calls into it. We will need to decide what the calls look like for the interface to the bindings. It would be best if we kept as much of the hard work as possible within the bindings and not within the webserver python.
Right now the Flink job expects one specific format for parsing messages from Kafka or from file; it looks like this:
date_string|id|X|X|X|accuracy|X|X|X|lat|lon|...
We can allow passing the Flink job an argument that tells it the format of the messages. The first format to support will be *sv (separated value). For that we can specify the separator and which columns hold the 5 important pieces of info: date, id, accuracy, lat, lon.
There is the other important aspect of the date format, but maybe we can defer that for the beginning.
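Although the actual job is Flink/Java, the configurable parsing can be illustrated in Python; the separator, column map, and date format would come from job arguments (names here are assumptions):

```python
import datetime

def make_sv_parser(separator, columns, date_format="%Y-%m-%d %H:%M:%S"):
    """Build a parser for separated-value messages. `columns` maps the
    five required fields to their zero-based positions. Hypothetical
    interface sketching what the Flink job argument would configure."""
    def parse(message):
        fields = message.split(separator)
        return {
            "time": datetime.datetime.strptime(fields[columns["date"]], date_format),
            "id": fields[columns["id"]],
            "accuracy": float(fields[columns["accuracy"]]),
            "lat": float(fields[columns["lat"]]),
            "lon": float(fields[columns["lon"]]),
        }
    return parse

# Matches the current hard-coded layout: date|id|X|X|X|accuracy|X|X|X|lat|lon
parse = make_sv_parser("|", {"date": 0, "id": 1, "accuracy": 5, "lat": 9, "lon": 10})
rec = parse("2017-09-01 12:00:00|veh42|a|b|c|4.0|d|e|f|33.6647|-117.7456")
```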
After getting our feet wet with this step, we should be able to move on to supporting things like JSON messages. That might not be too hard: Flink uses generics to tell it what the messages are, and we have a custom one. Presumably we could make a few custom ones and throw them into the template.
Reporter currently can ingest from HTTP requests or from CSV files (which are parsed and sent in as HTTP requests).
Many data providers use Kafka to stream location data around their own internal infrastructure. We'll add support for Reporter to act as a consumer to such Kafka streams.
Note that data providers may have different serialization formats for the contents of their Kafka messages. And they'll likely have different key names for lat, lon, accuracy, driver/vehicle ID, vehicle mode, etc. So ideally it's straightforward for each provider to write their own mapping function to parse messages into the appropriate values for use within Reporter.
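A sketch of such a per-provider mapping function, assuming JSON payloads and an illustrative set of canonical field names (none of these names are settled):

```python
import json

# Canonical field names used within Reporter (assumed for illustration).
CANONICAL = {"lat", "lon", "accuracy", "id", "time"}

def provider_a_mapper(raw_bytes):
    """Example per-provider mapping: this hypothetical provider uses its
    own key names in JSON Kafka payloads. Each provider would supply a
    function like this to translate into Reporter's canonical fields."""
    msg = json.loads(raw_bytes)
    return {
        "lat": msg["latitude"],
        "lon": msg["longitude"],
        "accuracy": msg["horiz_acc"],
        "id": msg["vehicle_id"],
        "time": msg["ts"],
    }

record = provider_a_mapper(
    b'{"latitude": 33.66, "longitude": -117.74, "horiz_acc": 5, '
    b'"vehicle_id": "v1", "ts": 1504213755}'
)
```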
Requires completion of #36
Even if data is time-ordered there may be locking and contention issues with closely spaced GPS position data. It may be desirable to use Flink in between the Kafka records and the map-matching process within Reporter. Flink could be used to "buffer" single locations into traces.
We need a couple of things with respect to having other TNCs run this software:
For both we can use environment variables, as we do for everything else. The former will be a query parameter passed to the Datastore; not secure at all, but more secure than nothing. The latter will simply be a string value taken from an environment variable rather than being hardcoded.
Downstream we'll need to make the datastore validate this authentication query parameter (equality check for now).
As we get segment pairs through the stream, we'll need another processor to accumulate them into tiles. This will make the Datastore's job easier when it needs to merge tiles coming from different Reporter instances.
Currently the Reporter assumes you are going to give it more than one point at a time. At the same time, if some of the end of your trace wasn't used, it will store that information in Redis to use with your next request. We can extend this logic a bit to allow requests down to the single-point-per-trace level. To do so we just need to add a bit of logic already found in the GRAB processing script:
Decent values for the if condition above can be taken from the prior processing script. One note: we do expire stuff in Redis after 5 minutes or so. This means that if you give me 3 points, I keep those in Redis without matching them, and you then wait 5 minutes before giving me the rest of the 50 points needed to do matching, we'll lose those 3 points. This basically ensures that clients are timely when sending us stuff.
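The accumulation logic might look like the sketch below, which uses an in-memory dict as a stand-in for Redis (with Redis proper, the TTL would be an EXPIRE on the per-vehicle key); MIN_POINTS and the point format are placeholders:

```python
import time

PARTIAL_TTL = 300  # seconds; idle partial traces expire (~5 minutes)
MIN_POINTS = 10    # hypothetical threshold before attempting a match

class TraceBuffer:
    """Accumulate single points per vehicle uuid until there are enough
    to hand off to map-matching. In-memory stand-in for the Redis store."""
    def __init__(self):
        self.traces = {}  # uuid -> (last_update_time, [points])

    def add(self, uuid, point, now=None):
        now = time.time() if now is None else now
        last, pts = self.traces.get(uuid, (now, []))
        if now - last > PARTIAL_TTL:
            pts = []  # stale partial trace: drop it, like Redis expiry would
        pts.append(point)
        self.traces[uuid] = (now, pts)
        if len(pts) >= MIN_POINTS:
            del self.traces[uuid]
            return pts  # enough points: hand off to map-matching
        return None  # keep buffering

buf = TraceBuffer()
```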
Right now we have Flink Kafka Stream code in Java, but it doesn't build in CircleCI, and before we can start using it for windowing in the Reporter we need it to do so. We'll want to see if we can use the build cache, because the Flink API build takes forever; building our Flink job jar is fast.
Note that the README shows how to get and build Flink Kafka; we just need to get that into the Circle script.
It would be nice to have some bulk processing to generate large numbers of requests from a large corpus of (probably historical) data. The scripts should take a bucket in S3, get the data, pipe it to something that turns it into requests, and pipe those on to xargs and curl.
Opening an issue per our conversation today @kevinkreiser.
All of my HTTP requests to a running instance of the newest Reporter image hung: not failing or timing out, but never completing. Similarly, Kevin was unable to docker exec -it into a Reporter instance based on the same latest image.
Reverting to older versions of the container fixes the problem, so something must be going on with the latest build, aka opentraffic/reporter:4c206e126c040fa98039611d91ccccd0ee618e09 or opentraffic/reporter:latest. Images starting at opentraffic/reporter:c813127d3d41bf5a11b5a2f8fc38653f8454f60a and earlier all seem to work fine.
There are lots of easy to use options for a python webserver. Twisted is a good one. I wouldn't use pylons. The built-in one (simplehttpserver or similar) would be good enough to get this up and running. At any rate, we will need to write a chunk of code using one and get a command into a docker script to start it.
We will need some bits of python to inter-operate with the Redis cache. There isn't much to do here but make sure that we install the Redis module via pip and learn how to use its api.
Add a "debug/test" flag to the Reporter Python so that the returned data reflects what is reported to Datastore (and this flag disables sending to the Datastore). This would be a list of the following:
OSMLR segment Id
segment length
next OSMLR segment Id (can be empty or 0)
start time
end time (time at the start of the next OSMLR segment or time at the end of this segment if no next)
queue length (TBD) - Length from the end of the OSMLR segment where the queue begins.
vehicle type and producer Id will be in the return as well (common over the entire list)
The default returned from the Python report method needs to be information that the Kafka/Java code uses to "trim" portions of the trace data. It will have index (trace index) information that allows association of the segment with the input trace. This information is not needed for the Datastore.
It could be that the "test/debug" mode returns both sets of information.
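For illustration only, the debug-mode payload might look like this; the field names are assumptions, not a settled schema:

```python
# One entry per reported segment when the hypothetical debug/test flag is
# set; vehicle type and provider id are common over the entire list.
debug_report = {
    "mode": "auto",
    "provider": "test-provider",
    "reports": [
        {
            "segment_id": 1234567890,  # OSMLR segment id
            "next_segment_id": 0,      # empty/0 when there is no next segment
            "length": 804,             # segment length in meters
            "start_time": 1504213755,  # epoch seconds
            "end_time": 1504213815,    # entry to next segment (or segment end)
            "queue_length": 0,         # TBD
        }
    ],
}
```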
We need to figure out what exactly is going to go into the Redis cache, how we update it, and what we need to get back out of it.
Presently, if the Reporter fails to load tile data, it continues to run. To facilitate automated deployments, and due to the way the Docker container agent works in ECS along with OpsWorks and the need to download planet data to the host, the process should (optionally, if preferred) exit if it fails to load any tile at startup.
The branch of code for writing output currently supports POSTing and writing to file. Neither of these works directly for AWS S3, which is our target. We need to add a bit of code to set the proper headers and do a PUT to S3. Add that little bit of functionality.
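A sketch of the request plumbing using only the standard library; authentication (a presigned URL or AWS Signature v4 headers) is deliberately omitted and would be required in practice:

```python
import urllib.request

def build_s3_put(bucket, key, body, content_type="text/csv"):
    """Build a PUT request against S3's REST interface. Only shows the
    method/header plumbing; auth is out of scope for this sketch."""
    url = "https://{}.s3.amazonaws.com/{}".format(bucket, key)
    req = urllib.request.Request(url, data=body, method="PUT")
    req.add_header("Content-Type", content_type)
    req.add_header("Content-Length", str(len(body)))
    return req
    # To send: urllib.request.urlopen(req)

req = build_s3_put("my-tile-bucket", "tiles/123/0.csv",
                   b"segment_id,next_segment_id\n")
```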
Logging output should be structured and error conditions documented. In production, logs will be streamed to Cloudwatch Logs, where filters will be built to pick up the structured data and turn it into Cloudwatch metrics which can then be used to trigger alarm conditions.
http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
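A minimal sketch of structured, one-JSON-object-per-line logging that Cloudwatch metric filters can match on (e.g. {$.event = "tile_load_failure"}); the field names here are assumptions:

```python
import json
import logging
import sys

class JsonFormatter(logging.Formatter):
    """Emit one JSON object per line so Cloudwatch metric filters can
    pick out fields and turn them into metrics."""
    def format(self, record):
        payload = {
            "level": record.levelname,
            "event": getattr(record, "event", "unspecified"),
            "message": record.getMessage(),
        }
        return json.dumps(payload)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("reporter")
log.addHandler(handler)
log.setLevel(logging.INFO)

# A documented error condition would carry a stable "event" name.
log.info("tile failed to load", extra={"event": "tile_load_failure"})
```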
As a precursor to #35 - define a message format. Check what potential providers currently have and identify required and optional parameters for input.
Allow single GPS point input (lat, lon, vehicle Id, and timestamp are required parameters). Determine if other parameters (e.g., transport mode) are required. Possible optional parameters could include speed (from the GPS sensor), GPS accuracy, etc.
Potentially allow for multiple GPS point inputs (in cases where a provider might batch locations from a single vehicle). Probably need to specify a maximum so that message size is constrained.
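For illustration, a single-point message might look like the following; all field names are placeholders pending the actual format definition:

```python
import json

# Sketch of a single-point input message. Required fields per the text
# above; optional candidates shown commented out.
message = json.dumps({
    "id": "vehicle-uuid-999999",  # required: vehicle id
    "lat": 33.664699,             # required
    "lon": -117.745583,           # required
    "time": 1504213755,           # required: timestamp, epoch seconds
    # "mode": "auto",             # possibly required: transport mode
    # "accuracy": 4.0,            # optional: GPS accuracy in meters
    # "speed": 12.3,              # optional: speed from GPS sensor
})

REQUIRED = {"id", "lat", "lon", "time"}
```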
we need to add a flink container from dockerhub into our compose file. flink maintainers keep one so it should be very easy
We will need something to parse the POST/GET requests. For starters, we can assume its csv because that is fast and easy. In the long run, we may want to make this functionality modular with a well-defined interface so that other on-premise people can implement their own in case they have some exotic format.
Update the README to reflect the full set of arguments to the Kafka reporter itself.
Proposal:
Change from sending prior OSMLR segment to sending next OSMLR segment. When crossing an internal intersection edge, roundabout, or turn channel (which do not have OSMLR segments at this time) the Reporter will not output until either the matched path enters an OSMLR segment or enters a road without OSMLR segment (that is not internal intersection, roundabout, or turn channel). null is sent as the next OSMLR segment if the matched path enters a "local" road.
Set the end time as the time the next OSMLR segment is entered. If there is no next OSMLR segment then the end time is the time at the end of the current OSMLR segment.
Send length as the length of the matched segment plus the length along any internal intersection edges, roundabout edges, or turn channels used to get to the next OSMLR segment.
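The proposal above can be sketched as a small decision function; the edge representation and field names are hypothetical:

```python
# Edge kinds that have no OSMLR id but still connect OSMLR segments.
INTERNAL = {"internal_intersection", "roundabout", "turn_channel"}

def emit_report(current, path_after):
    """Given the just-completed OSMLR segment and the matched edges after
    it, decide what to report: skip internal edges (adding their length),
    report the next OSMLR id (or None on a local road), and set end_time
    to the entry time of the next edge. Returns None if the path does not
    yet reach a decidable edge (wait for more GPS points)."""
    length = current["length"]
    for edge in path_after:
        if edge["kind"] in INTERNAL:
            length += edge["length"]  # internal edges count toward length
            continue
        return {
            "segment_id": current["osmlr_id"],
            "next_segment_id": edge.get("osmlr_id"),  # None on a local road
            "length": length,
            "end_time": edge["enter_time"],
        }
    return None

report = emit_report(
    {"osmlr_id": 111, "length": 500},
    [{"kind": "turn_channel", "length": 30},
     {"kind": "road", "osmlr_id": 222, "enter_time": 1504213815}],
)
```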
We need to update the Circle tests to start the Flink job manager and submit our Flink job. Then it can: #61
Grant Heffernan [11:57 AM]
in semi related new, we’re going to want some sort of either canned health check endpoint for these, or I just need something else that will work when things are healthy and break when they aren’t. Want me to just put it in an issue?
Kevin Kreiser [11:58 AM]
issue is good. i think it has to be canned and we can do some special code to handle it
[11:58]
to make sure the system is running etc
[11:58]
without actually making data
Grant Heffernan [11:58 AM]
k
Kevin Kreiser [11:58 AM]
i mean actually
[11:58]
a trace of a single point could be enough
[11:59]
it will never get a non partial match
[11:59]
and it will check redis and all of that stuff
As a basis for testing the reporter it would be good to measure the impacts of changes we are making as we make them. Currently our approach to this is spread through libvalhalla as well as reporter ci. @kpwebb suggested a much more practical and straightforward way for us to get a quantitative measure of this though.
The basis of the idea is as follows:
To accomplish this, one would most likely add to the libvalhalla Python bindings the ability to get a path and the attributes noted above (step 1). Then the Reporter could be modified to do the remaining steps for some number of iterations.
We need a way to move tiles out of a given KV store and into a place where the Datastore expects them. We'll do this with a configuration parameter. I think we need to support putting them to disk as well as POSTing them to S3 (possibly with an authentication handshake).
We also need to define the file naming conventions and how the contents of the file will be formatted.
For the file name let's do:
epoch_seconds_start-epoch_seconds_end/tile_id/some_reporter_instance_uuid
For the contents of the file we can do human-readable:
segment_id,next_segment_id,avg_duration,length,avg_queue_length,min_timestamp,max_timestamp
If we find this data is too large in this format, we'll entertain other formats.
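A sketch of both conventions; the tile id and instance uuid values are just examples:

```python
import csv
import io

def tile_file_name(start, end, tile_id, instance_uuid):
    """Build the path described above:
    epoch_seconds_start-epoch_seconds_end/tile_id/some_reporter_instance_uuid"""
    return "{}-{}/{}/{}".format(start, end, tile_id, instance_uuid)

def tile_file_contents(rows):
    """Render rows as the human-readable CSV described above:
    segment_id,next_segment_id,avg_duration,length,avg_queue_length,
    min_timestamp,max_timestamp"""
    out = io.StringIO()
    writer = csv.writer(out)
    for row in rows:
        writer.writerow(row)
    return out.getvalue()

name = tile_file_name(1504213200, 1504216800, 415273, "8f14e45f-ceea")
body = tile_file_contents([
    (111, 222, 42.5, 530, 0, 1504213755, 1504213815),
])
```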
Some conditions lead to invalid times (-1) while the length can be valid. This is rare and ultimately the fix should be in Valhalla traffic_segment_matcher, but double check here and do not forward segments with invalid times to Datastore.
Sometimes the reports coming back from the Python reporter have either t0 or t1 with a -1 value. This causes the Kafka Java reporter code to make a time range for a given segment that is huge and crosses many time bins. That results in an enormous number of tiles, which take eons to purge.
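A defensive filter on the Reporter side might look like this (the report shape is a hypothetical stand-in):

```python
def valid_times(report):
    """Guard against the -1 times described above: drop any segment pair
    whose t0 or t1 is invalid before forwarding to the Datastore."""
    return report.get("t0", -1) >= 0 and report.get("t1", -1) >= 0

reports = [
    {"segment_id": 111, "t0": 1504213755, "t1": 1504213815},
    {"segment_id": 222, "t0": -1, "t1": 1504213815},  # invalid: skip
]
forward = [r for r in reports if valid_times(r)]
```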
In the production environment, it's preferable to mount the volume that houses tile data read-only so that N+1 containers reading from it cannot modify shared data. At present, if the volume is presented read-only, the process fails to load it.
coming under valhalla/valhalla#620
Write a Python based Kafka producer to inject archived historical data into the Kafka-based Reporter. This will allow early testing, and may have uses later for importing other historical datasets.
We need to update the Circle test to use the Kafka producer script to send data into the Flink Kafka Stream job.
We need to add a Kafka container from Docker Hub to our compose file.