Reflection on Designing Distributed System

Just finished reading “Designing Distributed Systems: Patterns and Paradigms for Scalable, Reliable Services” by Brendan Burns. I enjoyed reading the book. I think this book is going to add value to people who has recently get their hands on docker and orchestration tools like kubernetes. It is going to empower those developers/devops/admins by helping them to learn more about containerization design patterns. This book is only 150+ pages and it discusses wide range of tools and techniques with hands on example but I would definitely recommend this book for the theory and discussion rather than the practical aspect of it.

Containers establish boundaries that helps to provide separation of concerns around specific resources. When the scaling is not requirement and application runs on a single node, but the application is being run only on a single node, containerising still is going to be better choice because not only it helps us to scale but also it can be used to organise and maintain better.

Sometime in an application, we have tasks that runs in the background and other tasks that are served to endusers. Often the tasks that serves endusers are more important. We can prioritise those tasks in our containers by scaling those containers in a node appropriately.

When engineering teams starts to scale, for better engineering performance, usually a team of developers comprises of six to eight people. When more engineers join, usually it increases the number of teams. A better engineering performance, maintainability, accountability and ownership structure can be implemented by assigning the responsibility of a containerised application to a team. Additionally, often some of the components, if factored properly, are reusable modules that can be used by many teams. 

Single node patterns:

  • Sidecar Pattern: Sidecar containers, are a group of containers that runs together and stayed together on the same node, and they share processes, ports and so on. It can be a very useful when dealing with legacy services or 3rd party maintained closed source application about which we have very little knowledge. In sidecar pattern, unencrypted traffic is only sent via the local loopback adapter inside the container group, that makes the data and communication between them safe.
  • Ambassador Pattern: In ambassador pattern the two containers are tightly linked in a symbiotic pairing that is scheduled to a single machine. Ambassador can be used as proxy server. In a sharded system, ambassador pattern can be used to route to a Shard. Ambassador can be used for Service Broker as well.
  • Adapters: Although every application is different, they still have some things in common. They need to be logged, monitored, maintained, etc. Adapter patter takes advantage of this use case and implemented in a sidecar pattern. This common use case can be modularized and use as sidecar container. Popular use case can be accessing and collecting desired logs, monitoring performance by periodic executing a command, etc via a container that runs as sidecar.

Serving Patterns:

Replicated Load-Balanced Services: In replicated load balanced services, the same data is being replicated across multiple server and a service is being used as a load balancer.

Sharded Services: When the size of data becomes huge for a single machine, the data is splitted and distributed across multiple machine. A router is placed in front of the service to identify which shard to route to to fetch that data. Usually a carefully chosen hashing key is used to identify where the data is located and which server to route to. For better performance and reliability, replication and sharding is used together.

Scatter/Gather: Scatter/gather is quite useful when there is a large amount of mostly independent processing that is needed to handle a particular request. Scatter/gather can be seen as sharding the computation necessary to service the request, rather than sharding the data.

When there is more data that a single machine can hold or process in time, it becomes necessary to introduce sharding by splitting the data across multiple devices. Then when it comes to querying, it is going to require query to all the root notes then the partial responses from the nodes returns to the root node and that root node merges all of the responses together to form a comprehensive response for the user. The limitation of this approach is, the response time is dependent on lowest performing node, thus increased parallelism doesn’t always speed things up because of overhead or straggler on each node. Also mathematically speaking, 99th percentile latency on 5 node for individual requests becomes a 95th percentile latency for our complete scatter/gather system. And it only gets worse from there: if we scatter out to 100 leaves, then we are more or less guaranteeing that our overall latency for all requests will be 2 seconds. 

A single replica means that if it fails, all scatter/gather requests will fail for the duration that the shard is unavailable because all requests are required to be processed by all leaf nodes in the scatter/gather pattern. This risk is being addressed by adding multiple replica of a node.

Functions and Event-Driven Processing 

Function level decoupling: In microservice approach we splitted our application in small parts and managed by a team. FaaS takes it to the next level. It forces to strongly decouple each function of service.

 There is a common saying that 80% of the daily users are going to be using 20% of the features. For the rest of 80% features that are not being used extensively, can be an ideal candidate for FaaS that charges based on number of requests so we are only paying for the time when your service is actively serving requested and we are always paying for processor cycles that is largely sitting around waiting for a user request. 

The function in FaaS, dynamically spuns up in response to a user request while the user is waiting, the need to load a lot of detail may significantly impact the latency that the user perceives while interacting with your service and this loading cost can be amortized across a large number of requests. 

This approach also has many limitations, the function communicates with each other via network and each function instance cannot have high local memory. So maybe it is a good fit for responding to temporal events, but it is still not sufficient infrastructure for generic background processing and the requiring states needs to be stored in a storage service which adds more complexity in code.

Because of this highly decoupled nature, from the other functions, there is no real representation of the dependencies or interactions between different functions, it becomes difficult to find bugs. Also the cost of a bug related to infinity loop is high.

Batch Computational Patterns: for reliable, long-running server applications. This section describes patterns for batch processing. In contrast to long- running applications, batch processes are expected to only run for a short period of time. 

Work Queue Systems: In the containerised work queue, there are two interfaces: the source container interface, which provides a stream of work items that need processing, and the worker container interface, which knows how to actually process a work item. The work processor can be implemented by a container that processes Shared Work Queue where items in the queue as batch operation or it can be implemented in multi-worker pattern which transforms a collection of different worker containers into a single unified container that implements the worker interface, yet delegates the actual work to a collection of different, reusable containers.

Depending on workload, shared work queued containers needs to be scaled accordingly. Keeping one or few workers running when the frequency of works coming is very low can be overkill. Depending on the workload for a task we can also consider implementing multi-worker pattern using kubernetes jobs to implement work processor. We can easily write a dynamic job creator, which is going to spawn a new job when there is a new item in the queue.

Event-Driven Batch Processing: Work queues are great for enabling individual transformations of one input to one output. However, there are a number of batch applications where you want to perform more than a single action, or you may need to generate multiple different outputs from a single data input. In these cases, you start to link work queues together so that the output of one work queue becomes the input to one or more other work queues, and so on. This forms a series of processing steps that respond to events, with the events being the completion of the preceding step in the work queue that came before it.

In an complex event driven system, often the event needs to go through many steps where data needs to divided into few queues, sometime it requires to merged to get an output.

Copier: The job of a copier is to take a single stream of work items and duplicate it out into two or more identical streams. 

Filter: The role of a filter is to reduce a stream of work items to a smaller stream of work items by filtering out work items that don’t meet particular criteria. 

Splitter: divide them into two separate work queues without dropping any of them. 

Sharder: divide up a single queue into an evenly divided collection of work items based upon some sort of shard‐ ing function

Merger: the job of a merger is to take two different work queues and turn them into a single work queue. 

Coordinated Batch processing:

To achieve more complex event driven architecture, we are going to need more coordinated batch processing techniques.

Join (or Barrier Synchronization) : In merge, it does not ensure that a complete dataset is present prior to the beginning of processing but when it comes to join, it does not complete until all of the work items that are processed. It reduces the parallelism that is possible in the batch workflow, and thus increases the overall latency of running the workflow. 

Reduce : Rather than waiting until all data has been processed, it optimistically merge together all of the parallel data items into a single comprehensive representation of the full set. In order to produce a complete output, this process is repeated untill all of the data is processed, but the ability to begin early means that the batch com‐ putation executes more quickly overall.