Peer-to-peer distributed computing on an ad-hoc Matlab cluster

In a recent meeting with the SPM developers, we discussed parallel computing using the Matlab distributed computing toolbox, Star-P, Sun Grid Engine, and other batch systems that can be linked to Matlab. These are all of limited use in the typical neuroimaging research setting because they are based on a centralized job distribution system. That may work fine on a large cluster with a centralized configuration and system administration, but even then the usefulness is limited because all input and output data (which are typically large) have to be sent over the network twice: first to the job manager, then to the compute node (and vice versa for the results).

Another idea that came up was to use the computers of your colleagues for the computations. This documentation describes a proposal for peer-to-peer distributed computing using Matlab without the need for a centralized server/manager.

Getting started

You can run it on multiple computers or on a single computer with multiple CPUs or multiple cores. Note that starting multiple Matlab sessions on a single computer only requires one Matlab license. Let's assume that you have N computers.

On the first computer, you start Matlab and type

peermaster

On all N-1 subsequent computers (or on the same computer if it has multiple CPUs or cores), you start Matlab and type

peerslave

You will see the peerslave printing the date and time every second. Each of the slaves is now waiting for a job to be executed.

Then you go back to the first “master” Matlab session and type

peercellfun('power', {1, 2, 3}, {2, 2, 2})

which should return

[1, 4, 9]

What happened is that peercellfun distributed the execution of these three jobs over all slaves:

power(1, 2)
power(2, 2)
power(3, 2)

Most applications of peercellfun will return non-scalar values, which cannot be concatenated into a single vector. In that case, consistent with cellfun, it will return an error unless you specify that the output will be non-uniform. For example

peercellfun('rand', {1, 2, 3}, 'UniformOutput', false)

which returns

{[1x1]  [2x2]  [3x3]}
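
The same behaviour can be tried without any peers using the standard cellfun, which peercellfun is meant to mirror. In this sketch the anonymous function and the variable names are only illustrative:

```matlab
% Sketch using the standard cellfun only; no peers are involved.
makerand = @(n) rand(n);            % returns an n-by-n matrix

% With the default uniform output, the 2x2 and 3x3 results cannot be
% concatenated with the scalar result, so cellfun raises an error.
try
  cellfun(makerand, {1, 2, 3});
  failed = false;
catch
  failed = true;
end

% With non-uniform output, each result is returned in its own cell.
c = cellfun(makerand, {1, 2, 3}, 'UniformOutput', false);

disp(failed);                       % true when the uniform call was rejected
disp(size(c{3}));                   % size of the third result
```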

In these small computations the communication between the peers takes more time than the actual computations, so parallelization will not result in a speed increase. To get a speed increase, the computational effort should be large relative to the amount of data that has to be transferred. The following example demonstrates a computationally heavy job that can benefit from parallelization:

a = randn(400,400);
% non-parallel version, using a function handle for the standard cellfun
tic;     cellfun(@pinv, {a, a, a, a, a}, 'UniformOutput', false); toc
% parallel version, distributed over the available slaves
tic; peercellfun('pinv', {a, a, a, a, a}, 'UniformOutput', false); toc

You probably have to play with the size of the matrix “a” and with the number of jobs to see the largest effect on the timing of the non-parallel (cellfun) and parallel (peercellfun) version of the computations.

How it works

The idea is loosely based on the FieldTrip buffer, which is a multithreaded TCP server implemented as a mex file. The TCP server runs in a separate thread attached to Matlab, but is not blocked by Matlab command execution. Let's refer to this as the “peer server”. The peer server has the following responsibilities:

  1. announce itself on the network
  2. discover the other peers
  3. send data to other peers
  4. receive data from other peers

Sending and receiving data implies sending the input arguments of a function that is remotely evaluated and receiving the output arguments of that function call.

On top of this peer server a number of regular Matlab functions are implemented as m-files. These regular functions encapsulate the low-level server details. On the master computer the commands could look like

jobid  = peerfeval(functionname, argin);
result = peerget(jobid);

and the command on a slave server would look like

peerslave('maxtime', 3600);

which would keep running as a slave for one hour (3600 seconds) and evaluate the jobs that are sent.
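
How peerslave enforces this time limit is not described here. One simple possibility, sketched below with a hypothetical loop that only counts its polling rounds instead of fetching jobs, is to compare the elapsed time against maxtime on every iteration:

```matlab
% Sketch only: a polling loop that stops after maxtime seconds.
% The variable names are illustrative and are not taken from the toolbox.
maxtime   = 0.5;          % short limit so that the demo finishes quickly
stopwatch = tic;          % start measuring the elapsed time
npoll     = 0;

while toc(stopwatch) < maxtime
  npoll = npoll + 1;      % a real slave would check for a job here
  pause(0.1);             % wait a little before polling again
end

fprintf('stopped after %d polling rounds\n', npoll);
```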

Implemented functionality

The peer-to-peer parallel toolbox for Matlab consists of a compiled mex file that implements the low-level functionality, plus a collection of end-user functions. The most important end-user functions are described here.

Peerfeval(...)

The Matlab session that executes the peerfeval command searches for a peer that acts as slave, and sends the job (the function name and the input arguments) to the available slave peer. The slave peer evaluates the function on the input arguments and subsequently writes the output arguments back to the peer server of the host Matlab session, i.e. the session that initiated the job.

Multiple peerfeval calls can be executed without explicitly waiting for the results to return, hence the peer server (running on the master) should be able to receive and hold multiple “argouts”.

The jobid should include information about the peer to which the job was assigned. Furthermore, information about the begin and end time would be useful to estimate the time it takes to execute similar jobs. Its interface is identical to the standard Matlab feval() command.

Peerget(...)

This gets either a job to be executed (function name and input arguments) from the local peer server, or the output arguments that have been returned to the local peer server.

When getting a job to be executed: the job should include the function name, the input arguments and the id of the host to which the results have to be returned.

When getting the result of a job: if the job has not finished yet, it should indicate that it is still in progress.
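
As an illustration, a job description could be represented as a Matlab structure. The field names below follow the peerslave loop shown further down; the values, and the use of a plain number as host id, are assumptions made for this sketch:

```matlab
% Sketch of a job description; the values and the numeric hostid are made up.
job = struct();
job.functionname = 'power';   % name of the function to be evaluated
job.argin        = {3, 2};    % input arguments, one cell per argument
job.hostid       = 1;         % identifies the master that expects the result

% This is what a slave would do with the job: evaluate power(3, 2).
argout = feval(job.functionname, job.argin{:});
disp(argout);
```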

Peercellfun(...)

This executes a function on all the elements of a cell-array. Each cell is evaluated on another peer, and the evaluation is done in parallel. Once all cells have been evaluated, the results are gathered and returned to the user, either concatenated into a single array or, with 'UniformOutput' set to false, as a cell-array. For example

peercellfun('plus', {1, 2, 3}, {1, 2, 3})

would return

[2, 4, 6]

and

peercellfun('rand', {1, 2, 3}, 'UniformOutput', false)

would return

{[1x1]  [2x2]  [3x3]}

Its interface is identical to the standard Matlab cellfun() command.

Peermaster(...)

This ensures that all threads are running and that the peer acts as master. The server will not accept jobs to be executed, but does accept the output arguments of jobs that have been executed on other peers.

Peerzombie(...)

This ensures that all threads are running and sets the peer in zombie mode. As a zombie, the peer will not allow any job requests or job results to be written to it. It still announces itself to the other peers in the network. You can think of this as the default/unspecified mode.

Peerslave(...)

In slave mode a peer accepts the input arguments of a single job. It constantly checks for the availability of a job (on its own server), and if a job is available it is executed.

The code inside peerslave looks like this

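while (true)
  job = peerget('job');            % check the local peer server for a pending job
  if ~isempty(job)
    % evaluate the requested function on its input arguments
    argout = feval(job.functionname, job.argin{:});
    peerput(job.hostid, argout);   % return the result to the originating master
  else
    pause(0.1);                    % no job available, wait briefly before polling again
  end
end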

Announce & Discover

All peers are able to locate each other automatically by means of two threads that are running in the background of the peer server. It is an ad-hoc network with auto-discovery, so it is not necessary to manage a list with all the nodes that participate in the peer-to-peer network.

The announce thread multicasts a message with some host information (address, port, status) over the network. The announce packet is sent once every second.

The discover thread listens to the network. Each time an announce packet is detected, the announcing host is added to the list of known hosts. Besides adding the hostname, it attaches (or updates) a timestamp indicating when the host was last seen.

A third expire thread is running which removes old peers from the list. If a previously announced peer is not seen for a few seconds, it expires and is removed from the list of known hosts. This ensures that a peer which is shut down will be removed.

This list of known hosts is used to determine which peers are available for receiving a computational job.
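
The bookkeeping of the discover and expire threads can be sketched in plain Matlab. The real implementation lives in the mex file and runs in separate threads; the structure fields, the three-second expiry time and the sequence of announce packets below are all assumptions made for illustration:

```matlab
% Hypothetical bookkeeping for the list of known hosts (not the mex code).
expiretime = 3;   % seconds without an announce packet before a peer expires
peers = struct('hostname', {}, 'lastseen', {});

% Three announce packets: hostname and arrival time in seconds.
packets = {'alpha', 0.0; 'beta', 0.5; 'alpha', 4.0};

for i = 1:size(packets, 1)
  name = packets{i, 1};
  t    = packets{i, 2};

  % discover: add the host, or refresh its timestamp if it is already known
  idx = find(strcmp({peers.hostname}, name));
  if isempty(idx)
    peers(end+1) = struct('hostname', name, 'lastseen', t);
  else
    peers(idx).lastseen = t;
  end

  % expire: keep only the peers that were seen recently enough
  peers = peers(t - [peers.lastseen] <= expiretime);
end

% 'beta' was last seen at 0.5 s and has expired by 4.0 s, 'alpha' remains.
disp({peers.hostname});
```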

Communicating the input data for a job and returning the results

The tcpserver thread is constantly listening for incoming TCP connections.

If the peer is running in idle slave mode, it accepts an incoming connection that can write the input data for a job. If the peer is running in busy slave mode (i.e. busy executing a job), no connections are allowed. After finishing the computation, the slave writes the results of the job back to the tcpserver thread that the master is running.

The tcpserver thread of the master allows for multiple incoming connections, because multiple slaves might finish their computations at around the same time and hence send their results back simultaneously.

The tcpserver of a peer that runs as zombie does not allow any incoming connection.

Fair sharing of resources

If multiple end-users are simultaneously distributing their computational jobs, a problem can arise in distributing the limited resources to these users. One limited resource is cpu time, another is memory.

Peers that run in slave mode announce their host details, including the available memory. The optimization of jobs with respect to memory is currently implemented in peerfeval, which selects the slave with the best fit. It first selects all slaves with sufficient memory, and from those it selects the one with the least memory available.
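
This best-fit selection can be sketched in a few lines. The variable names and the numbers are made up, and the code is not taken from peerfeval itself:

```matlab
% Best-fit selection sketch; the numbers and variable names are made up.
memavail = [4e9 1e9 2e9 8e9];   % memory announced by four idle slaves, in bytes
memreq   = 1.5e9;               % estimated memory requirement of the job

candidate = find(memavail >= memreq);     % slaves with sufficient memory
if isempty(candidate)
  selected = [];                          % no slave can take the job right now
else
  [~, best] = min(memavail(candidate));   % least memory among the sufficient ones
  selected  = candidate(best);
end

% Slaves 1, 3 and 4 have enough memory; slave 3 (2e9 bytes) is the tightest fit.
disp(selected);
```

Choosing the tightest fit keeps the slaves with a lot of memory free for jobs that actually need it.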

Optimizing CPU time requirements of the jobs is challenging. Consider the following example: There are two master (i.e. end-user) nodes in the peer-to-peer network, and a single slave node. That means that the two master nodes compete for the slave. Subsequently consider the following two distributed processes on the two master nodes:

tic; peercellfun('pause', {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}); toc

and

tic; peercellfun('pause', {10, 10, 10, 10, 10, 10, 10, 10, 10, 10}); toc

The second process takes 10x as much (virtual) CPU time in total. However, if the jobs of both processes have an equal probability of being submitted to and executed on the slave, you would expect the two processes to take about the same amount of time. Fair allocation of the CPU time should instead result in the process on the first master finishing approximately 10x faster than the process on the second master.

Fair sharing of CPU time is currently implemented by manipulating the likelihood that a job is accepted by the slave. A job that takes a long time to finish is more likely to be rejected. As long as a job is rejected, the master will retry submitting it. In the meantime, the other master with the short jobs has more success in getting its jobs accepted and executed. In the example above, the chance of accepting a job from the second master should be 1/10th of the chance of accepting a job from the first master.
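
The effect of this rule on the example can be worked out in a few lines. The sketch assumes that the acceptance probability is inversely proportional to the estimated job duration, normalized to the shortest job, and that both masters offer jobs equally often. The text only fixes the 1:10 ratio, so the exact formula is an assumption:

```matlab
% Assumed rule: acceptance probability inversely proportional to job duration.
timreq  = [1 10];                    % estimated job duration per master, in seconds
paccept = min(timreq) ./ timreq;     % 1 for the short jobs, 1/10 for the long jobs

% If both masters offer jobs equally often, the number of accepted jobs
% is proportional to the acceptance probability.
njobs   = paccept / sum(paccept);    % fraction of the accepted jobs per master
cputime = njobs .* timreq;           % CPU time per master, up to a common factor
share   = cputime / sum(cputime);    % fraction of the slave's CPU time per master

disp(paccept);
disp(share);                         % both masters get about half of the CPU time
```

Under these assumptions the long jobs are accepted ten times less often but each takes ten times longer, so both masters end up with a comparable share of the slave's CPU time.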

Both the efficient memory planning and the fair sharing of CPU time require an estimate of the memory and CPU time that a job needs. A fixed initial estimate is used, which is refined once the results of individual jobs are returned to the master.
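
How the estimate is refined is not specified here. A minimal sketch, assuming that the initial guess is simply replaced by the mean duration of the jobs that have returned so far (all names and numbers are hypothetical):

```matlab
% Sketch: refine a fixed initial estimate once job results come back.
timreq   = 3600;                 % fixed initial estimate in seconds (assumed value)
timused  = [];                   % durations reported for the finished jobs

reported = [12 9 15];            % made-up durations of three returned jobs
for i = 1:numel(reported)
  timused(end+1) = reported(i);  % a job result has returned to the master
  timreq = mean(timused);        % estimate used for the jobs still to be submitted
end

disp(timreq);                    % mean of the three reported durations
```

The same scheme could be applied to the memory estimate.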

Some considerations

The following is a list with details that are already implemented and/or supported.

  • each Matlab session is either master (i.e. sending jobs and receiving results) or slave (performing jobs)
  • there can be multiple masters and multiple slaves on the same network (preferably many more slaves than masters)
  • at the end of the day people would keep Matlab running, and type “peerslave”, resulting in the while-loop mentioned above
  • the next morning people return to their computer and do ctrl-c
  • the pwd and the path are passed along with the job, so that the peer can load the user's data and/or feval the user's scripts
  • fair sharing is implemented based on the estimated execution time
  • warnings and errors are captured and sent back to the master
  • by using additional information from the announce packet (speed and/or memory), the master selects the preferred slave for job execution (typically the one with the best memory match)
  • a restriction could be implemented as a list of users that are allowed to join a p2p network (requires the user id/name to be determined from the OS)
  • it should be possible to use localhost as the host name and to restrict requests to localhost
  • a restriction could be implemented on the hosts (based on a list of hostnames or IP addresses) to limit the p2p network to a trusted group
  • it might be convenient to give each p2p network a name, cf. WORKGROUP on Windows networks, especially in combination with host-restrictions and user-restrictions
  • if a job does not return in a given amount of time, it will be resubmitted
  • output on screen can be captured in a diary file and sent back to the master

The following is a list of assorted ideas and considerations for improving the peer-to-peer parallel toolbox and/or using it efficiently in the typical research lab setting.

  • localhost slaves should be preferred over remote hosts
  • security is not part of this design, but can be implemented by running the Matlab slave session in a sandbox (i.e. as an unprivileged user)
  • a peerfor() function can be implemented on top of the peerfeval() function
  • penalizing
  • auto chunking
  • we could add cfg.inputfile and cfg.outputfile to all FieldTrip functions to allow parallelization over subjects when the data is too large to be accumulated on the peer master, e.g. peercellfun('preprocessing', {cfg1p, cfg2p, …}), followed by peercellfun('freqanalysis', {cfg1f, cfg2f, …})

Security

The mechanism has no inherent security. A malevolent user could do

peerfeval('system', 'rm -rf *')

erasing all of the user's files on the computer hosting the slave. To solve this problem, the Matlab session with the peerslave server should be running under an unprivileged account, i.e. as a user without write access to the important parts of the file system.

Relation between multiple peers

The schematic figure below shows how Matlab and the peer server (running as a mex file) work together in writing and receiving data from other peers.

Additional documentation on peer-to-peer distributed computing

development/peer.txt · Last modified: 2010/07/12 13:21 by robert