PHOEBUS
=======

Contents
--------
* Introduction
* Computational Model
* Distributed Processing
* Getting it to work
* Pluggable Storage and True Multi-Node Distribution with HDFS and Thrift
* Next Steps
* References
  

Introduction
------------
Phoebus is a system written in Erlang for distributed processing of very large graphs that span billions of vertices and edges. It is essentially an implementation of the system described in Google's Pregel[1] paper.
It supports a distributed model of computation similar to MapReduce[2], but more tuned to graph processing.


Computational Model
-------------------
* A Graph is partitioned into groups of Records.
* A Record consists of a Vertex and its outgoing Edges (An Edge is a Tuple consisting of the edge weight and the target vertex name).
* A User specifies a 'Compute' function that is applied to each Record.
* Computation on the graph happens in a sequence of incremental Super Steps.
* At each Super Step, the Compute function is applied to all 'active' vertices of the graph.
* Vertices communicate with each other via Message Passing.
* The Compute function is provided with the Vertex record and all Messages sent to the Vertex in the previous SuperStep.
* A Compute function can
  - Mutate the value associated with a vertex
  - Add/Remove outgoing edges.
  - Mutate Edge weight
  - Send a Message to any other vertex in the graph.
  - Change state of the vertex from 'active' to 'hold'.
* At the beginning of each SuperStep, if there are no more active vertices -and- if there are no messages to be sent to any vertex, the algorithm terminates.
* A User may additionally specify a 'MaxSteps' to stop the algorithm after a fixed number of Super Steps.
* A User may additionally specify a 'Combine' function that is applied to all the Messages targeted at a Vertex before the Compute function is applied to it.
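
The rules above can be sketched as a single-process loop. This is an illustration only, not Phoebus's actual implementation: the module name `superstep_sketch`, the `{Name, Value, Edges, State}` record shape, and the `{NewRecord, OutMessages}` return value of Compute are all assumptions made for the example. Waking a vertex on 'hold' when a message arrives follows the Pregel paper. The optional Combine function is left out.

```erlang
-module(superstep_sketch).
-export([run/3, max_value/2, demo/0]).

%% Hypothetical record shape: {Name, Value, Edges, State}
%%   Edges = [{Weight, TargetName}], State = active | hold
%% Hypothetical Compute contract:
%%   Compute(Record, Msgs) -> {NewRecord, [{TargetName, Msg}]}

run(Records, Compute, MaxSteps) ->
    loop(Records, dict:new(), Compute, 0, MaxSteps).

%% Stop once MaxSteps super steps have been performed.
loop(Records, _Inbox, _Compute, Step, MaxSteps) when Step >= MaxSteps ->
    Records;
loop(Records, Inbox, Compute, Step, MaxSteps) ->
    AnyActive = lists:any(fun({_, _, _, State}) -> State =:= active end,
                          Records),
    case AnyActive orelse dict:size(Inbox) > 0 of
        false ->
            %% No active vertices and no pending messages: terminate.
            Records;
        true ->
            {NewRecords, Sent} = lists:mapfoldl(
                fun(Record, Acc) -> step(Record, Inbox, Compute, Acc) end,
                [], Records),
            %% Messages sent in this step are delivered in the next one.
            NewInbox = lists:foldl(
                fun({Target, Msg}, In) -> dict:append(Target, Msg, In) end,
                dict:new(), Sent),
            loop(NewRecords, NewInbox, Compute, Step + 1, MaxSteps)
    end.

%% Apply Compute to a vertex that is active or has incoming messages.
step({Name, _, _, State} = Record, Inbox, Compute, Acc) ->
    Msgs = case dict:find(Name, Inbox) of
               {ok, Ms} -> Ms;
               error -> []
           end,
    case State =:= active orelse Msgs =/= [] of
        true ->
            {NewRecord, Out} = Compute(Record, Msgs),
            {NewRecord, Out ++ Acc};
        false ->
            {Record, Acc}
    end.

%% Example Compute: propagate the largest vertex value through the graph.
max_value({Name, Value, Edges, State}, Msgs) ->
    NewValue = lists:max([Value | Msgs]),
    Out = case State =:= active orelse NewValue > Value of
              true -> [{Target, NewValue} || {_Weight, Target} <- Edges];
              false -> []
          end,
    {{Name, NewValue, Edges, hold}, Out}.

demo() ->
    Graph = [{"a", 3, [{1, "b"}], active},
             {"b", 6, [{1, "c"}], active},
             {"c", 2, [{1, "a"}], active}],
    run(Graph, fun max_value/2, 100).
```

After compiling the module (for example with `c(superstep_sketch).` in an Erlang shell), `superstep_sketch:demo()` runs a three-vertex cycle until every vertex holds the value 6 and no messages remain.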


Distributed Processing
----------------------
* The Computational model allows the algorithm to be run in parallel by a cluster of Phoebus nodes.
* A 'Job' submitted to a Phoebus cluster is managed by a 'Master' process running on the node that receives the Job.
* The Master partitions the input graph and spawns a 'Worker' for each partition on one of the nodes of the cluster.
* The Master then asks each Worker to perform a Super Step on its partition and awaits notification from the Worker of Step completion.
* The Step number is incremented until all Workers report that they have no more 'active' Vertices and no more outstanding messages to be delivered.
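
The Master/Worker hand-off described above can be sketched with plain Erlang processes. This is a simplified, single-node illustration and not the code in 'phoebus_master': the module name `master_sketch`, the message shapes `{superstep, Step}` and `{step_done, Pid, Status}`, and the `StepFun` callback are assumptions made for the example.

```erlang
-module(master_sketch).
-export([run/2, demo/0]).

%% Hypothetical callback:
%%   StepFun(Partition, Step) -> {NewPartition, busy | idle}
%% 'idle' stands for "no active vertices and no outstanding messages".

run(Partitions, StepFun) ->
    Master = self(),
    %% One worker per partition. A real cluster would place the workers
    %% on different nodes; here they are all local processes.
    Workers = [spawn_link(fun() -> worker(Master, P, StepFun) end)
               || P <- Partitions],
    master_loop(Workers, 0).

master_loop(Workers, Step) ->
    lists:foreach(fun(W) -> W ! {superstep, Step} end, Workers),
    %% Wait for every worker to report completion of this step.
    Replies = [receive {step_done, W, Status} -> Status end || W <- Workers],
    case lists:all(fun(S) -> S =:= idle end, Replies) of
        true ->
            lists:foreach(fun(W) -> W ! stop end, Workers),
            {finished, Step};
        false ->
            master_loop(Workers, Step + 1)
    end.

worker(Master, Partition, StepFun) ->
    receive
        {superstep, Step} ->
            {NewPartition, Status} = StepFun(Partition, Step),
            Master ! {step_done, self(), Status},
            worker(Master, NewPartition, StepFun);
        stop ->
            ok
    end.

%% Toy partitions: each is a counter that is 'busy' until it reaches zero.
demo() ->
    Countdown = fun(0, _Step) -> {0, idle};
                   (N, _Step) -> {N - 1, busy}
                end,
    run([3, 1, 2], Countdown).
```

In this sketch the Master advances to the next step only after all Workers have replied, which mirrors the barrier between Super Steps. Failure handling is deliberately omitted.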


Getting it to work (Tested on Mac OS X Snow Leopard)
------------------
Requirements:
* rebar (Download from http://hg.basho.com/rebar/downloads/rebar)
* git
* erlang (tested on R14B)

1) Clone the repository from GitHub:
  
  $ git clone git://github.com/xslogic/phoebus.git
  $ cd phoebus

2) Compile and create a release:

  $ ./generate 
    ==> rel (compile)
    ==> phoebus (compile)
    ....
    ==> rel (generate)
    Usage: phoebus {start|stop|restart|reboot|ping|console|attach}

3) Create a sample output directory:

  $ mkdir /tmp/output

4) Start a two-node Phoebus cluster:

  Terminal 1:
  $ ./run_phoebus 1 
  .....
  Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]

  Eshell V5.7.5  (abort with ^G)
  (phoebusr1@my-machine)1>



  Terminal 2:
  $ ./run_phoebus 2 
  .....
  Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]

  Eshell V5.7.5  (abort with ^G)
  (phoebusr2@my-machine)1>


5) Create an input data set. Currently, Phoebus requires input records to be line-delimited, one record per line. Each record must be of the form
    "<VertexName>\t<VertexValue>\t<EdgeWeight1>\t<TargetVertexName1>\t<EdgeWeight2>\t<TargetVertexName2>...\n"
   The module "algos" that comes with Phoebus has a utility function that can generate a binary tree as a sample input data set.

  (phoebusr1@my-machine)1> algos:create_binary_tree("/tmp/input", 4, 1000).
  ok

  The create_binary_tree function has created an input data set in the directory "/tmp/input".
  It has created a 1000-node binary tree rooted at vertex "1".
  It has split the input into 4 files.

  $ head -5 infile1 
  1 1 1 2 1 3 
  2 2 1 4 1 5 
  3 3 1 6 1 7 
  4 4 1 8 1 9 
  5 5 1 10  1 11  
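
To make the record format concrete, here is a minimal sketch of a parser for one input line. It is not the parser Phoebus uses: the module name `record_sketch` and the `{Name, Value, Edges}` result shape are assumptions for illustration, and all fields are kept as strings because the format above does not state their types.

```erlang
-module(record_sketch).
-export([parse_line/1]).

%% Parse "<VertexName>\t<VertexValue>\t<EdgeWeight1>\t<TargetVertexName1>...\n"
%% into {Name, Value, [{Weight, TargetName}]}.
parse_line(Line) ->
    %% Split on tabs and the trailing newline; empty tokens are dropped.
    [Name, Value | Rest] = string:tokens(Line, "\t\n"),
    {Name, Value, edges(Rest)}.

%% Edge fields come in pairs: weight first, then target vertex name.
edges([Weight, Target | Rest]) ->
    [{Weight, Target} | edges(Rest)];
edges([]) ->
    [].
```

For the first record of the sample tree, `record_sketch:parse_line("1\t1\t1\t2\t1\t3\n")` returns `{"1","1",[{"1","2"},{"1","3"}]}`: vertex "1" with value "1" and two edges of weight "1" to vertices "2" and "3".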


6) Run a sample algorithm. The module "algos" has a sample Compute function that calculates the shortest path to a node.

  (phoebusr1@my-machine)1> AFun = fun algos:shortest_path/2.
  #Fun<algos.shortest_path.2> 
  (phoebusr1@my-machine)1> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AFun}, {input_dir, "file:///tmp/input/"}, {output_dir, "file:///tmp/output/"}]).
  ok

  Since the input has 4 files, Phoebus spawns 4 workers, 2 on each node.

7) Wait for the algorithm to finish. Once it does, the output is written to "/tmp/output". For example, to list all vertices whose names start with "20":

  $ cat /tmp/output/* | grep '^20'
  200 100:50:25:12:6:3:1  1 400 1 401 
  204 102:51:25:12:6:3:1  1 408 1 409 
  20  10:5:2:1  1 40  1 41  
  201 100:50:25:12:6:3:1  1 402 1 403 
  203 101:50:25:12:6:3:1  1 406 1 407 

  The second column (the value of the vertex) gives the shortest path to the vertex from the root of the binary tree.


Pluggable Storage and True Multi-Node Distribution with HDFS[3] and Thrift[4]
-----------------------------------------------------------------------------
* A true Multi-Node setup requires a Distributed storage layer.
* Phoebus defines an 'external_store' behaviour, an implementation of which is 'external_store_file'.
* 'external_store_file' was used in the above section to read/store vertices from the local file system.
* Phoebus can be made to read from and store to HDFS using the 'external_store_hdfs' module.
* The store layer is decided by the URI scheme of the "input_dir" and "output_dir" parameters passed to 'phoebus_master' when submitting a job.

  For example:
  ....
  (phoebusr1@my-machine)1> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AFun}, {input_dir, "hdfs://localhost:9000/tmp/input/"}, {output_dir, "file:///tmp/output/"}]).
  ....  

* HDFS exposes only Java APIs. Thus, 'external_store_hdfs' is implemented as a wrapper over an Erlang Thrift client that talks to an external Thrift server.
* Phoebus has been tested with the HDFS Thrift server that is bundled with the Cloudera CDH3 Hadoop distribution[5].
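
Selecting the store layer from the URI scheme could look roughly like the sketch below. The module names 'external_store_file' and 'external_store_hdfs' come from the text above; the dispatch function itself and the module name `store_sketch` are hypothetical and are not taken from the Phoebus source.

```erlang
-module(store_sketch).
-export([store_module/1]).

%% Map the URI scheme of an input_dir/output_dir value to the name of
%% the module implementing the 'external_store' behaviour for it.
store_module("file://" ++ _Path) ->
    external_store_file;
store_module("hdfs://" ++ _HostAndPath) ->
    external_store_hdfs;
store_module(Other) ->
    {error, {unsupported_uri, Other}}.
```

With this sketch, `store_sketch:store_module("hdfs://localhost:9000/tmp/input/")` returns `external_store_hdfs`, while a URI with any other scheme yields an error tuple instead of crashing.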


Next Steps
----------
* Fix fault tolerance and error handling. If a Worker dies, the Master must ask another Worker on another node to take up its work.
* The Pregel paper describes an 'Aggregate' function, which still needs to be implemented.
* Support Jobs written in Python.
* Add support for Disco DFS[6]. 


References
----------
1. Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert, Ilan Horn, Naty Leiser, and Grzegorz Czajkowski, Pregel: A System for Large-Scale Graph Processing (http://portal.acm.org/citation.cfm?id=1582716.1582723)
2. MapReduce (http://en.wikipedia.org/wiki/MapReduce)
3. Hadoop Distributed File system (http://hadoop.apache.org/hdfs/)
4. Thrift (http://incubator.apache.org/thrift/)
5. Cloudera's CDH3 Hadoop tarball (http://archive.cloudera.com/cdh/3/hadoop-0.20.2+320.tar.gz).
6. Disco Distributed File System (http://discoproject.org/doc/start/ddfs.html) 
