First of all - I would like to thank you all for following my posts, for all those lovely interactions on and off the blog, and for encouraging me to do better all this time!
It's time to say goodbye to Blogger, as the very experience of writing posts here has become such a pain!! Adding to that, there has been no update to Blogspot in a long time! I understand that they are busy changing logos! Sure! So be it!!
So, for the obvious reasons - it's time to move on!
I've moved all the posts on this site to my new site: http://femgeekz.github.io/, based on a simple Jekyll theme.
The site is still a work in progress, but that shouldn't stop me from posting new content, with all new variety!
This site is no longer being updated. Please visit : http://femgeekz.github.io/
Thanks for stopping by! :)
Adieu!
- Swatz
Wednesday, 2 September 2015
Saturday, 3 January 2015
Apache Mesos : Writing your own distributed framework
In the previous post, we saw what Mesos is, how it is useful, and how to get started with it. In this post, we shall see how to write your own framework on Mesos.
(In Mesos, a framework is any application running on it.)
This post walks through a framework called "mesos-pinspider", which fetches the user profile information and user board information from a user's Pinterest page.
Mesos Framework
In general, a Mesos framework has three basic components.
- Driver, which submits the tasks to the framework
- Scheduler, which registers with the master to be offered resources, takes the tasks and runs them on executors
- Executor, a process that is launched on slave nodes to run the framework’s tasks
Pinspider Framework Example
You may check the code here on GitHub. Let's break it down into PinDriver, PinScheduler and PinUserProfileExecutor.
Driver
The driver component of the framework is PinDriver.
- Create Executor Info
Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder()
        .setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor"))
        .setCommand(commandInfoUserProfile)
        .setName("PinUserProfileExecutor Java")
        .setSource("java")
        .build();
- Create Framework Info
Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder()
        .setFailoverTimeout(120000)
        .setUser("")
        .setName("Pinspider Framework");
- Instantiate Scheduler
Scheduler scheduler = args.length == 1
        ? new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo)
        : new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);
Note: Two ExecutorInfo objects are used for demonstration, one for fetching user profile information and the other for user board information. This explanation covers only one of them, userProfileExecutorInfo.
- Start the Mesos scheduler driver.
MesosSchedulerDriver schedulerDriver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]);
int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
schedulerDriver.stop();
System.exit(status);
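One thing worth noting about the scheduler instantiation above: any argument count other than one falls through to the three-argument branch, so passing exactly two arguments would fail on args[2]. Below is a minimal sketch of stricter argument handling, in plain Java with no Mesos dependency. PinDriverArgs and its fields are hypothetical names introduced for illustration (they are not part of mesos-pinspider), and the master address in main() is only an example value. The defaults are the ones PinScheduler's two-argument constructor uses.

```java
/** Hypothetical holder for PinDriver's command-line settings (illustration only). */
final class PinDriverArgs {
    // Defaults taken from PinScheduler's two-argument constructor.
    static final int DEFAULT_TOTAL_TASKS = 5;
    static final String DEFAULT_URL = "http://www.pinterest.com/techcrunch";

    final String master;
    final int totalTasks;
    final String url;

    private PinDriverArgs(String master, int totalTasks, String url) {
        this.master = master;
        this.totalTasks = totalTasks;
        this.url = url;
    }

    /** Accepts either <master> or <master> <totalTasks> <url>; rejects anything else. */
    static PinDriverArgs parse(String[] args) {
        if (args.length == 1) {
            return new PinDriverArgs(args[0], DEFAULT_TOTAL_TASKS, DEFAULT_URL);
        }
        if (args.length != 3) {
            throw new IllegalArgumentException("Usage: PinDriver <master> [<totalTasks> <url>]");
        }
        // Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException.
        int totalTasks = Integer.parseInt(args[1]);
        if (totalTasks <= 0) {
            throw new IllegalArgumentException("totalTasks must be positive: " + totalTasks);
        }
        return new PinDriverArgs(args[0], totalTasks, args[2]);
    }

    public static void main(String[] args) {
        // Example master address; substitute your own.
        PinDriverArgs parsed = parse(new String[] {"127.0.0.1:5050"});
        System.out.println(parsed.master + " " + parsed.totalTasks + " " + parsed.url);
    }
}
```

With something like this in place, the driver could build the scheduler from the parsed values instead of indexing into args directly.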
Executor Implementation
The Executor component of the framework is PinUserProfileExecutor.
Executor is a callback interface which is implemented by frameworks' executors. In our implementation, let us concentrate on launchTask().
@Override
public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInfo taskInfo) {}
- Set the task status by setting the ID and the state with a builder pattern.
Protos.TaskStatus taskStatus = Protos.TaskStatus.newBuilder()
        .setTaskId(taskInfo.getTaskId())
        .setState(Protos.TaskState.TASK_RUNNING)
        .build();
- Send the status update to the framework scheduler. The driver retries as necessary until an acknowledgement has been received or the executor is terminated, in which case a TASK_LOST status update will be sent.
executorDriver.sendStatusUpdate(taskStatus);
- Get the data from the tasks and run your logic.
try {
    message = ("userprofile :" + getUserProfileInfo(url)).getBytes();
} catch (IOException e) {
    LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage());
}
- Send the framework the message.
executorDriver.sendFrameworkMessage(message);
- Mark the state of the task as finished and send the status update to the framework scheduler.
taskStatus = Protos.TaskStatus.newBuilder()
        .setTaskId(taskInfo.getTaskId())
        .setState(Protos.TaskState.TASK_FINISHED)
        .build();
executorDriver.sendStatusUpdate(taskStatus);
- In the main() method, create an instance of MesosExecutorDriver and run it.
mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1
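The launchTask() steps above can be sketched end to end without a cluster. The following is a minimal model, not the Mesos API: UpdateSink and ProfileFetcher are hypothetical stand-ins for ExecutorDriver and getUserProfileInfo(), and task states are plain strings. It also assumes one possible way of handling a failed fetch, namely reporting TASK_FAILED instead of sending a message with no payload; the original snippet only logs the error.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LaunchTaskSketch {
    /** Stand-in for the two ExecutorDriver calls used in launchTask(); not the Mesos interface. */
    interface UpdateSink {
        void sendStatusUpdate(String state);
        void sendFrameworkMessage(byte[] data);
    }

    /** Stand-in for getUserProfileInfo(url), which may fail with an IOException. */
    interface ProfileFetcher {
        String fetch(String url) throws IOException;
    }

    static void launchTask(UpdateSink driver, ProfileFetcher fetcher, String url) {
        // 1. Tell the scheduler the task has started.
        driver.sendStatusUpdate("TASK_RUNNING");
        try {
            // 2. Do the work and send the result to the scheduler.
            String profile = fetcher.fetch(url);
            driver.sendFrameworkMessage(("userprofile :" + profile).getBytes(StandardCharsets.UTF_8));
            // 3. Mark the task as finished.
            driver.sendStatusUpdate("TASK_FINISHED");
        } catch (IOException e) {
            // Assumption: with no payload to send, report a failure instead.
            driver.sendStatusUpdate("TASK_FAILED");
        }
    }

    /** Records every call so the order of updates can be inspected. */
    static final class RecordingSink implements UpdateSink {
        final List<String> events = new ArrayList<>();

        @Override
        public void sendStatusUpdate(String state) {
            events.add(state);
        }

        @Override
        public void sendFrameworkMessage(byte[] data) {
            events.add("MESSAGE " + new String(data, StandardCharsets.UTF_8));
        }
    }

    /** Runs launchTask() against a recording sink and returns what was sent, in order. */
    static List<String> run(ProfileFetcher fetcher, String url) {
        RecordingSink sink = new RecordingSink();
        launchTask(sink, fetcher, url);
        return sink.events;
    }

    public static void main(String[] args) {
        String url = "http://www.pinterest.com/techcrunch";
        System.out.println(run(u -> "demo profile", url));
        System.out.println(run(u -> { throw new IOException("unreachable"); }, url));
    }
}
```

The scheduler shown later aborts on TASK_FAILED, so whether a failed fetch should fail the task or finish it with an empty result is a design choice for the framework author.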
Scheduler Implementation
The Scheduler component of the framework is PinScheduler.
Scheduler is a callback interface to be implemented by frameworks' schedulers. In our implementation, let us concentrate on resourceOffers(), statusUpdate() and frameworkMessage().
- Constructor: construct with the executor information and the number of tasks to launch.
public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor, Protos.ExecutorInfo pinUserBoardExecutor) {
    this(pinUserProfileExecutor, pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch");
}

public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor, Protos.ExecutorInfo pinUserBoardExecutor, int totalTasks, String url) {
    this.pinUserProfileExecutor = pinUserProfileExecutor;
    this.pinUserBoardExecutor = pinUserBoardExecutor;
    this.totalTasks = totalTasks;
    this.crawlQueue = Collections.synchronizedList(new ArrayList<String>());
    this.crawlQueue.add(url);
}
- Resource Offers
- A resource offer describes resources such as CPU and memory. From the list of offers, read the scalar value of each resource. We need to state the resource requirements of our tasks when setting the task info.
for (Protos.Offer offer : list) {
    List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>();
    double offerCpus = 0;
    double offerMem = 0;
    for (Protos.Resource resource : offer.getResourcesList()) {
        if (resource.getName().equals("cpus")) {
            offerCpus += resource.getScalar().getValue();
        } else if (resource.getName().equals("mem")) {
            offerMem += resource.getScalar().getValue();
        }
    }
    LOGGER.info("Received Offer : " + offer.getId().getValue() + " with cpus = " + offerCpus + " and mem =" + offerMem);
- Create task ID.
Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
- Create task info by setting task ID, adding resources, setting data and setting executor.
Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder()
        .setName("task " + taskID.getValue())
        .setTaskId(taskID)
        .setSlaveId(offer.getSlaveId())
        .addResources(Protos.Resource.newBuilder()
                .setName("cpus")
                .setType(Protos.Value.Type.SCALAR)
                .setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK)))
        .addResources(Protos.Resource.newBuilder()
                .setName("mem")
                .setType(Protos.Value.Type.SCALAR)
                .setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK)))
        .setData(ByteString.copyFromUtf8(crawlQueue.get(0)))
        .setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor))
        .build();
- Launch the tasks through the SchedulerDriver.
...
    taskInfoList.add(pinUserProfileTaskInfo);
    taskInfoList.add(pinUserBoardTaskInfo);
}
schedulerDriver.launchTasks(offer.getId(), taskInfoList);
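The offer loop above sums the offered cpus and mem, and each task asks for CPUS_PER_TASK and MEM_PER_TASK. How the two are compared is elided in the snippets, so the following is only a sketch of one way to decide how many tasks fit into a single offer. OfferSizing and tasksThatFit() are hypothetical names, and the numbers in main() are assumed values, since the post does not state the actual constants.

```java
final class OfferSizing {
    /** Returns how many tasks of the given size fit into the offered CPU and memory. */
    static int tasksThatFit(double offerCpus, double offerMem, double cpusPerTask, double memPerTask) {
        if (cpusPerTask <= 0 || memPerTask <= 0) {
            throw new IllegalArgumentException("Per-task requirements must be positive");
        }
        int byCpu = (int) Math.floor(offerCpus / cpusPerTask);
        int byMem = (int) Math.floor(offerMem / memPerTask);
        // The scarcer resource limits the number of tasks; never return a negative count.
        return Math.max(0, Math.min(byCpu, byMem));
    }

    public static void main(String[] args) {
        // Assumed values: an offer of 4 CPUs and 300 MB, tasks needing 1 CPU and 128 MB each.
        System.out.println(tasksThatFit(4.0, 300.0, 1.0, 128.0)); // prints 2
    }
}
```

A scheduler would typically also cap this count by the number of tasks it still has left to launch.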
- Status update
@Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {... }
- Stop the SchedulerDriver if tasks are finished
if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {
    finishedTasks++;
    LOGGER.info("Finished tasks : " + finishedTasks);
    if (finishedTasks == totalTasks) {
        schedulerDriver.stop();
    }
}
- Abort the SchedulerDriver if the tasks are killed, lost or failed
if (taskStatus.getState() == Protos.TaskState.TASK_FAILED
        || taskStatus.getState() == Protos.TaskState.TASK_KILLED
        || taskStatus.getState() == Protos.TaskState.TASK_LOST) {
    LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue()
            + " is in unexpected state : " + taskStatus.getState().getValueDescriptor().getName()
            + " with reason : " + taskStatus.getReason().getValueDescriptor().getName()
            + " from source : " + taskStatus.getSource().getValueDescriptor().getName()
            + " with message : " + taskStatus.getMessage());
    schedulerDriver.abort();
}
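The two statusUpdate() branches above amount to a small decision rule: count finished tasks and stop when all are done, abort on any failed, killed or lost task, and otherwise keep going. A minimal sketch of that rule in plain Java follows. TaskTracker, State and Action are hypothetical names used only for illustration; State lists just the task states mentioned in this post rather than the full Protos.TaskState enum.

```java
final class TaskTracker {
    /** Only the task states mentioned in this post; not the full Mesos enum. */
    enum State { TASK_RUNNING, TASK_FINISHED, TASK_FAILED, TASK_KILLED, TASK_LOST }

    /** What the scheduler should do with its driver after an update. */
    enum Action { CONTINUE, STOP, ABORT }

    private final int totalTasks;
    private int finishedTasks;

    TaskTracker(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    Action onStatusUpdate(State state) {
        switch (state) {
            case TASK_FINISHED:
                finishedTasks++;
                // Stop only once every task has reported completion.
                return finishedTasks == totalTasks ? Action.STOP : Action.CONTINUE;
            case TASK_FAILED:
            case TASK_KILLED:
            case TASK_LOST:
                return Action.ABORT;
            default:
                return Action.CONTINUE;
        }
    }

    public static void main(String[] args) {
        TaskTracker tracker = new TaskTracker(2);
        System.out.println(tracker.onStatusUpdate(State.TASK_RUNNING));  // prints CONTINUE
        System.out.println(tracker.onStatusUpdate(State.TASK_FINISHED)); // prints CONTINUE
        System.out.println(tracker.onStatusUpdate(State.TASK_FINISHED)); // prints STOP
    }
}
```

Keeping the decision separate from the driver calls makes it easy to test without a running Mesos master.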
- Framework Message
- Handle your message
@Override
public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) {
    String data = new String(bytes);
    System.out.println(data);
    LOGGER.info("User Profile Information : " + data);
}
Complete code is available here, with instructions to run it and sample output.
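A small aside on the message bytes: the executor encodes with getBytes() and the scheduler decodes with new String(bytes), both of which use the platform's default charset. If the executor and scheduler run on machines with different defaults, non-ASCII profile text could be garbled. The sketch below shows one way to pin the encoding on both sides; MessageCodec is a hypothetical helper, not part of mesos-pinspider.

```java
import java.nio.charset.StandardCharsets;

final class MessageCodec {
    /** Executor side: encode the message with an explicit charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Scheduler side: decode with the same charset. */
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00e9 is a non-ASCII character (e with acute accent) used to exercise the round trip.
        String original = "userprofile :caf\u00e9";
        System.out.println(decode(encode(original)).equals(original)); // prints true
    }
}
```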
Happy Learning! :)
Labels: apache mesos, distributed, distributed systems, framework, linux, mesos, pinterest
Location: Los Angeles, CA, USA