Friday, September 18, 2020

Designing Data Intensive Applications: Notes

https://learning.oreilly.com/library/view/designing-data-intensive-applications
--------------
Hard disks are reported as having a mean time to failure (MTTF) of about 10 to 50 years [5, 6]. Thus, on a storage cluster with 10,000 disks, we should expect on average one disk to die per day.

Rule of thumb: failures per day ≈ total disks / MTTF in days.
Here: 10,000 disks / ~11,000 days (a 30-year MTTF) ≈ 1 failure per day.
-------------
Algorithms to calculate a good approximation of percentiles at minimal CPU and memory cost are: forward decay, t-digest or HdrHistogram.
-----------
Chapter 2: Data Models and Query Languages: Summary

Data models are a huge subject, and in this chapter we have taken a quick look at a broad variety of different models. We didn’t have space to go into all the details of each model, but hopefully the overview has been enough to whet your appetite to find out more about the model that best fits your application’s requirements.

Historically, data started out being represented as one big tree (the hierarchical model), but that wasn’t good for representing many-to-many relationships, so the relational model was invented to solve that problem. More recently, developers found that some applications don’t fit well in the relational model either. New nonrelational “NoSQL” datastores have diverged in two main directions:

Document databases target use cases where data comes in self-contained documents and relationships between one document and another are rare.

Graph databases go in the opposite direction, targeting use cases where anything is potentially related to everything.

All three models (document, relational, and graph) are widely used today, and each is good in its respective domain. One model can be emulated in terms of another model—for example, graph data can be represented in a relational database—but the result is often awkward. That’s why we have different systems for different purposes, not a single one-size-fits-all solution.

One thing that document and graph databases have in common is that they typically don’t enforce a schema for the data they store, which can make it easier to adapt applications to changing requirements. However, your application most likely still assumes that data has a certain structure; it’s just a question of whether the schema is explicit (enforced on write) or implicit (assumed on read).

Each data model comes with its own query language or framework, and we discussed several examples: SQL, MapReduce, MongoDB’s aggregation pipeline, Cypher, SPARQL, and Datalog. We also touched on CSS and XSL/XPath, which aren’t database query languages but have interesting parallels.
-------------------------------

Chapter 3: Storage and retrieval

As a rule of thumb, LSM-trees (log-structured merge trees) are typically faster for writes, whereas B-trees are thought to be faster for reads. Reads are typically slower on LSM-trees because they have to check several different data structures and SSTables at different stages of compaction.

A downside of log-structured storage is that the compaction process can sometimes interfere with the performance of ongoing reads and writes.

An advantage of B-trees is that each key exists in exactly one place in the index, whereas a log-structured storage engine may have multiple copies of the same key in different segments. This aspect makes B-trees attractive in databases that want to offer strong transactional semantics: in many relational databases, transaction isolation is implemented using locks on ranges of keys, and in a B-tree index, those locks can be directly attached to the tree.

B-trees are very ingrained in the architecture of databases and provide consistently good performance for many workloads, so it’s unlikely that they will go away anytime soon. In new datastores, log-structured indexes are becoming increasingly popular. There is no quick and easy rule for determining which type of storage engine is better for your use case, so it is worth testing empirically.

----------

Indexing: A compromise between a clustered index (storing all row data within the index) and a nonclustered index (storing only references to the data within the index) is known as a covering index or index with included columns, which stores some of a table's columns within the index. This allows some queries to be answered by using the index alone (in which case, the index is said to cover the query).

In MySQL’s InnoDB storage engine, the primary key of a table is always a clustered index, and secondary indexes refer to the primary key (rather than a heap file location).
-------
Data warehousing (OLAP as opposed to OLTP)
Column storage: Column compression using bitmaps. 
We duplicate data across replicas anyway, so why not sort each copy differently and, for a given query, choose the replica whose sort order suits it best?

--------

Chapter 4: Encoding etc.
Protobuf/Thrift vs Avro => all are binary encodings.
Avro is better suited for dynamically generated schemas.
Parquet: column oriented format.
Forward/Backward compatibility
Avro - the writer's and reader's schemas don't have to be identical; Avro's schema resolution reconciles the differences, as sketched below.
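
For illustration, here is a minimal sketch of schema resolution using Avro's Java GenericDatumReader (the two schemas are made up for this example; error handling and method scaffolding are omitted). The reader supplies both the writer's schema and its own, and Avro reconciles them, e.g. filling in the default for a field the writer never knew about:

import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.io.*;
import java.io.ByteArrayOutputStream;

Schema writerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
    + "{\"name\":\"name\",\"type\":\"string\"}]}");
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
    + "{\"name\":\"name\",\"type\":\"string\"},"
    + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}");

// Encode a record using only the writer's schema...
GenericRecord user = new GenericData.Record(writerSchema);
user.put("name", "Alice");
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(writerSchema).write(user, encoder);
encoder.flush();

// ...then decode with both schemas. The "email" field, unknown to the
// writer, comes back populated with its default value.
GenericDatumReader<GenericRecord> reader =
    new GenericDatumReader<>(writerSchema, readerSchema);
GenericRecord decoded = reader.read(null,
    DecoderFactory.get().binaryDecoder(out.toByteArray(), null));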

--------------------
Chapter 5: Replication

single-leader, multi-leader, leaderless
synchronous, asynchronous, semi-synchronous (one follower is sync, the others async)
MySQL => statement-based replication vs row-based replication. Row-based replication is used when a statement is nondeterministic (e.g. NOW() or RAND()).

How to ensure read-after-write consistency?
Route a user's read requests to the leader for a while after that user makes an update, since the followers might still be lagging.
Issues to consider: cross-device requests - devices might be on different networks and hence get routed to different DCs. Alternatively, the client can remember its last update timestamp, and reads can be routed to a follower that is up to date as of that timestamp.

Monotonic reads - a guarantee that a user won't see content going backwards in time. Stronger than eventual consistency but weaker than strong consistency. One way to ensure it is to always read from the same replica for a given user.
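
A minimal sketch of the same-replica trick (the helper is hypothetical; any stable hash of the user id works):

// Pin each user to one replica so their reads never hop to a
// more-lagged replica and appear to go backwards in time.
static int replicaForUser(String userId, int numReplicas) {
    return Math.floorMod(userId.hashCode(), numReplicas);
}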

Consistent prefix reads - this guarantee says that if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order.

Multi-leader configuration is dangerous territory that should be avoided whenever possible, but it is sometimes unavoidable - e.g. calendar apps (or collaborative editors like Google Docs) across several devices: each device has a local DB which has to accept writes even when offline. It's the multi-leader config taken to the extreme, where each device is a DC.

For collaborative editing - you either take a lock on the document or make the unit of change very small - e.g. one keystroke.

CouchDB aims to make multi-leader config simpler.

CONFLICT AVOIDANCE - The simplest strategy for dealing with conflicts is to avoid them: if the application can ensure that all writes for a particular record go through the same leader, then conflicts cannot occur. Since many implementations of multi-leader replication handle conflicts quite poorly, avoiding conflicts is a frequently recommended approach.

CONFLICT RESOLUTION APPROACH - Last write wins(LWW - based on the highest timestamp(or id))

Some databases have a provision for conflict handler when the conflict is detected at read or write time.

There has been some interesting research into automatically resolving conflicts caused by concurrent data modifications. 

Conflict-free replicated datatypes (CRDTs) are a family of data structures for sets, maps (dictionaries), ordered lists, counters, etc. that can be concurrently edited by multiple users, and which automatically resolve conflicts in sensible ways.
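
For example, a grow-only counter (G-Counter) is one of the simplest CRDTs; a minimal sketch (the node ids and fixed cluster size are assumptions of the example):

// G-Counter: each node increments only its own slot, and merging two
// replicas takes the elementwise max. Merge is commutative, associative,
// and idempotent, so concurrent updates resolve automatically.
class GCounter {
    final long[] counts;   // one slot per node
    final int myNodeId;

    GCounter(int numNodes, int myNodeId) {
        this.counts = new long[numNodes];
        this.myNodeId = myNodeId;
    }

    void increment() { counts[myNodeId]++; }

    long value() {
        long sum = 0;
        for (long c : counts) sum += c;
        return sum;
    }

    void merge(GCounter other) {
        for (int i = 0; i < counts.length; i++)
            counts[i] = Math.max(counts[i], other.counts[i]);
    }
}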

Mergeable persistent data structures - track history explicitly, similarly to the Git version control system, and use a three-way merge function (whereas CRDTs use two-way merges).

Operational transformation - is the conflict resolution algorithm behind collaborative editing applications such as Etherpad and Google Docs. It was designed particularly for concurrent editing of an ordered list of items, such as the list of characters that constitute a text document.

LEADERLESS REPLICATION - came back into vogue after Amazon's Dynamo (Amazon's internal system, not the same thing as DynamoDB).

In some systems, clients send requests to all the nodes whereas in others there is a co-ordinator node.

For example, with n = 3 replicas and w = 2: writes are sent to all 3 replicas, and if one node is down the client still proceeds once it receives 2 OKs. Reads likewise go to all 3 nodes, and the client uses the version numbers on the responses to decide which value is the newest.

Read repair - when a client detects a stale version on a replica, it writes the newer version back to that replica. But this won't work well for values that aren't frequently read.

Anti-Entropy process - a process which keeps checking for differences in replicas in background.

Quorum condition - if there are n replicas and you wait for w writes and r reads to be acked, you are guaranteed to read the latest value as long as w + r > n. w and r can be tuned to the workload, e.g. w = n and r = 1 for read-heavy loads - but then writes fail if even one node is unavailable.
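
The condition itself is pure arithmetic; a tiny sketch:

// Every read set of r nodes overlaps every write set of w nodes in at
// least one node exactly when w + r > n.
static boolean quorumOverlaps(int n, int w, int r) {
    return w + r > n;
}
// e.g. n=3, w=2, r=2 -> true; read-heavy n=3, w=3, r=1 -> true,
// but then a single unavailable node blocks all writes.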

But even with w + r > n there are edge cases where replicas may end up with stale values: e.g. concurrent writes, a concurrent read and write, writes that partially succeed but are not rolled back, a failed node restored from an older replica, etc.

Sloppy quorum and hinted handoff - for a write, if the designated 'home' nodes for that record are not reachable, you can write to any w reachable nodes; once the home nodes come back up, the records are handed off to them.

Merging records - e.g. multiple clients adding items to a shopping cart. Increment the version every time the cart is updated. For deleted items, add a tombstone marker which records the version in which the item was deleted. Handling this across multiple replicas calls for 'version vectors' or 'dotted version vectors'.

---------------------------

Chapter 6: Partitioning

SKEWED LOAD - Today, most data systems are not able to automatically compensate for such a highly skewed workload, so it’s the responsibility of the application to reduce the skew. For example, if one key is known to be very hot, a simple technique is to add a random number to the beginning or end of the key. Just a two-digit decimal random number would split the writes to the key evenly across 100 different keys, allowing those keys to be distributed to different partitions.

However, having split the writes across different keys, any reads now have to do additional work, as they have to read the data from all 100 keys and combine it. This technique also requires additional bookkeeping: it only makes sense to append the random number for the small number of hot keys; for the vast majority of keys with low write throughput this would be unnecessary overhead. Thus, you also need some way of keeping track of which keys are being split.
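
A minimal sketch of the two-digit split (the key-naming scheme and the hot-key set are assumptions of the example; assumes java.util imports):

// Writes: spread a known-hot key over 100 sub-keys.
static String writeKey(String key, Set<String> hotKeys, Random rnd) {
    if (!hotKeys.contains(key)) return key;
    return key + "_" + String.format("%02d", rnd.nextInt(100));
}

// Reads: fan out over all 100 sub-keys and combine the results.
static List<String> readKeys(String key, Set<String> hotKeys) {
    if (!hotKeys.contains(key)) return List.of(key);
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 100; i++)
        keys.add(key + "_" + String.format("%02d", i));
    return keys;
}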

SECONDARY INDEXES - A lot of DBs don't support secondary indexes but they are the raison d’ĂȘtre of search servers such as Solr and Elasticsearch. The problem with secondary indexes is that they don’t map neatly to partitions. There are two main approaches to partitioning a database with secondary indexes: document-based partitioning and term-based partitioning.

PARTITIONING SECONDARY INDEXES BY DOCUMENTS (Local Indexing) - In this indexing approach, each partition is completely separate: each partition maintains its own secondary indexes, covering only the documents in that partition. For that reason, a document-partitioned index is also known as a local index. If you want to search for red cars, you need to send the query to all partitions, and combine all the results you get back. This approach is known as scatter/gather, and it can make read queries on secondary indexes quite expensive. Even if you query the partitions in parallel, scatter/gather is prone to tail latency amplification. Nevertheless, it is widely used: MongoDB, Riak [15], Cassandra [16], Elasticsearch [17], SolrCloud [18], and VoltDB [19] all use document-partitioned secondary indexes. Most database vendors recommend that you structure your partitioning scheme so that secondary index queries can be served from a single partition, but that is not always possible, especially when you're using multiple secondary indexes in a single query (such as filtering cars by color and by make at the same time).

PARTITIONING SECONDARY INDEXES BY TERMS (Global Indexing) - e.g. index entries for colors starting with a-d reside in one partition and e-h in another. Reads are faster since you only have to search the partition containing the term, but writes are more expensive since a single document update may have to touch multiple index partitions (more if you have more than one secondary index).

REBALANCING PARTITIONS (Assigning partitions to nodes)
1. Don't do hash mod N. If we assign each key to a node via hash(key) mod N (N = number of nodes), then most keys have to be reassigned whenever the number of nodes changes.
2. Fixed number of partitions - e.g. fix the number of partitions at 1,000 for a 10-node cluster. The assignment of a key to a partition is fixed forever. If a new node is added, some partitions move to it; if a node goes down, its partitions are distributed among the remaining nodes. So only whole partitions move between nodes, while the key-to-partition assignment never changes (see the sketch after this list). This approach won't work well if the dataset size is highly variable.
3. Dynamic partitioning - start with a fixed set of partitions, but keep splitting and merging them based on partition sizes.

So in 2. the size of each partition is proportional to the dataset size, whereas in 3. the number of partitions is proportional to the dataset size.

4. Fixed number of partitions per node - when a new node joins, it splits some existing partitions and moves the halves to itself, so the load per node stays fairly balanced.
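
A sketch of approach 2 (fixed number of partitions; the map representation is an assumption of the example):

// Key -> partition is fixed forever; only the partition -> node
// assignment changes during rebalancing.
static int partitionForKey(String key, int numPartitions) { // numPartitions fixed, e.g. 1000
    return Math.floorMod(key.hashCode(), numPartitions);
}

// Rebalancing = updating entries in this map and copying the affected
// partitions over the network; no key ever moves between partitions.
Map<Integer, Integer> partitionToNode = new HashMap<>();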

REQUEST ROUTING (Service Discovery) - to which IP/port is the request for a key routed?
Three approaches: 1. a routing layer, 2. a partition-aware client, 3. send to any node, which is responsible for forwarding the request correctly. Often the partition metadata is tracked by ZooKeeper, which every node informs of its partitions; updates published by ZooKeeper can be consumed by the routing tier or by clients.
A gossip protocol is often used with approach 3.

Summary:
Key-range partitioning is good for range-scan queries, as each partition can keep its keys sorted.
Hash-based partitioning destroys the ordering but achieves a fair distribution of load.
---------------------------

Chapter 7: Transactions
IMPLEMENTING READ COMMITTED - one way is to acquire a lock for reads too, but that's slow. Instead, the DB remembers the old value from before the writer took its lock and serves that old value to readers; after the transaction commits, the DB switches reads over to the new value.

However, read committed can't prevent read skew (non-repeatable reads). That causes problems for backups, which may capture partial information, and for large analytic or integrity-check queries. SNAPSHOT ISOLATION is the solution here: it ensures that every transaction reads from a consistent snapshot of the DB.

IMPLEMENTING SNAPSHOT ISOLATION - each row has multiple versions, with created_by = id of the txn which created it and deleted_by = id of the txn which deleted it. An update is treated as delete + create. A read transaction can't see rows written by transactions that were still in progress when it started, that aborted, or that committed with a larger txn id. This is MVCC - multi-version concurrency control - an enhancement over the implementation of READ COMMITTED. Garbage collection gets rid of old row versions.
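
A rough sketch of the visibility rule (the class and the bookkeeping are simplifications; real engines record the set of in-progress txns as of the reader's snapshot):

class RowVersion {
    long createdBy;  // txn id that created this version
    long deletedBy;  // txn id that deleted it, or 0 if still live
}

// Visible iff the creator committed before my snapshot, and no such
// committed txn has deleted it.
static boolean isVisible(RowVersion row, long myTxnId, Set<Long> invisibleTxns) {
    boolean created = row.createdBy < myTxnId
        && !invisibleTxns.contains(row.createdBy);   // not in-progress/aborted
    boolean deleted = row.deletedBy != 0
        && row.deletedBy < myTxnId
        && !invisibleTxns.contains(row.deletedBy);
    return created && !deleted;
}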

How to update indexes - one way is to have the index point to all the row versions and filter out those which are not visible to the current transaction. There are other better ways though.

Snapshot isolation goes by different names: Oracle calls it serializable, while PostgreSQL and MySQL call it repeatable read.

LOST UPDATE - read-modify-write done by two txns in parallel; only one write survives. Some DBs make operations like value = value + 1 atomic and concurrency-safe. Watch out for ORMs that do read-modify-write instead of using the DB's atomic operations. Another alternative is to take explicit locks on those rows. Some DBs (not MySQL) can detect lost updates under snapshot isolation and abort the offending txn.

Another way to handle lost updates is Compare and Set.
UPDATE table SET val = 'new' WHERE id = 1 AND val = 'old';
So it will work only if no one has modified the value since it was first read.
But some DBs may implement it incorrectly if they read values from an old snapshot.

Locks and compare-and-set do not work well in replicated systems where there are multiple copies of the data, so the onus is on the DB/application to merge conflicting versions well. Atomic operations can work well in a replicated context if they are commutative. Last write wins is prone to lost updates.

WRITE SKEW - This effect, where a write in one transaction changes the result of a search query in another transaction, is called a phantom . Snapshot isolation avoids phantoms in read-only queries, but in read-write transactions like the examples we discussed, phantoms can lead to particularly tricky cases of write skew.
E.g. at least one doctor has to be on call: if two on-call doctors go off at the same time, you may end up with zero on call. Or multiple bookings of the same room. In a multiplayer game, two players might move to the same location. Double-spending - a user might spend the same money twice.
The problem is that both transactions read the same set of rows and then update two different rows, so there is no single object to attach a lock to.

One solution is materializing conflicts - e.g. for the room-booking case, create a table of all possible time slots (say 15 minutes each) and lock the required slots for a given transaction. But it's a difficult approach in general.

Another solution is the SERIALIZABLE ISOLATION LEVEL, usually regarded as the strongest isolation level. Here the DB prevents all possible race conditions: the net outcome is as if all transactions ran serially, one after another. If it's so good, why isn't everyone using it? To understand that, let's explore how it can be implemented. There are 3 ways:

1. Actually executing the transactions serially - this became feasible around 2007, as RAM got cheap enough that the entire active dataset can be kept in memory. Redis does it: a single-threaded execution loop. But throughput is limited to one CPU core. To make single-threaded execution efficient, stored-procedure transactions are used: the entire transaction's code is stored as a procedure in the DB and only the parameters are passed in. This avoids the network round trips of regular interactive transactions, where the client issues a query, then submits another query based on its results, and so on. It also removes the need to keep many transactions open concurrently.

But stored procedures are generally unpopular due to archaic programming languages and harder debugging, version control, deployment, and metrics collection. A DB is also much more performance-sensitive than an application server, and performance debugging there is difficult.

But these issues can be overcome. Modern stored procedures use general-purpose programming languages: Lua for Redis, Java for others, etc. VoltDB even uses stored procedures for replication, which requires them to be deterministic (beware date/time functions). To use all the CPU cores, you can partition your data and allocate a core per partition, but cross-partition transactions are slower (by an order of magnitude in VoltDB), and much slower still when the data has multiple secondary indexes. For write-heavy workloads these limitations hurt.

So serial execution of transactions works when: the active dataset fits in memory, each transaction is small and fast (so it doesn't block the others), the write load is low enough to be served by a single CPU core, and slow cross-partition transactions can be tolerated.


2PL - TWO-PHASE LOCKING (different from two-phase commit): this was essentially the only route to serializability for about 30 years. Under snapshot isolation the mantra is writers never block readers and readers never block writers, whereas in 2PL writers block both readers and writers, and readers block both readers and writers. Implementing 2PL involves taking locks in shared/exclusive mode and detecting deadlocks. Details:
1. If a txn wants to read an object it must acquire a shared lock. Several txns can hold the shared lock but if a txn already holds an ex-lock the reading txns must wait for it to finish.
2. If a txn wants to write it must obtain an ex-lock, no other txn can hold any lock on this object whether ex or shared. This txn must wait for all the existing locks to be released.

So it has performance problems in which one long transaction can block all others.

Predicate locks - a lock that applies to the results of a query. E.g. in the room-booking example, the txn must take a lock on the predicate (SELECT * FROM bookings WHERE room_id = 1 AND start_date >= x AND end_date <= y). This ensures that only one txn can make the booking, since it holds the lock on the predicate. The key idea is that a predicate lock applies even to objects which do not yet exist in the DB.

Predicate locks have poor performance: when there are many active transactions, checking for matching predicates becomes very time-consuming.
Because of this, many DBs implement index-range locking instead to achieve 2PL. A room-booking query can be generalized to all bookings for that room_id, or all bookings in that time range, and the corresponding range in the index is locked. If there is no index suitable for the query, a shared lock can be taken on the entire table.

So far it seems that concurrency control in DBs offers two options: strong serializable isolation with either poor performance (2PL) or poor scalability (serial execution on an in-memory, single-core system like Redis), OR weak isolation with better performance but many race conditions.

But there is a newer (2008) algorithm - serializable snapshot isolation (SSI) - which provides full serializability with only a small performance penalty compared to snapshot isolation. It might become the new default in the future.

Optimistic concurrency control - proceed with the transactions in the hope that nothing bad will happen, and before committing, verify that nothing bad actually happened. SSI is an optimistic algorithm in this sense.
-----------------------------
Chapter 8: The Trouble with Distributed Systems:
Monotonic clock vs time-of-day clock => a monotonic clock is guaranteed to move forward, whereas the time-of-day clock may jump back in time.
Issues with NTP: network congestion can delay updates, NTP servers might be firewalled off, and some NTP servers hand out the wrong time. Clock drift should be closely monitored. Leap seconds can cause issues like time going backwards.

Due to all these factors, the time returned by the system clock has a confidence interval or uncertainty attached to it. Google's TrueTime API in Spanner explicitly reports the confidence interval on the local clock: when you ask it for the current time, you get back two values, [earliest, latest], which are the earliest possible and the latest possible timestamp. Based on its uncertainty calculations, the clock knows that the actual current time is somewhere within that interval. The width of the interval depends, among other things, on how long it has been since the local quartz clock was last synchronized with a more accurate clock source.

Consider snapshot isolation for transactions spanning multiple DB nodes. Snapshot isolation needs to know which txn happened earlier and which later, but how do you order txns across nodes when you can't be sure the nodes' clocks are in sync? Using TrueTime's confidence intervals, two txns can be ordered definitively if their intervals don't overlap; if they do overlap, the order is ambiguous. Google's DCs have GPS receivers and atomic clocks to keep clocks synchronized to within about 7 ms.
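
The interval comparison itself is simple; a sketch (Spanner additionally waits out the uncertainty interval before committing, so that intervals of causally related transactions don't overlap):

// A definitely happened before B only if A's confidence interval ends
// before B's begins; overlapping intervals can't be ordered reliably.
static boolean definitelyBefore(long aLatest, long bEarliest) {
    return aLatest < bEarliest;
}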

Process Pauses:
while (true) {
    request = getIncomingRequest();

    // Ensure that the lease always has at least 10 seconds remaining
    if (lease.expiryTimeMillis - System.currentTimeMillis() < 10000) {
        lease = lease.renew();
    }

    if (lease.isValid()) {
        process(request);
    }
}

If there is a single leader per partition that has to renew its lease periodically and the above code is used, there are multiple issues. First, the lease expiry time and the current time are computed on different machines, so clock drift comes into play; also, the code uses the time-of-day clock rather than the monotonic clock. Second, the process may pause for various reasons (GC, live VM migration, thread context switches, disk/network I/O waits), during which another node may take over the lease, and the woken-up process may then do something unsafe.

Limiting the impact of garbage collection - one approach is to GC only short-lived objects and periodically restart the process to deal with long-lived objects, redirecting traffic to other nodes in the meantime (something like rolling upgrades). Another is to stop sending traffic to a node shortly before a GC pause is expected to start.

Fencing - client 1 takes a lock to write to a file, but a GC pause hits and its lease expires; in the meantime client 2 takes the lock and writes to the file; later client 1 wakes up and writes again, clobbering client 2's write. To avoid this, the lock service can hand out an auto-incrementing fencing token with each grant, and the file storage server can reject stale tokens: client 2 would send token 34 and client 1 token 33, so client 1's request would be rejected.
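
A minimal sketch of the check on the storage server (a real server would persist the highest token it has seen):

// The lock service hands out a token that increases with every grant;
// the storage server rejects writes carrying anything older.
long highestTokenSeen = 0;

synchronized boolean write(long fencingToken, byte[] data) {
    if (fencingToken < highestTokenSeen) return false; // e.g. token 33 arriving after 34
    highestTokenSeen = fencingToken;
    // ... perform the write ...
    return true;
}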

So far we have assumed that nodes are unreliable but honest. What if some nodes "lie"? May be they were compromised. This leads us to Byzantine systems - A system is Byzantine fault-tolerant if it continues to operate correctly even if some of the nodes are malfunctioning and not obeying the protocol, or if malicious attackers are interfering with the network. But in most server-side data systems, the cost of deploying Byzantine fault-tolerant solutions makes them impracticable.
-------------------
Chapter 9: Consistency and Consensus
Linearizability - once a more recent value has been read, subsequent reads shouldn't return the older value. If a DB offers both linearizability and serializability, it's called strict serializability.
Linearizability essentially means "such systems behave as though there is only a single copy of the data, and all operations on it are atomic".

When is Linearizability useful? Leader election. Once a leader has been elected, everyone should agree about who holds this lock. Distributed locking.

Uniqueness constraints also require linearizability. It also prevents seat overbooking, double-spending of a balance, etc.

Cross-channel timing dependencies - Often these problems arise when there is an additional communication channel between 2 services.

Implementing linearizable systems - single-leader replication is potentially linearizable, but multi-leader and leaderless systems are not. Since multi-leader systems are quite useful in multi-DC setups, insisting on linearizability would mean giving up that availability. Thus, applications that don't require linearizability can be more tolerant of network problems. This insight is popularly known as the CAP theorem.

A better way of phrasing CAP would be either Consistent or Available when Partitioned.

Although linearizability is a useful guarantee, surprisingly few systems are actually linearizable in practice. For example, even RAM on a modern multi-core CPU is not linearizable: if one thread writes and another thread reads shortly afterwards, the reader might see a stale value from its CPU cache. The reason to tolerate this is performance, and the same is true of many DBs that forgo linearizability.

Can’t we maybe find a more efficient implementation of linearizable storage? It seems the answer is no: Attiya and Welch prove that if you want linearizability, the response time of read and write requests is at least proportional to the uncertainty of delays in the network. In a network with highly variable delays, like most computer networks, the response time of linearizable reads and writes is inevitably going to be high.

If a system obeys the ordering imposed by causality, we say that it is causally consistent. For example, snapshot isolation provides causal consistency: when you read from the database, and you see some piece of data, then you must also be able to see any data that causally precedes it (assuming it has not been deleted in the meantime).

Linearizability - In a linearizable system, we have a total order of operations: if the system behaves as if there is only a single copy of the data, and every operation is atomic, this means that for any two operations we can always say which one happened first.

Causality - we said that two operations are concurrent if neither happened before the other (see "The "happens-before" relationship and concurrency"). Put another way, two events are ordered if they are causally related (one happened before the other), but they are incomparable if they are concurrent. This means that causality defines a partial order, not a total order: some operations are ordered with respect to each other, but some are incomparable.

As per the above there are no concurrent operations in a linearizable datastore: there must be a single timeline along which all operations are totally ordered. Concurrency would mean that the timeline branches and merges again—and in this case, operations on different branches are incomparable (i.e., concurrent).

So linearizability implies causality and is hence stronger than it. In many cases, systems that appear to require linearizability in fact only really require causal consistency, which can be implemented more efficiently. Based on this observation, researchers are exploring new kinds of databases that preserve causality, with performance and availability characteristics that are similar to those of eventually consistent systems [49, 50, 51]. As this research is quite recent, not much of it has yet made its way into production systems, and there are still challenges to be overcome [52, 53]. However, it is a promising direction for future systems.

Generating a total order with sequence numbers - use Lamport timestamps.
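
A minimal sketch of a Lamport clock (this is the standard construction; node ids break ties):

// Every node keeps a counter; each event bumps it, and every message
// carries it so receivers can catch up. The timestamp is the pair
// (counter, nodeId): compare counters first, break ties by node id.
class LamportClock {
    long counter = 0;
    final int nodeId;

    LamportClock(int nodeId) { this.nodeId = nodeId; }

    long localEvent() { return ++counter; }

    long onReceive(long remoteCounter) {
        counter = Math.max(counter, remoteCounter) + 1;
        return counter;
    }
}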

Atomic commit and 2 phase commit - 
Two-phase commit is an algorithm for achieving atomic transaction commit across multiple nodes—i.e., to ensure that either all nodes commit or all nodes abort.

2PC uses a new component that does not normally appear in single-node transactions: a coordinator (also known as transaction manager). The coordinator is often implemented as a library within the same application process that is requesting the transaction.

A 2PC transaction begins with the application reading and writing data on multiple database nodes, as normal. We call these database nodes participants in the transaction. When the application is ready to commit, the coordinator begins phase 1: it sends a prepare request to each of the nodes, asking them whether they are able to commit. The coordinator then tracks the responses from the participants: If all participants reply “yes,” indicating they are ready to commit, then the coordinator sends out a commit request in phase 2, and the commit actually takes place. If any of the participants replies “no,” the coordinator sends an abort request to all nodes in phase 2.
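
A sketch of the coordinator's decision logic (the Participant interface is an assumption of the example; a real coordinator must also write its decision to a durable log before phase 2 and keep retrying phase 2 until every participant acknowledges):

interface Participant {
    boolean prepare();  // phase 1: "are you able to commit?"
    void commit();      // phase 2
    void abort();       // phase 2
}

static boolean twoPhaseCommit(List<Participant> participants) {
    boolean allYes = true;
    for (Participant p : participants)
        if (!p.prepare()) { allYes = false; break; }  // phase 1

    for (Participant p : participants) {              // phase 2
        if (allYes) p.commit(); else p.abort();
    }
    return allYes;
}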

--------------------------------
Chapter 10: Batch Processing


Services vs Batch processing vs Stream processing

Services => Online, Req/Resp, quality measured by response time

Batch processing => Offline job processing, measured by throughput

Stream processing => Near real-time, somewhere between online and offline


Batch processing =>

Unix tools, piped processing, independent of each other but work well with each other due to the same interface

MapReduce is similar; it uses HDFS, a shared-nothing architecture, unlike SAN and NAS.


First, each map task partitions its output by reducer, based on the hash of the key. Each of these partitions is written to a sorted file on the mapper’s local disk.

Whenever a mapper finishes reading its input file and writing its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching the output files from that mapper. The reducers connect to each of the mappers and download the files of sorted key-value pairs for their partition.

The reduce task takes the files from the mappers and merges them together, preserving the sort order. Thus, if different mappers produced records with the same key, they will be adjacent in the merged reducer input. The reducer is called with a key and an iterator that sequentially scans over all records with the same key (which may in some cases not all fit in memory).
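
The partition-by-key-hash step is tiny; Hadoop's default HashPartitioner computes essentially this:

// Route a key to the reducer that owns it, so all records with the
// same key end up at the same reducer.
static int reducerFor(String key, int numReducers) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
}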


Joins in batch processing:

Joins in case of a single entity in a normal online service are best handled with an index.

But in batch processing, we want to resolve all of the entities at once:
Map-side joins.
Reduce-side joins.

Comparing Hadoop to Distributed Databases:

The biggest difference is that MPP(massively parallel processing) databases focus on parallel execution of analytic SQL queries on a cluster of machines, while the combination of MapReduce and a distributed filesystem provides something much more like a general-purpose operating system that can run arbitrary programs.

And this is why MapReduce is designed to tolerate frequent unexpected task termination: it’s not because the hardware is particularly unreliable, it’s because the freedom to arbitrarily terminate processes enables better resource utilization in a computing cluster.

In an environment where tasks are not so often terminated, the design decisions of MapReduce make less sense.

Improvements over MapReduce:

Dataflow engines: get rid of writing intermediate state to disk (materialization) and stream the output directly to the next operator. This works except where an operator, such as sort, needs its entire input before it can emit any output.

Pregel model for iterative processing on graphs ("repeat until done") - very cumbersome to express in MapReduce, e.g. PageRank.

=============================

Chapter 11: Stream Processing

In principle, a file or database is sufficient to connect producers and consumers: a producer writes every event that it generates to the datastore, and each consumer periodically polls the datastore to check for events that have appeared since it last ran.

However, polling becomes expensive if the datastore is not designed for this kind of usage. The more often you poll, the lower the percentage of requests that return new events, and thus the higher the overheads become. Instead, it is better for consumers to be notified when new events appear. Databases have traditionally not supported this kind of notification mechanism very well: relational databases commonly have triggers, which can react to a change, but they are very limited in what they can do and have been somewhat of an afterthought in database design. Instead, specialized tools have been developed for the purpose of delivering event notifications.
-----------------------------------------------------------------

One option is to directly connect producers with consumers, e.g. via TCP or UDP sockets.

Message brokers: specialized databases optimized for these scenarios.

Typically message brokers don't persist messages after they have been delivered.

Why can we not have a hybrid, combining the durable storage approach of databases with the low-latency notification facilities of messaging? This is the idea behind log-based message brokers.

More recently, there has been growing interest in change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems. CDC is especially interesting if changes are made available as a stream, immediately as they are written.

For example, you can capture the changes in a database and continually apply the same changes to a search index. If the log of changes is applied in the same order, you can expect the data in the search index to match the data in the database. The search index and any other derived data systems are just consumers of the change stream.

Apache Kafka provides CDC connectors for various databases.

Increasingly, databases are beginning to support change streams as a first-class interface, rather than the typical retrofitted and reverse-engineered CDC efforts.

Command and event: a command is a user request to do something; an event is generated when the command is found to be valid and is successfully executed.

Stream analytics systems sometimes use probabilistic algorithms, such as Bloom filters (which we encountered in “Performance optimizations”) for set membership, HyperLogLog  for cardinality estimation, and various percentile estimation algorithms.

This use of approximation algorithms sometimes leads people to believe that stream processing systems are always lossy and inexact, but that is wrong: there is nothing inherently approximate about stream processing, and probabilistic algorithms are merely an optimization.


Sunday, September 6, 2020

free video editing tools

handbrake => burn subtitles into mp4
avidemux => cut video anywhere (not only at the ends)
freemake => vob to mp4 converter

Thursday, August 13, 2020

grep extract only matching text

 cat filename | grep --extended-regexp --only-matching --regexp="data-text=\"[a-zA-Z0-9 ]*\""

I had an HTML file from which I wanted to extract text like data-text="abcd".

Wednesday, August 5, 2020

html video player with zooming

<html><head><meta name="viewport" content="width=device-width"></head><body>
<style>
.video-container {
  position: absolute;
  top: 0;
  bottom: 0;
  width: 100%;
  height: 100%; 
  overflow: hidden;
}

.video-container video {
  /* Make video to at least 100% wide and tall */
  min-width: 100%; 
  min-height: 100%; 
  
  /* Setting width & height to auto prevents the browser from stretching or squishing the video */
  width: auto;
  height: auto;
  
  /* Center the video */
  position: absolute;
  top: 50%;
  left: 50%;
  transform: translate(-50%,-50%);
}


/* -------------------------------------- */

body { 
  background: #000;
  color: #fff;
  font-family: 'Oxygen', sans-serif;
}
  
.overlay {
  background: rgba(0,0,0,0.5);
  position: absolute;
  bottom: 0;
  left: 0;
  right: 0;
  width: 95%;
  max-width: 45em;
  margin: auto auto 1em;
  box-sizing: border-box;
  padding: 2em;
  line-height: 1.5;
  text-align: center;
}

.overlay :last-child { margin-bottom: 0; }

.overlay h1 {
  font-size: 14pt;
  font-weight: normal;
  text-shadow: 0 0 .3em #000;
  margin: 0 0 1em;
}

.overlay p {
  font-size: 11pt;
  text-shadow: 0 0 .3em #000;
  margin: 1em 0;
}

.overlay a {
  color: #fff;
}

code { font-family: monospace; }
</style>

<div class="video-container">
<video controls="" autoplay="" name="media"><source src="mp4link" type="video/mp4"></video>
</div>
</body></html>

Monday, July 27, 2020

Ignoring whitespace diff in Git GUI

Edit => Options => Additional Diff Parameters(Both in NRP repository
and Global) = -w --ignore-all-space

Thursday, April 23, 2020

Create a self signed cert, convert .pem to .cer


Create private key:
openssl genrsa -out privkey.pem 2048
 
See what's inside the private key:
openssl rsa -noout -text -in privkey.pem

Create a self-signed certificate using the private key:
openssl req -new -x509 -key privkey.pem -out cacert.pem -days 3650

See what's inside the certificate:
openssl x509 -in cacert.pem -text

Convert the certificate from .pem (PEM) to .cer (DER):
openssl x509 -inform PEM -in cacert.pem -outform DER -out a.cer

Tuesday, April 21, 2020

System design interviews

Some common points:
1. Logging - async logging; a traceId passed to every layer enables better debugging and log collation. Hot/cold storage for logs - SSD (recent logs) vs HDD (older logs).

2. For media (videos) - how to handle long-lived connections: the LB maintains connection state while the backends are being changed. For real-time streaming it's OK to just resume from the current time; for static videos, resume from the last known location. Use DSR, where the LB is bypassed in the reverse direction and data goes directly to the client.

3. Analytics - separate storage for archival. EMR jobs. HDFS. Pig scripts.

4. Caching - Distributed vs. each node having the full replica. In the second case, each one just listens for updates which flow from the master. How much RAM does each node have? Prevents a hop but not scalable. (Aerospike vs Redis distributed). Routing in the first case - client frequently syncs with the master for latest routing table updates so as to directly hit the backend vs. touching master for every call.

5. Queuing - Kafka. Producers/consumers/intermediate storage - each layer should be able to independently scale. How to partition for listeners? How much data to store waiting for consumers to consume. Each consumer should be able to go back and forth in the queue irrespective of how much has been processed.

6. LB - DNS (round robin/geo-aware) -> L3 (BGP/ECMP) -> L4 (TCP) -> L7 (HTTP/Redis). Passthrough vs connection termination. DSR? SSL termination? HTTP/2?

7. Search - Lucene; autocomplete based on tries.

8. Social media content - Taiji - User clique based DC routing. A user is likely to consume content generated by friends. So cache data for a clique in the same DC.

9. Separate out read/write flows - scale indep'ly. 100:1 read:write ratio.

10. Redundancy - region/zone

11. News feed - push/pull/hybrid

12. DB partitioning - distribute tables? foreign keys? joins?

13. Security,permissions,file sharing.

14. Dropbox - files split in chunks - to better manage retries.  Exponential backoff. Mobile clients do pull based sync to preserve data/battery. Response queue for each client. Request queue global. Inline Dedupe vs post processing dedupe.

15. Yelp/Uber - quad tree

16. Ticketmaster - transaction isolation levels


Friday, March 13, 2020

Multi tier load balancing with Linux



Summary in my words:
 
Multiple L7 load balancers talk to multiple backend servers. Backend servers don't have to worry about path MTU discovery, TCP congestion control algorithms, avoidance of the TIME-WAIT state and various other low-level details. But how do you load balance the L7 LBs?

L3 load balancing can help. All L7 LBs can advertise the same IP to their nearby routers. With a combination of ECMP and BGP protocols, routers can load balance the packets to L7 LBs. But adding/removing an L7 LB in this setup breaks existing connections (which is problematic for long connections like downloads) since routers don't use consistent hashing. Also, if one router becomes unavailable, other routers may forward the packets to a different LB since every router decides individually.

To mitigate this, use L4 LBs between L3 and L7. The purpose of this tier is to ensure that all members take the same scheduling decision for an incoming packet, based on destination IP and port. Consistent hashing is used to route packets to L7 LBs, so removal/addition of an L7 LB isn't problematic. Additionally, L7 LBs return responses directly toward the clients, bypassing the L4 tier (DSR), to improve performance.

So, routers use ECMP + BGP to route packets to the L4 LBs (all of which advertise the same IP). The L4 LBs use a consistent hashing scheme to pick an L7 LB. And the L7 LBs return responses directly, bypassing L4.

Actual Snippets from the article:

A combination of:
 
  1. ECMP routing
  2. Stateless L4 load-balancing
  3. Stateful L7 load-balancing

L7 (Last Tier): 

Working in the highest layers of the OSI model, it can also offer additional services, like TLS-termination, HTTP routing, header rewriting, rate-limiting of unauthenticated users, and so on. Being stateful, it can leverage complex load-balancing algorithm. Being the first point of contact with backend servers, it should ease maintenances and minimize impact during daily changes.

It also terminates client TCP connections. This introduces some loose coupling between the load-balancing components and the backend servers with the following benefits:
  1. connections to servers can be kept open for lower resource use and latency,
  2. requests can be retried transparently in case of failure,
  3. clients can use a different IP protocol than servers,
  4. and servers do not have to care about path MTU discovery, TCP congestion control algorithms, avoidance of the TIME-WAIT state and various other low-level details.

As is, this solution is not complete. We have just moved the availability and scalability problem somewhere else. How do we load-balance the requests between the load-balancers?

First tier: ECMP routing (L3)
 
On most modern routed IP networks, redundant paths exist between clients and servers. For each packet, routers have to choose a path. When the cost associated to each path is equal, incoming flows are load-balanced among the available destinations. This characteristic can be used to balance connections among available load-balancers.

If we assume you already have BGP-enabled routers available, ExaBGP is a flexible solution to let the load-balancers advertise their availability.

Unfortunately, this solution is not resilient when an expected or unexpected change happens. Notably, when adding or removing a load-balancer, the number of available routes for a destination changes. The hashing algorithm used by routers is not consistent and flows are reshuffled among the available load-balancers, breaking existing connections.

Moreover, each router may choose its own routes. When a router becomes unavailable, the second one may route the same flows differently which again breaks existing connections.

If you think this is not an acceptable outcome, notably if you need to handle long connections like file downloads, video streaming or websocket connections, you need an additional tier. Keep reading!

Second tier: L4 load-balancing

The second tier is the glue between the stateless world of IP routers and the stateful land of L7 load-balancing. It is implemented with L4 load-balancing. The terminology can be a bit confusing here: this tier routes IP datagrams (no TCP termination) but the scheduler uses both destination IP and port to choose an available L7 load-balancer. The purpose of this tier is to ensure all members take the same scheduling decision for an incoming packet. There are two options:

  1. stateful L4 load-balancing with state synchronization across the members,
  2. or stateless L4 load-balancing with consistent hashing.

The first option increases complexity and limits scalability. We won’t use it. The second option is less resilient during some changes but can be enhanced with a hybrid approach using a local state.

We use IPVS, a performant L4 load-balancer running inside the Linux kernel, with Keepalived, a frontend to IPVS with a set of healthcheckers to kick out an unhealthy component. IPVS is configured to use the Maglev scheduler, a consistent hashing algorithm from Google. Among its family, this is a great algorithm because it spreads connections fairly, minimizes disruptions during changes and is quite fast at building its lookup table. Finally, to improve performance, we let the last tier—the L7 load-balancers—send back answers directly to the clients without involving the second tier—the L4 load-balancers. This is referred to as direct server return (DSR) or direct routing (DR).
   
With such a setup, we expect packets from a flow to be able to move freely between the components of the first two tiers while sticking to the same L7 load-balancer.

Tier 0: DNS load-balancing

Optionally, you can add DNS load-balancing to the mix. This is useful either if your setup is spanned across multiple datacenters, or multiple cloud regions, or if you want to break a large load-balancing cluster into smaller ones. It is not intended to replace the first tier as it doesn’t share the same characteristics: load-balancing is unfair (it is not flow-based) and recovery from a failure is slow.



Monday, March 9, 2020

Interview Questions

Counting sort




(Diagram omitted: it suggests a DAG, but the graph is actually directed and cyclic; find the shortest path from the source to all nodes. Consider a non-heap approach as well.)

Interval merging/intersection/Task Scheduling
31. https://leetcode.com/problems/task-scheduler/

32. https://leetcode.com/problems/non-overlapping-intervals/ - sort by end time ascending, maximize number of tasks finished
33. https://leetcode.com/problems/minimum-number-of-arrows-to-burst-balloons - same as above, sort by end time

34. https://leetcode.com/problems/interval-list-intersections/
35. https://leetcode.com/problems/merge-intervals/ - start time ascending sort, keep merging with top of the stack
36. https://leetcode.com/problems/meeting-rooms/

These 4 problems have exact same solution:
37. https://leetcode.com/problems/meeting-rooms-ii/
38. https://leetcode.com/problems/my-calendar-i/
39. https://leetcode.com/problems/my-calendar-ii/
40. https://leetcode.com/problems/my-calendar-iii/

41. https://leetcode.com/problems/data-stream-as-disjoint-intervals/

Binary Tree/Binary Search Tree
https://leetcode.com/problems/binary-tree-coloring-game/

42. Inorder Traversal => https://leetcode.com/problems/binary-search-tree-iterator/
43. Inorder Traversal => https://leetcode.com/problems/validate-binary-search-tree/
44. InOrder => https://leetcode.com/problems/kth-smallest-element-in-a-bst
45. InOrder => https://leetcode.com/problems/increasing-order-search-tree
46. PostOrder => https://leetcode.com/problems/serialize-and-deserialize-binary-tree/
47. PostOrder => https://leetcode.com/problems/range-sum-of-bst/
48. PostOrder => https://leetcode.com/problems/flatten-binary-tree-to-linked-list
49. DFS (backtracking) https://leetcode.com/problems/lowest-common-ancestor-of-a-binary-tree/
50. DFS => https://leetcode.com/problems/lowest-common-ancestor-of-deepest-leaves
51. DFS https://leetcode.com/problems/all-nodes-distance-k-in-binary-tree/
52. BFS => https://leetcode.com/problems/check-completeness-of-a-binary-tree/
53. BFS => https://leetcode.com/problems/binary-tree-right-side-view/
54. https://leetcode.com/problems/binary-tree-vertical-order-traversal/
55. https://leetcode.com/problems/closest-binary-search-tree-value/
56. https://leetcode.com/problems/smallest-subtree-with-all-the-deepest-nodes/
57. https://leetcode.com/problems/minimum-depth-of-binary-tree/
58. https://leetcode.com/problems/binary-tree-longest-consecutive-sequence/
59. https://leetcode.com/problems/binary-tree-longest-consecutive-sequence-ii/
60. https://leetcode.com/problems/split-bst/
61. https://leetcode.com/problems/populating-next-right-pointers-in-each-node
62. https://leetcode.com/problems/univalued-binary-tree
63. https://leetcode.com/problems/diameter-of-binary-tree

Binary Search
64. https://leetcode.com/problems/missing-element-in-sorted-array/
65. https://leetcode.com/problems/random-pick-with-weight/
66. https://leetcode.com/problems/find-first-and-last-position-of-element-in-sorted-array/
67. https://leetcode.com/problems/first-bad-version/
68. https://leetcode.com/problems/fixed-point/
69. https://leetcode.com/problems/count-negative-numbers-in-a-sorted-matrix
https://leetcode.com/problems/shortest-way-to-form-string/

DP
78. https://leetcode.com/problems/partition-equal-subset-sum
79. https://leetcode.com/problems/minimum-cost-for-tickets/
80. https://leetcode.com/problems/longest-arithmetic-sequence/
81. https://leetcode.com/problems/decode-ways/
82. https://leetcode.com/problems/minimum-knight-moves/
83. https://leetcode.com/problems/target-sum/
84. https://leetcode.com/problems/maximal-square
85. https://leetcode.com/problems/word-break/
86. https://leetcode.com/problems/longest-line-of-consecutive-one-in-matrix
87. https://leetcode.com/problems/knight-probability-in-chessboard/
88. https://leetcode.com/problems/valid-palindrome-iii/
89. https://leetcode.com/problems/longest-palindromic-subsequence
90. https://leetcode.com/problems/number-of-ways-to-stay-in-the-same-place-after-some-steps/
https://leetcode.com/problems/perfect-squares/
https://leetcode.com/problems/campus-bikes-ii/
https://leetcode.com/problems/cherry-pickup-ii/
https://leetcode.com/problems/minimum-distance-to-type-a-word-using-two-fingers/
https://leetcode.com/problems/minimum-swaps-to-make-sequences-increasing/
https://leetcode.com/problems/odd-even-jump/
https://leetcode.com/problems/count-square-submatrices-with-all-ones/

https://leetcode.com/problems/wiggle-sort/

Deep Copy
105. https://leetcode.com/problems/clone-graph/
106. https://leetcode.com/problems/copy-list-with-random-pointer/

Arithmetic operations/Math
107. https://leetcode.com/problems/multiply-strings/
108. https://leetcode.com/problems/divide-two-integers/ (Read the Solution section, quite good)
109. https://leetcode.com/problems/bulb-switcher/
110. https://leetcode.com/problems/fraction-to-recurring-decimal/
111. https://leetcode.com/problems/basic-calculator-ii/
112. Geometry => https://leetcode.com/problems/minimum-area-rectangle
113. https://leetcode.com/problems/largest-number/
114. https://leetcode.com/problems/projection-area-of-3d-shapes/
115. https://leetcode.com/problems/number-of-steps-to-reduce-a-number-to-zero
116. https://leetcode.com/problems/plus-one/

