Pascal Weisenburger, Guido Salvaneschi
TU Darmstadt, Germany
Developing distributed systems is hard
@peer type Master
@peer type Worker
val tasks: List[Task] on Master
= placed { getTaskList() }
@peer type Master { type Tie <: Multiple[Worker] }
@peer type Worker { type Tie <: Single[Master] with Multiple[Worker] }
Crosscutting functionality separated among compilation units
Developers are not forced to modularize along network boundaries
That's only half the battle!
How to modularize code into (distributed) system functionalities?
disentangle distribution and modularization
disentangle distribution and modularization
@multitier trait Offloading[T] {
@peer type Master <: { type Tie <: Multiple[Worker] }
@peer type Worker <: { type Tie <: Single[Master] }
def run(task: Task[T]): Future[T] on Master =
placed { (remote(selectWorker()) call execute(task)).asLocal }
private def execute(task: Task[T]): T on Worker =
placed { task.process() }
}
@multitier trait Monitoring {
@peer type Master <: { type Tie <: Multiple[Worker] }
@peer type Worker <: { type Tie <: Single[Master] }
def monitoredTimedOut(monitored: Remote[Worker]): Unit on Master
}
@multitier trait TaskScheduler[T] extends
Offloading[T] with
Monitoring
@multitier trait BackupService {
@peer type Processor <: { type Tie <: Single[Storage] }
@peer type Storage <: { type Tie <: Single[Processor] }
def store(id: Long, data: Data): Unit on Processor
def load(id: Long): Future[Data] on Processor
}
@multitier trait FileBackup extends BackupService {
def store(id: Long, data: Data): Unit on Processor =
placed { remote call write(id, data) }
def load(id: Long): Future[Data] on Processor =
placed { (remote call read(id)).asLocal }
private def write(id: Long, data: Data): Unit on Storage =
placed { writeToFile(data, s"/storage/$id") }
private def read(id: Long): Data on Storage =
placed { readFromFile[Data](s"/storage/$id") }
}
@multitier trait Editor {
val backup: BackupService
@peer type Client <: backup.Processor {
type Tie <: Single[Server] with Single[backup.Storage] }
@peer type Server <: backup.Storage {
type Tie <: Single[Client] with Single[backup.Processor] }
}
@multitier object editor extends Editor {
@multitier object backup extends FileBackup
}
@multitier trait MutualExclusion[T] {
this: Architecture with LeaderElection[T] =>
def lock(id: T): Boolean on Node = { ... }
def unlock(id: Id): Unit on Node = { ... }
}
@multitier trait LeaderElection[T] {
this: Architecture with Id[T] =>
def electLeader(): Unit on Node
def electedAsLeader(): Unit on Node
}
@multitier abstract class Id[T: Ordering] {
this: Architecture =>
val id: Local[T] on Node
}
@multitier trait HirschbergSinclair[T]
extends LeaderElection[T] {
this: Ring with Id[T] =>
def electLeader() = on[Node] { elect(0) }
private def elect(phase: Int) = on[Node] { /* ... */ }
private def propagate(remoteId: T, hops: Int,
direction: Direction) = on[Node] { /* ... */ }
}
@multitier object locking extends
MutualExclusion[Int] with
HirschbergSinclair[Int] with
Ring with
RandomIntId
@multitier object TaskManager {
@peer type JobManager <: { type Tie <: Multiple[TaskManager] }
@peer type TaskManager <: { type Tie <: Single[JobManager] }
def submitTask(td: TaskDeployment, tm: Remote[TaskManager]) =
on[JobManager] { (remote(tm) call startTask(td)).asLocal }
def startTask(td: TaskDeployment) = on[TaskManager] {
val task = new Task(td)
task.start()
Acknowledge()
}
...
}
@multitier object TaskManagerActions { ... }
@multitier object CheckpointResponder { ... }
@multitier object ResultPartitionConsumableNotifier { ... }
@multitier object PartitionProducerStateChecker { ... }
@multitier object KvStateRegistryListener { ... }
@multitier object TaskDistributionSystem extends
TaskManager with
TaskManagerActions with
CheckpointResponder with
ResultPartitionConsumableNotifier with
PartitionProducerStateChecker with
KvStateRegistryListener