lecture notes

1 March 2019

some lecture notes

This page will be silently updated with any new stuff that I am picking up along the way:

-=-=-=-=-=-=-=-=-=

I have been following Andrew Ng's machine learning course on YouTube; I had to write down my lecture notes, as I have been learning while commuting by train. Here are my lecture notes as a PDF file - link to pdf file.


Some stuff that I learned while working on storage systems. pdf notes here - link to pdf file.


Also wrote down my notes on the best book on computer networks: TCP/IP Illustrated, Volume 1: The Protocols by W. Richard Stevens. PDF notes here - link to pdf file.

(have to admit that I still don't quite understand IPv6 after all these years, have to learn it some day)


Here are my notes on learning about eBPF


Here are my notes on learning about Kubernetes: pdf notes here

and here are some notes on learning about Kubernetes operators

Some notes on learning the go language

Here are my notes on learning tmux, a real time saver learning tmux


Some notes on how to do a good hash table


Lately I had to do some work with Java, and noticed that a lot of code uses the stream feature of Java 8; so I had to learn this nice application of monads. Here are my notes on the subject


I came across a nice explanation of how Bitcoin and this blockchain thing work; so here are my notes on the subject


Also I learned the following tricks in the process:

  • I did the notes in LibreOffice Writer; it turns out that it has a working formula editor with a very nice syntax; Microsoft Word is not usable for this purpose, as its formula editor keeps crashing.
  • Also LibreOffice has a very nice feature - it can export a document as a PDF file (I think MS Office doesn't know how to do that);
  • On GitHub you can produce a binary release for a project that contains these PDF files (of course we know that it is a no-no to add binary files to a git repository), and the blog in turn can link to these PDF files. This trick saves me the headache of having to reformat everything in markdown and then figure out where this mess fails to display properly. This way the blog becomes even more locked in to the GitHub platform (three cheers for GitHub!!!)
  • (I hope they will not pull another trick here, like the forced transition from RedCloth to Markdown markup. At least writing most stuff as PDF will not make a difference when they do.)
  • Some meta observations: most of the notes here are divided into nested lists; this helps me come up with a more concise representation of complicated structures. In a way this is similar to Lisp with its nested s-expression syntax; s-expr syntax is more concise than Algol-like languages, but it creates problems for programs developed by large teams over a long period of time. Maybe the whole point of s-expr/Lisp is to teach another mode of thinking - by programming in Lisp you train your mind to think in more abstract patterns! (It takes a lot to overcome the initial hurdles, though - that is another reason why it has not become a mainstream programming language so far.)

Other notes on learning about distributed systems:

Map-reduce

(I was told that map-reduce is no longer in use at Google; however this paper is quite interesting if you want to learn about storage & distributed systems)

https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf

Problem: programs working on large data sets distributed among a huge set of machines each need to deal with node failures, distribution of data and load balancing; that results in lots of duplicated work. Instead, invert control: provide one general mechanism (the framework) that solves these problems in a generalized manner, while the application only supplies the map and reduce functions.

Simplified model: most computations are two-step

  • perform a map function - it passes over the input records and fills an intermediate multimap (a key can have multiple values)
  • apply a reduce function - it is called once per key of the intermediate multimap and combines all values with that key into an output value (a minimal word-count sketch follows below)
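
To make the two-step model concrete, here is a minimal word-count sketch in Go (my own illustration, not code from the paper): the framework is assumed to call Map once per input record and Reduce once per distinct intermediate key.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// KeyValue is one intermediate (key, value) pair emitted by Map.
type KeyValue struct {
	Key   string
	Value string
}

// Map is called once per input record; for word count it emits ("word", "1")
// for every word in the record.
func Map(record string) []KeyValue {
	var out []KeyValue
	for _, w := range strings.Fields(record) {
		out = append(out, KeyValue{Key: w, Value: "1"})
	}
	return out
}

// Reduce is called once per distinct key with all values collected for that
// key; for word count it just sums the counts.
func Reduce(key string, values []string) string {
	sum := 0
	for _, v := range values {
		n, _ := strconv.Atoi(v)
		sum += n
	}
	return strconv.Itoa(sum)
}

func main() {
	// Tiny driver standing in for the framework: group by key, then reduce.
	groups := map[string][]string{}
	for _, kv := range Map("the quick brown fox jumps over the lazy dog the end") {
		groups[kv.Key] = append(groups[kv.Key], kv.Value)
	}
	for k, vs := range groups {
		fmt.Println(k, Reduce(k, vs))
	}
}
```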

Implementation principles

  • works over data stored on the Google file system (GFS)
  • the job is subdivided into a set of splits (the splits are processed in parallel on different machines)
  • one process applies the map function over a single split
  • the reduce work is subdivided into subsets of keys by applying processorIndex = hash(key) % NumSets

Process

  • the input data is split into distinct sets - splits (one split is typically 16-64 MB)
  • one process is the master (coordinator) - it assigns an idle worker a work unit (either MAP on a split or REDUCE on a subset of the keys produced by the map stage)
  • the output of the map stage is buffered in memory.
  • when the map output buffer is flushed to disk, the output is partitioned into disjoint sets by hash(key) % R (preparing for the reduce stage)
  • the master then selects an idle worker for the reduce step - its input files have already been prepared by the previous stage
  • reduce step: the worker first sorts the intermediate data by key, so that all values of the same key are grouped together
  • each reduce task writes its own output file (why not a global output file?)

Output: multiple files (one per reduce task); no need to combine them into a global file - most often this output is the input of another map/reduce program. A small sketch of the partitioning step follows.
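
Here is how a map worker might pick the output partition for an intermediate key - a sketch under the assumption of R reduce tasks; FNV is just a convenient stand-in for whatever hash function the real implementation uses:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition maps an intermediate key to one of R reduce partitions, so every
// map worker sends the same key to the same reduce task.
func partition(key string, R int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(R))
}

func main() {
	const R = 4
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%-6s -> reduce partition %d\n", k, partition(k, R))
	}
}
```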

Data structures

  • for each task (either map or reduce) the master keeps the state (idle, in-progress, complete); for completed map tasks it keeps the locations and sizes of the intermediate result files
  • the master pings each worker machine; if a worker fails, its in-progress tasks (and its completed map tasks - their results are kept on the local machine!) are reset to the idle state and resubmitted.
  • this handles dependencies (reduce tasks depend on map tasks), given that an intermediate file may be gone due to a worker machine failure.
  • there is a single master process (it still writes periodic checkpoints of its state, so the computation could be restarted after a master failure)
  • bandwidth is conserved by using a GFS (Google file system) feature - chunks are stored locally on the same machine as the worker process, so map tasks are scheduled near their input data.
  • backup tasks: some workers are very slow (a bad disk causes long IO times, or the machine is overloaded), so a few straggler tasks at the end of a batch run take a very long time to complete; therefore, near the end of a run the master schedules duplicate executions of the remaining tasks (often this results in a significant speed-up).

  • the result of each map task is subdivided among R reduce tasks, so each intermediate (key, values) pair has to be assigned to a particular reduce task. This is usually done by hashing the key (hash(key) % R = index of reduce set). However, if the key is a URL the user may want to group all results of the same host together, so the partitioning function would be hash(hostname(url)) % R instead (the user may specify such a custom function; see the sketch after this list).

  • combiner function: an optional additional step - a function that does a partial reduce on the local machine before the data is sent to the reduce task, in order to save bandwidth (it combines away intermediate results early).

  • reader: standard modes for interpreting the input (key/value pairs, treat each line as an input record, etc.); the user can specify an additional optional component for customized parsing of the input files.
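
And here is what the custom partitioning function from the URL example above could look like - again just a sketch, replacing the default hash(key) % R with hash(hostname(url)) % R:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"net/url"
)

// hostPartition sends every URL of the same host to the same reduce task,
// instead of the default hash(key) % R over the full URL.
func hostPartition(rawURL string, R int) int {
	host := rawURL // fall back to the raw key if parsing fails
	if u, err := url.Parse(rawURL); err == nil && u.Host != "" {
		host = u.Host
	}
	h := fnv.New32a()
	h.Write([]byte(host))
	return int(h.Sum32() % uint32(R))
}

func main() {
	const R = 8
	for _, u := range []string{
		"http://example.com/a",
		"http://example.com/b",
		"http://other.org/x",
	} {
		fmt.Printf("%-24s -> reduce partition %d\n", u, hostPartition(u, R))
	}
}
```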

Google file system

http://static.googleusercontent.com/media/research.google.com/en//archive/gfs-sosp2003.pdf

Principles

  • implemented as a user-mode library (not in the kernel); it does not provide POSIX file semantics (reads from stale regions are even possible, temporarily; there are no operations on a directory tree - it is maintained implicitly ... read on)
  • components fail frequently; resilience and monitoring are very important (commodity hardware), so replication is a must.
  • the dataset consists of large files (not many small files); this means block size and IO size must be tuned for that (no optimizations for small files!)
  • files are mostly appended to; there are almost no overwrites of existing data.
  • most read operations are sequential (reads of 1 MB or more), because most programs at Google are data analysis passes over the whole data set; some reads are from random offsets and small in size - applications often batch and sort such small reads to avoid performance problems (minimize seek times?)
  • caching is not important (it doesn't help with large sequential access).
  • how do application considerations influence priorities? they added record append so that multiple clients can append simultaneously (for map/reduce, apparently); the consistency model was also designed around the requirements of the applications

Services

  • files are subdivided into equal-sized chunks (64 MB), each identified by a 64-bit handle; multiple replicas of each chunk are maintained.
  • file operations: create/delete/open/close/read/write
  • snapshot (fast copy of a file or directory tree) / record append (allows multiple clients to append concurrently - atomically)
  • atomic file renaming (possible because a single master server holds all the metadata)

Architecture:

Actors:

Master - holds metadata
Chunk server - holds chunk data (a chunk is a fixed-sized range of 64 MB)
Client - talks to the Master to get lease/location info, then with that info does IO on the chunk via a Chunk server

master

  • single instance
  • has metadata on all hosted files (file names, subdivision into chunks, where the chunks are stored (set of chunk server addresses));
  • each file is divided into fixed-sized chunks (64 MB), each chunk identified by a 64-bit chunk id
  • one master instance! no file data is read through the master (so that it does not become a bottleneck; only metadata is handled);
  • all metadata is kept in memory (no paging).
  • the master periodically checks each chunk server for status (HEARTBEAT messages - the response includes which chunks are hosted by the chunk server instance (that makes for relatively big messages!))
  • the master handles replication of chunk copies among different chunk server instances (it may start to re-replicate chunks to meet high loads on particular chunks)
  • the master writes a log of metadata changes (so that it can restore its state in the event of a master server crash)
  • the logs do NOT have info on the location of chunks (on what machine a chunk is hosted); the master asks the chunk servers on start-up and keeps this information fresh through HEARTBEAT checks (it can't store this info persistently - a chunk server may get a corrupted disk or suffer other permanent failures, and chunk servers join and leave the cluster anyway).
  • the metadata log is replicated on several machines (global consistency!); the user gets an ack on a request only when the log record for that change has been stored persistently on all replicas.
  • when the log grows too large they write a serialized dump of the metadata (a B-tree; this entry is called a checkpoint) and continue with the log after that (deltas relative to the checkpoint)
  • they do not block updates while the metadata is flushed (while the checkpoint is written); if metadata changes need to be made while the checkpoint is being created and written, the changes go into a separate log file created for this purpose! if the checkpoint transaction is incomplete, the tail is gathered from such a separate log file (but how is this temporary log file merged back into the main sequence once the checkpoint has been completed?)
  • the master cleans out stale replicas (each chunk reported in a HEARTBEAT has a version tag - a replica becomes stale when an update completes while its chunk server was down)
  • HEARTBEAT even has checksums on the data and tries to identify corrupted data so that bad instances must be re-replicated! (it says so in 2.7.1) (? limited guarantee - CRC checksums can go very bad with zero runs?)

chunk server

  • hosts chunk data
  • multiple machines that communicate with the master.
  • a chunk server hosts several chunks; a client communicates with the chunk server to read/modify data in a chunk.
  • responds to HEARTBEAT messages sent by the master (to check chunk server status and report the status of its chunks)

Client

  • opens a file with the master;
  • obtains chunk handles and replica locations from the master, cached temporarily (chunkIndex = offset / CHUNK_SIZE); IO on a chunk goes to a chunk server;
  • no file data is cached by the client; the client caches metadata (which files exist? where is the chunk?) for a limited period.
  • upon READ - translates the offset to a chunk index (= OFFSET / FIXED_CHUNK_SIZE) - asks the master for that chunk's handle and replica locations (and possibly for several following chunks); see the sketch after this list.
  • the master replies with the location of all chunk replicas; the client chooses the nearest replica instance for read access.
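
A sketch of the client-side arithmetic for a read: translate the byte offset into a chunk index, ask the master for that chunk's handle and replica locations, then read from the nearest replica. The function names are mine, not the actual GFS client library:

```go
package main

import "fmt"

const chunkSize = 64 << 20 // 64 MB fixed chunk size

// chunkIndex translates a byte offset within a file into the index of the
// chunk that holds it; the client sends (filename, chunkIndex) to the master.
func chunkIndex(offset int64) int64 {
	return offset / chunkSize
}

// offsetInChunk is the offset the client uses when talking to the chunk server.
func offsetInChunk(offset int64) int64 {
	return offset % chunkSize
}

func main() {
	off := int64(200 << 20) // read starting at byte 200 MB
	fmt.Println("chunk index:", chunkIndex(off))        // chunk 3
	fmt.Println("offset in chunk:", offsetInChunk(off)) // 8 MB into chunk 3
}
```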

Consistency model

(now it gets complicated)

Consistent region := all clients see the same data on all replicas (failed mutation makes region inconsistent)

Defined region := (after a mutation by client A) the region is consistent and all clients see client A's modification in its entirety (note: concurrent successful modifications leave a region consistent but undefined - fragments written by different clients become intermingled)

? didn’t they just say that only one client can get a write lease at a time?

Types of modifications:

  • Write - the application specifies the file offset at which the data will be written
  • Record append - an atomic 'record' append (the paper guarantees the record is appended atomically at least once); the offset of the write is chosen by GFS
  • Regular append - a write at the end of the file (at the offset the client believes to be the end of the file)

A write has to be copied among all chunk servers to become ‘Consistent’ (? When is this replication done? upon each write or is there an explicit action like ‘release lease’ that triggers this action?)

Now each chunk replica has a version tag; this is also used to delete 'garbage'/'stale replicas' that are out of date (the chunk server was down while another party modified the chunk) (? so must the tag be of a hierarchic nature to detect such situations, or is a plain version number enough?)

Problem: a client may temporarily read from a stale replica (this problem is not fully solved, but the client's cached chunk locations expire after a while, so the window is limited; at least a stale replica can't get modified further)


Data loss can occur if all replicas of a chunk are bad, but the system identifies and reports this situation.

write flow

  • for a chunk the master grants a lease to one of the replicas - that replica becomes the primary for the chunk (the lease is held by a chunk server, not by a client)
  • the client asks the master which chunk server holds the lease; the reply includes the identity of the primary and the addresses of all chunk servers that hold replicas of the chunk
  • the client pushes the data to all chunk servers (that hold this chunk); the chunk servers buffer this data (no write yet - it has no serial number)
  • when all acks are in: the client sends the write request to the primary
  • the primary assigns a serial number to this write, applies it locally, and forwards the write request (with the serial number) to all secondary replicas
  • the chunk servers respond with the write status; the status is returned to the client; the client considers the write failed if at least some chunk servers had errors while writing the data to disk. In this event the write is retried a few times; if it still fails, the client restarts the whole write from the beginning (sketched below).
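
The same write flow as a commented sketch (the stand-in types and names are mine; the prints just mark where the real RPCs would go):

```go
package main

import "fmt"

// replica is a stand-in for a chunk server address.
type replica struct{ addr string }

// pushData streams the data to every replica's buffer; nothing is applied yet.
func pushData(replicas []replica, data []byte) error {
	for _, r := range replicas {
		fmt.Printf("push %d bytes to %s (buffered, not yet applied)\n", len(data), r.addr)
	}
	return nil
}

// writeChunk sketches one mutation on a single chunk.
func writeChunk(primary replica, secondaries []replica, data []byte) error {
	all := append([]replica{primary}, secondaries...)

	// 1. Push the data to all replicas (in the real system this is pipelined
	//    along a chain of chunk servers rather than sent to each one directly).
	if err := pushData(all, data); err != nil {
		return err
	}

	// 2. Ask the primary to apply the write: it picks a serial number, applies
	//    the mutation locally, and forwards the request to the secondaries in
	//    that serial order.
	fmt.Printf("primary %s assigns a serial number and forwards to %d secondaries\n",
		primary.addr, len(secondaries))

	// 3. If any replica reports an error, the client retries the mutation a
	//    few times and eventually restarts the whole write from the beginning.
	return nil
}

func main() {
	primary := replica{addr: "cs1:7000"}
	secondaries := []replica{{addr: "cs2:7000"}, {addr: "cs3:7000"}}
	_ = writeChunk(primary, secondaries, []byte("hello gfs"))
}
```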

Problems: writes that cross chunk boundaries are done as separate IO requests (each can fail, servers fail), so the file may be left in an interesting global state (consistent but undefined).

write flow for record append

A record append may hit a situation where some chunk servers failed; in this case the client retries the append, so the record may end up at a different offset (and some replicas may contain duplicates of it). If the record does not fit into the remaining space of the chunk, the chunk is padded and the whole append is retried on the next chunk; the padding can be up to a quarter of the chunk size, since record appends are limited to a quarter of the chunk size. (Wow, an application that uses this stuff must be really complex.)

communication

Actually the client writes the chunk data to chunk server S1, S1 forwards the data to chunk server S2, and so on (each one forwards to its closest neighbor - on the same link preferred, or with the smallest number of in-between switches). This saves bandwidth. (The data is pipelined: S1 forwards to S2 while the data is still being received!)

snapshots

Copying a file or directory just creates a reference (comparatively fast, though it still involves writing a record to the log); the actual chunk data is copied upon modification (copy-on-write). The master has to do the following:

  • revoke all outstanding leases on the affected chunks (the next mutation then has to go through the master first, which gives it a chance to make the copy)
  • write the snapshot record to the log

Master operations

many gory details here

namespace management

GFS has filenames like /aaa/bbb/ccc/ddd.file - it applies prefix compression, so that common prefixes are represented as tree nodes, and each tree node has a read/write lock. Any operation needs to acquire read locks on every ancestor component of this implicit tree and a read or write lock on the leaf (therefore a name component (aka directory) can't be created while a file of the same name is being created). A toy sketch of this path locking follows.
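A toy sketch of that locking scheme, assuming a flat map of full pathname to RW lock instead of the real prefix-compressed tree - read locks on every ancestor, a write lock on the leaf:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// namespace keeps one RW lock per full pathname, standing in for the
// per-node locks of the real prefix-compressed tree.
type namespace struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

func newNamespace() *namespace {
	return &namespace{locks: map[string]*sync.RWMutex{}}
}

func (ns *namespace) lockFor(path string) *sync.RWMutex {
	ns.mu.Lock()
	defer ns.mu.Unlock()
	if ns.locks[path] == nil {
		ns.locks[path] = &sync.RWMutex{}
	}
	return ns.locks[path]
}

// lockPath takes read locks on every ancestor and a write lock on the leaf,
// which is why a file and a directory of the same name cannot be created
// concurrently.
func (ns *namespace) lockPath(path string) (unlock func()) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	var undo []func()
	prefix := ""
	for i, p := range parts {
		prefix += "/" + p
		l := ns.lockFor(prefix)
		if i == len(parts)-1 {
			l.Lock()
			undo = append(undo, l.Unlock)
		} else {
			l.RLock()
			undo = append(undo, l.RUnlock)
		}
	}
	return func() {
		for i := len(undo) - 1; i >= 0; i-- {
			undo[i]()
		}
	}
}

func main() {
	ns := newNamespace()
	unlock := ns.lockPath("/aaa/bbb/ccc/ddd.file")
	fmt.Println("holding read locks on ancestors, write lock on the leaf")
	unlock()
}
```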

where are new replicas of chunks placed?

policy must maximize both bandwidth utilization and reliability.

  • Can’t place all replica on the same network link (rack) - if the switch is down then all replica are not available.
  • but some are on the same net link (this maximize bandwidth because data is routed preferably between instances on the same link)

When creating a new chunk:

  • choose a machine with low disk utilization (to even things out)
  • limit number of recently created chunks on same machine (to prevent hot spots)

Re-replication: copy existing replicas when some replicated data has become corrupted (or lost) and we need to get back to the target number of replicas.

  • if a chunk has lost two replicas then it has higher priority in the queue of pending replications than one that has lost only one replica
  • chunks belonging to recently modified or active files have higher priority for replication.
  • chunks that are 'blocking client progress' get the highest priority when replication is needed. There is also rebalancing: moving chunks around with the aim of evening out disk utilization and improving network utilization.

space reclamation

when a file is deleted:

  • the master writes a metadata record (the deletion is logged)
  • space is not reclaimed immediately! the file is renamed to a 'hidden file', and the chunk servers keep the chunk data (it can no longer be reached, since the master no longer hands out its metadata)
  • space is freed by 'garbage collection': after 3 days the 'hidden file' gets deleted and the space can be reused.
  • during HEARTBEAT: in each message the chunk server reports a subset of its chunks and their status/version; the MASTER reply contains the ids of chunks that are superseded by newer version numbers, or for which no metadata exists (orphaned); the chunk server deletes these chunks

fault tolerance & diagnostics

fast startup

both master & chunk servers start fast. no special abnormal termination scenario - all exits are equal.

reliability

chunk data is replicated, master log too.

there is a shadow copy of the master that only replays the logs and is ready at any time. If the current master crashes, then the DNS record is switched to the shadow copy of the master (new accesses reach the master via DNS lookup). (? It still doesn't know where the replicas are - that info it gets as HEARTBEAT messages arrive?)

On-disk data corruption: each chunk is divided into 64 KB blocks, and each block has its own 32-bit checksum (to catch multi-bit failures). The chunk server checks the stored checksum against a checksum computed over the data that is about to be returned. Writing new data (appending) is easy; overwriting existing data requires verifying the checksums of the old data at the boundaries of the range, doing the overwrite, and then computing checksums over the newly written data (!). A sketch follows.
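A sketch of that block-level checksumming (CRC32 here is just an example checksum; the block size follows the paper's 64 KB):

```go
package main

import (
	"fmt"
	"hash/crc32"
)

const blockSize = 64 << 10 // 64 KB checksum blocks

// checksums computes one CRC32 per 64 KB block of a chunk's data.
func checksums(chunk []byte) []uint32 {
	var sums []uint32
	for off := 0; off < len(chunk); off += blockSize {
		end := off + blockSize
		if end > len(chunk) {
			end = len(chunk)
		}
		sums = append(sums, crc32.ChecksumIEEE(chunk[off:end]))
	}
	return sums
}

// verify recomputes the checksum of one block before returning it to a reader.
func verify(chunk []byte, sums []uint32, block int) bool {
	off := block * blockSize
	end := off + blockSize
	if end > len(chunk) {
		end = len(chunk)
	}
	return crc32.ChecksumIEEE(chunk[off:end]) == sums[block]
}

func main() {
	data := make([]byte, 3*blockSize+100)
	sums := checksums(data)
	fmt.Println("block 1 ok before corruption:", verify(data, sums, 1))
	data[blockSize+5] ^= 0xff // flip some bits in block 1
	fmt.Println("block 1 ok after corruption: ", verify(data, sums, 1))
}
```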

TAO: Facebook's distributed data store for the social graph

https://www.usenix.org/system/files/conference/atc13/atc13-bronson.pdf https://www.usenix.org/conference/atc13/technical-sessions/presentation/bronson

  • First they used an SQL DB (MySQL) with memcache as a lookaside cache
  • walking the local neighborhood of a graph is probably faster than ploughing through an SQL table in search of every relevant table entry, so they came up with a graph-oriented data store.
  • TAO still uses MySQL as persistent storage, but the caching layer now exposes a graph abstraction (TAO traffic is mostly reads, with infrequent writes)

Design goals:

Optimized for reads; favors efficiency and availability over consistency.

All objects are nodes in a graph; different object types - user node (each instance is a different user); checked-in-at (event type node); a comment is a node of type COMMENT.

TAO concepts:

TAO objects are typed nodes in a graph;

TAO objects := each object has a unique 64-bit integer id and an object type (otype)

TAO associations - typed directed edges in the graph (unidirectional links)

TAO associations := source object (id1), target object (id2), association type (atype)

type - the type determines what attributes an object or association can hold (there is a schema (type definition) that defines the set of attributes + default values)

All associations have a 32-bit time value (the schema of every association type includes a time field)

Object: (id) -> (otype, (key -> value)*)

Assoc.: (id1, atype, id2) -> (time, (key -> value)*)
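
The data model boils down to these two records. A sketch of them in Go (the field names are mine):

```go
package main

import (
	"fmt"
	"time"
)

// Object: (id) -> (otype, (key -> value)*)
type Object struct {
	ID    uint64            // globally unique 64-bit id
	OType string            // object type, e.g. "user", "comment", "checkin"
	Data  map[string]string // attributes allowed by the otype's schema
}

// Association: (id1, atype, id2) -> (time, (key -> value)*)
type Association struct {
	ID1   uint64            // source object
	AType string            // edge type, e.g. "friend", "authored", "liked"
	ID2   uint64            // target object
	Time  uint32            // 32-bit creation time, used for range queries
	Data  map[string]string // attributes allowed by the atype's schema
}

func main() {
	alice := Object{ID: 1, OType: "user", Data: map[string]string{"name": "alice"}}
	post := Object{ID: 2, OType: "post", Data: map[string]string{"text": "hello"}}
	liked := Association{ID1: alice.ID, AType: "liked", ID2: post.ID,
		Time: uint32(time.Now().Unix())}
	fmt.Printf("%+v\n%+v\n%+v\n", alice, post, liked)
}
```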

Tao interface:

  • creation of new object/find & delete object by id
  • creation of associations (also provides automatic creation/update/deletion of the inverse association instance - if an inverse type is defined in the schema)
  • no atomic compare and set (they do eventual consistency)
  • manipulating associations
    • assoc-add(id1, atype, id2, time, (k->v)*)
    • assoc-delete(id1, atype, id2)
    • assoc-change-type(id1, atype, id2, newtype)

Query interface

Association list for an object id := the list of edges (associations) that originate at object id1, sorted by creation time (newest first)

Association List: (id1, atype) -> [a_new ... a_old]  (each a is an association/edge)

Now there are queries that return the association list given an id (plus further constraints: the association type, and optionally a time filter).

The API only returns up to a maximum number of associations per call - if there are more than that, repeated queries are needed.

  • assoc-get( /in/ id1, /in/ atype, /in/ id2set, /in/ high?, /in/ low?) - returns the matching associations (see the sketch after this list)
  • assoc-count( /in/ id1, /in/ atype)
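
A toy in-memory version of the assoc-get filter, just to make the query semantics concrete (the real call runs against the cache/DB, not a slice):

```go
package main

import "fmt"

// Association is the edge record from the data model above.
type Association struct {
	ID1, ID2 uint64
	AType    string
	Time     uint32
}

// assocGet filters the association list of (id1, atype): it keeps edges whose
// target is in id2set and whose creation time falls into [low, high].
func assocGet(list []Association, id2set map[uint64]bool, high, low uint32) []Association {
	var out []Association
	for _, a := range list {
		if id2set[a.ID2] && a.Time >= low && a.Time <= high {
			out = append(out, a)
		}
	}
	return out
}

func main() {
	list := []Association{
		{ID1: 1, ID2: 10, AType: "liked", Time: 100},
		{ID1: 1, ID2: 11, AType: "liked", Time: 200},
		{ID1: 1, ID2: 12, AType: "liked", Time: 300},
	}
	res := assocGet(list, map[uint64]bool{10: true, 12: true}, 250, 50)
	fmt.Println(res) // only the edge to id2=10 matches both filters
}
```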

Architecture

  • use an SQL DB (MySQL) as the backing store.
  • (important for such things as backups / bulk updates)
  • the DB is split into shards (it is too big for a single server); each object id embeds the id of the shard that holds it (? can't add any new shards?) - see the sketch after this list
  • associations don't embed a shard id of their own (they are tied to the shard of an object): an association is stored on the shard of id1 (the source of the edge). Bidirectional associations may need to update two shards (if id1 and id2 live on different shards); there is no atomicity: if the inverse edge was not written, the dangling edge is cleaned up by a background task.
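
A sketch of the 'shard id embedded in the object id' idea; the paper does not spell out the exact bit layout, so the 16-bit split below is purely an assumption for illustration:

```go
package main

import "fmt"

const shardBits = 16 // assumption: low 16 bits of the 64-bit id name the shard

// newID packs a shard id into a freshly allocated object id, so the object's
// location can always be recovered from the id itself.
func newID(seq uint64, shard uint16) uint64 {
	return seq<<shardBits | uint64(shard)
}

// shardOf recovers the shard an object (or the source of an association) lives on.
func shardOf(id uint64) uint16 {
	return uint16(id & (1<<shardBits - 1))
}

func main() {
	id := newID(123456, 42)
	fmt.Printf("object id %d lives on shard %d\n", id, shardOf(id))
}
```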

cache layer

A set of cache servers forms a tier - a tier can handle any TAO request. Individual servers within a tier are sharded (similar to the DB sharding). Cache eviction: least recently used entries get thrown out.

For each shard there is a leader cache server (the cache server that reads from and writes to the MySQL DB). There are second-tier servers (followers) that are in-memory stores only; on a cache miss a follower talks to the leader server.

When a leader fails: followers read directly from the DB; for writes a temporary leader is used (chosen from among the followers).

When followers fail: for each tier there is a backup tier; when a request to a shard server within a tier fails, the request is sent to the corresponding shard server in the backup tier.

Problems: consistency. An update at the leader needs to send cache invalidation/update messages to every follower (the change propagates eventually - eventual consistency). Each cached item has a version; when the leader gets a modify request it sends out the update together with a version number (important: a follower cache must check that its current data is not already newer than this version number). The update of the follower cache that issued the write must be completed before the write request returns.

Advantage: follower tiers can be added as needed.

Each geographical region has its own replica of the DB plus leader and follower caches. Now the DB must be replicated across regions(!). The leader of a region can also talk directly to the leader of another region (!) (they say inter-region communication has millisecond latency (!)).

Caching server - modified Memcache

  • manages LRU (least recently used) eviction per object size class; uses a SLAB allocator (a set of pools, each pool serving fixed-size allocations)
  • memory is divided into arenas (certain object and association types go into the same arena) - this gives better isolation: it allows different cache lifetimes for important object types

  • small items are not kept in separate hash buckets; instead there is a dynamic array per bucket (to avoid per-item overhead)
  • active pairs of (id, atype) are represented by a 16-bit integer index

MySQL schema

  • all attributes of an object are in the same data column (do not have a separate table per object type)
  • for associations: range queries are needed, so the association table has an index on (id1, atype, time)
  • cache sharding? (multiple follower caches for each shard)