Disco + EC2 spot instance = WIN
Tue, Oct 30, 2012 Tweet Vote on HNtl;dr version : This is how I spent a Saturday evening.
<blink>Warning: If you like to write Java, stop reading now. Go back to using Hadoop. It's a much more mature project.</blink>
As a part of my job, I do a lot of number processing. Over the course of last few weeks, I shifted to doing most of it using MapReduce using Disco. Its a wonderful approach to processing big data where the time to process data is directly proportional to the amount of hardware you throw at it and the quantity of data. The amount of data to be processed can (in theory) be unlimited. While I don't do anything of Google scale, I deal with Small Big Data. My datasets for an individual job would probably not exceed 1 GB. I can currently afford to continue not use MapReduce, but as my data set grows, I would have to do distributed computing, so better start early.
Getting started with Disco
If you, like me, had given up on MapReduce in the past after trying to deal with administrating Hadoop, now is a great time to look into Disco. Installation is pretty easy. Follow the docs. Within 5 minutes I was writing Jobs in python to process data, would have been faster if I knew before-hand that SSH daemon should be listening on port 22.
Python for user scripts + Erlang for backend == match made in heaven
Enter disposable Disco
I made a set of python scripts to launch and manage Disco clusters on EC2 where there is no need for any data to be stored. In my usecase, the input is read from Amazon S3 and output goes back into S3.
There are some issues with running disco on EC2.
- Must have ssh/keys setup such that Master can ssh into slaves.
- Must have a file with erlang cookie with same contents on all slaves
- Must inform master the hostnames of the slaves. FQDN or anything with a dot gets rejected
- The default root directories have very limited storage space, usually 8GB
disposabledisco takes care of the above things and more. Everything needed to run the cluster is defined in a config file. First generate a sample config file.
python create_config.py > config.json
This creates a new file with some pre-populated values. For my case the config file looks like this(some info masked)
{ "AWS_SECRET": "SNIPPED", "ADDITIONAL_PACKAGES": [ "git", "libwww-perl", "mongodb-clients", "python-numpy", "python-scipy", "libzmq-dev", "s3cmd", "ntp", "libguess1", "python-dnspython", "python-dateutil", "pigz" ], "SLAVE_MULTIPLIER": 1, "PIP_REQUIREMENTS": [ "iso8601", "pygeoip" ], "MASTER_MULTIPLIER": 1, "MGMT_KEY": "ssh-rsa SNIPPED\n", "SECURITY_GROUPS": ["disco"], "BASE_PACKAGES": [ "python-pip", "python-dev", "lighttpd" ], "TAG_KEY": "disposabledisco", "NUM_SLAVES": 30, "KEY_NAME": "SNIPPED", "AWS_ACCESS": "SNIPPED", "INSTANCE_TYPE": "c1.medium", "AMI": "ami-6d3f9704", "MAX_BID": "0.04", "POST_INIT": "echo \"[default]\naccess_key = SNIPPED\nsecret_key = SNIPPED\" > /tmp/s3cfg\ncd /tmp\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIPASNum.dat.gz\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIP.dat.gz\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoLiteCity.dat.gz\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIPRegion.dat.gz\ngunzip *.gz\nchown disco:disco *.dat\n\n" }
This tells disposabledisco that I want a cluster with 1 master and 30 slaces all of type c1.medium, and use ami-6d3f9704 as the starting point. It lists out the packages to be installed via apt-get and python dependencies to be installed using PIP. You can link to external tar, git repo, etc. Basically anything pip allows after pip install
The POST_INIT portion is bash script that runs as root after rest of the install. In my case I am downloading and uncompressing different GeoIP databases archived in a S3 bucket for use from within disco jobs.
Once the config file is ready run the following command many times. The output is fairly verbose.
python create_cluster.py config.json
Why many times? Cause there is no state stored in the system. All state is managed using EC2 tags. This is what the script does on each run
- Check if master is running. If not request a spot instance for it (and kill any zombie slaves lying around from previous runs).
-
If master us up and running.
- Print the ssh command needed to setup port forwarding. After running the given ssh command you can see http://localhost:8090 on the browser to see disco's UI in all its glory.
- print the command to export DISCO_PROXY so you can create jobs locally
- Check inventory of slaves. A slave can have 3 statuses. 1) pending - spot instance requested. 2) running - the instance is running. 3) bootstrapped - slave is completely setup and can be added to master.
- If total number of slaves is less than NUM_SLAVES launch the remaining
- Try and bootstrap any running instances. If bootstrap was successful, change the EC2 tag.
- Finally, update the master's disco config. Telling it hostnames of instances to use and number of workers.
- ???
- Profit
Many steps involve EC2 provisioning spot instances, waiting for instance to get initialized, etc..
To help with shipping output to S3, I made some output classes for Disco
- S3Output - Each key, value returned creates a new file in S3 with the key as S3 key and value as String thats dumped inside it. So, one key should be yielded only once from reduce.
- S3LineOutput Similar to S3Output, but now it stores the output, and joins the output as one big file. has options for sorting, unique, etc.
Both these functions can be configured gzip the contents before uploading.
As far as input is concerned, I send it a list of signed S3 urls. (Sidenote: It seems disco cannot handle https inputs at the moment, so I use http). A sample job run might look like..
def get_urls(): urls = [] for k in bucket.list(prefix="processed"): if k.name.endswith("gz"): urls += [k.generate_url(3660).replace("https", "http")] return urls MyExampleJob().run( input=get_urls(), params={ "AWS_KEY": "SNIP", "AWS_SECRET": "SNIP", "BUCKET_NAME": "SNIP", "gzip": True }, partitions=10, required_files=["s3lineoutput.py"], reduce_output_stream=s3_line_output_stream ).wait()
Bonus - MagicList - Memory efficient way to store and process potentially infinite lists.
We used Disco to compute numbers for a series of blogposts on CDN Planet. For this analysis it was painful process for me to manually launch Disco clusters, which lead me to create the helper scripts.
- Part 1 : Google DNS, OpenDNS and CDN performance
- Part 2 : Which CDNs support edns-client-subnet?
- Part 3 : Real-world CDN performance for Google DNS and OpenDNS users
Shameless plug
Turbobytes, multi-CDN made easyHave your static content delivered by 6 global content delivery networks, not just 1. Turbobytes' platform closely monitors CDN performance and makes sure your content is always delivered by the fastest CDN, automatically.