Saturday, October 21, 2017

How Kafka Achieves Durability

Durability is the guarantee that, once the Kafka broker confirms that data is written, it is permanent. Databases implement this by storing the data in non-volatile storage before acknowledging. Kafka doesn't follow the DB approach!

Short Answer
The short answer is that Kafka doesn't rely on physical storage (i.e. the file system) as the criterion for a message write being complete. It relies on the replicas.

Long Answer
When a message arrives at the broker, it is first written to the in-memory copy on the replica leader. The broker then has the following things to do before the write can be considered successful.
Assume that the replication factor is > 1.

1. Persist the message in the file system of the partition leader.
2. Replicate the message to all ISRs (in-sync replicas).

In an ideal scenario, both of the above are important and should happen, irrespective of order. But the real question is: when does Kafka consider the message write complete? To answer this, let's try to answer the question below -

If a consumer asks for a message that just got persisted on the leader at offset k, will the leader return the data? The answer is NO!

It's interesting to note that not all the data that exists on the leader is available for clients to read. Clients can read only those messages that were written to all in-sync replicas. The leader knows which messages have been replicated to which replica, so until a message is fully replicated it will not be returned to the client. Attempts to read such messages result in an empty response.

So now it's obvious: just writing the message to the leader (even persisting it to the file system) is hardly of any use on its own. Kafka considers a message written only when it has been replicated to all in-sync replicas.
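On the producer side, the acks setting controls how many replicas must acknowledge a write before the broker reports success. A minimal sketch of asking for the strongest guarantee (the topic name and broker address below are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DurableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // acks=all: the leader reports success only after all in-sync replicas
        // have the message -- the durability criterion discussed above.
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns a Future; get() blocks until the write is acknowledged
            producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```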

~Happy replication!

Saturday, October 7, 2017

My favourite fizz-buzz problem for Senior Programmers

In this post, I will discuss one of my favourite fizz-buzz problems for senior programmers/engineers.

Find the Kth largest element from a list of 1 million integers.


Find the Kth largest element, at any given point in time, from a stream of integers whose count is not known.

This problem is interesting because it has multiple approaches and it checks the fundamentals of algorithms and data structures. Quite often, candidates start by asking questions like: is the list sorted? Or: can I sort the list? In that case, I check which sorting algorithm the candidate proposes. This gives me an opportunity to start a conversation around the complexity of the approach (particularly time complexity). Most candidates are quick to point out algorithms (like Quick Sort or Merge Sort) which take O(NlogN) to sort a list. This is the right time to ask why you need to sort the complete array/list at all if you just need the 100th or Kth largest/smallest element. From here the conversation usually goes in one of two directions -
  1. Candidates sometimes suggest that sorting is the quicker way to solve this problem, missing the complexity aspect altogether. If someone doesn't even realize that sorting is not the right way to handle this problem, that's a red flag for me.
  2. At times candidates acknowledge the inefficiency of the sorting approach and start looking for a better one. I ask candidates to think out loud, which gives me insight into their thought process and how they are approaching the problem. When I see them not moving ahead, I nudge them towards optimizing the Quick Sort approach: is there any way to cut the problem size in half in every iteration? Can you use divide and conquer to improve on the O(NlogN) complexity?
This problem can be solved by Quick Select as well as by using a heap data structure (a heap-based sketch for the streaming variant follows below). It also has a brute-force approach (run a loop k times; in each iteration find the maximum number lower than the previous one).
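To make the heap approach concrete, here is a minimal sketch for the streaming variant: keep a min-heap of the k largest elements seen so far, so the root is always the Kth largest. The class and method names are just illustrative.

```java
import java.util.PriorityQueue;

/** Tracks the Kth largest element seen so far in a stream of integers. */
class KthLargestTracker {
    private final int k;
    // Min-heap holding the k largest elements seen so far; its root is the answer.
    private final PriorityQueue<Integer> heap = new PriorityQueue<>();

    KthLargestTracker(int k) {
        this.k = k;
    }

    /** O(log k) per element. */
    void add(int value) {
        if (heap.size() < k) {
            heap.offer(value);
        } else if (value > heap.peek()) {
            heap.poll();       // drop the smallest of the current top k
            heap.offer(value);
        }
    }

    /** Returns the Kth largest element seen so far, or null if fewer than k seen. */
    Integer kthLargest() {
        return heap.size() == k ? heap.peek() : null;
    }
}
```

The same structure answers the fixed-list version in O(N log k); Quick Select gives O(N) average time when the whole list is available up front.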

If the candidate doesn't make much progress, I try to simplify the problem by asking for the 3rd or 2nd largest element. I have seen some senior programmers failing to solve even this trivial version. That is a clear reject sign for me.

Also, sometimes I don't even ask the candidate to code. I use this problem just to get an idea, and skip the coding part if I see a programmer sitting right across from me :)

-Happy problem solving !

Identifying the Right Node in Couchbase

This post covers how Couchbase achieves data partitioning and replication.

doc = {"key1": "value1", ...} ; doc-id = id


  • Based on the key (or document id), a hash is calculated.
  • The hash returns a value in the range [0, 1023], both inclusive. This is known as the partition id.
    • This number maps the document to one of the 1024 vBuckets.
  • The next task is to map the vBucket to a physical node. This is decided by the vBucket map.
    • This map tells which node is the primary for the document and which nodes hold the replicas. The vBucket map has 1024 entries, one for each vBucket, and each entry is an array: the first value is the primary node and the rest are replica nodes.
  • The server list stores the list of live nodes. So, based on the indexes stored in the vBucket map, we get the physical node IP addresses.
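To make the lookup concrete, here is a rough sketch of the flow described above. The CRC32 hash, the vBucket map contents and the server list are stand-ins for illustration; the real client library defines the exact hash and receives the map from the cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class VBucketLookup {
    private static final int NUM_VBUCKETS = 1024;

    // Illustrative server list and vBucket map: one row per vBucket, first column =
    // primary node index, remaining columns = replica node indexes (made-up values).
    private static final String[] SERVER_LIST = {"10.0.0.1", "10.0.0.2", "10.0.0.3"};
    private static final int[][] VBUCKET_MAP = new int[NUM_VBUCKETS][];

    static {
        for (int i = 0; i < NUM_VBUCKETS; i++) {
            int primary = i % SERVER_LIST.length;
            VBUCKET_MAP[i] = new int[] {primary, (primary + 1) % SERVER_LIST.length};
        }
    }

    // Hash the document id to a vBucket id in [0, 1023]. CRC32 is used here only
    // as a stand-in for Couchbase's internal hash function.
    static int vBucketId(String docId) {
        CRC32 crc = new CRC32();
        crc.update(docId.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % NUM_VBUCKETS);
    }

    public static void main(String[] args) {
        int vb = vBucketId("id");
        int[] entry = VBUCKET_MAP[vb];
        System.out.println("vBucket " + vb
                + " -> primary " + SERVER_LIST[entry[0]]
                + ", replica " + SERVER_LIST[entry[1]]);
    }
}
```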

Friday, August 11, 2017

RAM sizing Data Node of Couchbase Cluster

This post talks about finding out how much RAM your Couchbase cluster needs to hold your data (in RAM)!

RAM Calculator 

RAM is one of the most crucial areas to size correctly. Cached documents allow reads to be served at low latency and high throughput. Please note that this does not incorporate the RAM required by the host/VM OS and other applications running alongside Couchbase.

Enter the fields below to estimate RAM -

Sample Document (key and value)
This is required because the document content length as well as the ID length impact RAM. Be mindful of the size aspect when deciding your key-generation strategy.

# Replicas                                        
Couchbase only supports up to 3 replicas, so enter 1, 2 or 3.

% of Data you want to be in RAM
For the best throughput you need all your documents in RAM, i.e. 100%. This way any request will be served from RAM and there will be no disk IO. Enter only the value, like 80 or 100.

# Documents
The number of documents in the cluster. When your application is starting from scratch, pick a number based on the expected load, then evaluate it regularly and adjust your RAM quota if required. So, you can start with, say, 10000 or 1000000 documents.

Type of Storage (SSD or HDD)
If the storage is SSD the overhead is 25%, otherwise it's 30%. SSD brings better disk throughput and latency, and will help performance if not all data is in RAM.

Couchbase Version (< 2.1, or 2.1 and higher)
The metadata size per document for version 2.1 and higher is 56 bytes; for lower versions it's 64 bytes.

High Water Mark (%)
If you want to use the default value, enter 85.
If the amount of RAM used by documents reaches the high water mark (upper threshold), both primary and replica documents are ejected until memory usage reaches the low water mark (lower threshold).


Based on the RAM requirement for the cluster, you can plan how many nodes are required. Another important aspect in deciding the number of data nodes is how you expect your system to behave if 1, 2 or more nodes go down at the same time. In this link, I have discussed the replication factor and how it affects your system's performance. So, take your call wisely!

The value is calculated as explained in the Couchbase link, here.
The reference for calculating document size is here.
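Putting the inputs above together, here is a rough sketch of the calculation as I understand it from the Couchbase sizing documentation referenced above (the sample values in main are just assumptions):

```java
/** Rough Couchbase cluster RAM sizing sketch based on the inputs described above. */
public class RamSizing {

    static double clusterRamQuotaBytes(long numDocuments, int idSizeBytes, int valueSizeBytes,
                                       int numReplicas, double percentInRam,
                                       boolean ssd, boolean version21OrHigher,
                                       double highWaterMarkPercent) {
        int metadataPerDocument = version21OrHigher ? 56 : 64;   // bytes, per the version note above
        double headroom = ssd ? 0.25 : 0.30;                     // overhead: 25% SSD, 30% HDD
        int copies = 1 + numReplicas;                            // primary + replicas

        double totalMetadata = (double) numDocuments * (metadataPerDocument + idSizeBytes) * copies;
        double totalDataset  = (double) numDocuments * valueSizeBytes * copies;
        double workingSet    = totalDataset * (percentInRam / 100.0);

        // Metadata always stays in RAM; document values only up to the working-set percentage.
        return (totalMetadata + workingSet) * (1 + headroom) / (highWaterMarkPercent / 100.0);
    }

    public static void main(String[] args) {
        // Sample inputs (assumptions): 1M documents, 40-byte ids, 1 KB values,
        // 1 replica, 100% in RAM, SSD, Couchbase 2.1+, high water mark 85%.
        double bytes = clusterRamQuotaBytes(1_000_000, 40, 1024, 1, 100, true, true, 85);
        System.out.printf("Cluster RAM quota ~ %.2f GB%n", bytes / (1024.0 * 1024 * 1024));
    }
}
```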

--- happy sizing :)

What's so special about Java 8 stream API

Java 8 has added functional-style programming, and one of the major additions in terms of API is streams.

A mechanical analogy is a car-manufacturing line, where a stream of cars is queued between processing stations. Each station takes a car, performs some modification/operation, and then passes it to the next station for further processing.

The main benefit of the stream API is that in Java 8 you can program at a higher level of abstraction: you transform a stream of one type into a stream of another type rather than processing one item at a time (using a for loop or iterator). With this, Java 8 can run a pipeline of stream operations on several CPU cores, each working on a different part of the input. This way you get parallelism almost for free, instead of hard work with threads and locks.

Streams focus on partitioning the data rather than coordinating access to it.


A collection is mostly about storing and accessing data, whereas a stream is mostly about describing computations on that data.
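A small illustration of that difference: the first pipeline describes a computation over the data, and switching to parallelStream() is all it takes to spread the work across cores (the sample data is made up):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamDemo {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("kafka", "couchbase", "java", "stream");

        // Describe the computation: transform a stream of Strings into a stream of
        // lengths, keep only the long ones, and collect the result.
        List<Integer> longWordLengths = words.stream()
                .map(String::length)
                .filter(len -> len > 4)
                .collect(Collectors.toList());
        System.out.println(longWordLengths);   // [5, 9, 6]

        // The same style of pipeline runs across cores just by asking for a parallel stream.
        int totalLength = words.parallelStream()
                .mapToInt(String::length)
                .sum();
        System.out.println(totalLength);
    }
}
```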