Elastic MapReduce (EMR)
In general, EMR gives us access to:
- Managed Hadoop framework that runs on EC2 instances
  - EMR is a confusing name: MapReduce itself is now a largely superseded part of Hadoop, and EMR is a lot more than just a Hadoop cluster.
  - It does include Hadoop, and MapReduce is in there, but it's the tools built on top of Hadoop that you tend to use more often.
- Includes Spark, HBase, Presto, Flink, Hive & more
- EMR Notebooks
  - Much like a Jupyter notebook, running code on your EMR cluster
- Several integration points with AWS
This is relevant for machine learning because if you have a massive data set that you need to prepare, normalize, scale, or otherwise treat before it goes into your algorithms, EMR provides a way of distributing that processing across an entire cluster of computers.
For massive data sets you often need a cluster to prepare the data for your machine learning training jobs in parallel.
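As a rough illustration of why this kind of preparation parallelizes well, the sketch below min-max scales a data set that has been split into partitions. Each partition reports only its own minimum and maximum, the small summaries are combined, and every partition is then rescaled independently. This is plain Python standing in for a cluster; the `partitions` data and the function names are made up for this example and are not part of any EMR API.

```python
# Hypothetical data: each inner list stands in for the slice held by one node.
partitions = [[10.0, 20.0], [5.0, 25.0], [15.0]]

def partition_min_max(part):
    """Per-node work: summarize one partition without seeing the others."""
    return min(part), max(part)

def combine(stats):
    """Driver-side work: merge the small per-partition summaries."""
    lows, highs = zip(*stats)
    return min(lows), max(highs)

def scale_partition(part, low, high):
    """Per-node work again: rescale local values into the range [0, 1]."""
    span = high - low
    return [(x - low) / span if span else 0.0 for x in part]

low, high = combine([partition_min_max(p) for p in partitions])
scaled = [scale_partition(p, low, high) for p in partitions]
print(scaled)
```

Only two numbers per partition travel between nodes, which is what makes the approach scale to data that does not fit on one machine.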
EMR Clusters
A cluster in EMR is a collection of EC2 instances, where every EC2 instance is called a node. Each node has a role within the cluster, called the node type:
- Master node
  - Manages the cluster by running software components that coordinate the distribution of data and tasks among the other nodes for processing.
  - Tracks the status of tasks and monitors the health of the cluster.
  - Every cluster has a master node.
  - You can also create a single-node cluster consisting of just a master node, if your processing fits on a single machine.
  - Sometimes referred to as the leader node.
  - Single EC2 instance
- Core node
  - Nodes with software components that run tasks and store data on the Hadoop Distributed File System (HDFS).
  - Multi-node clusters have at least one core node.
  - Hosts HDFS data and runs tasks
  - Can be scaled up & down, but with some risk
- Task node
  - Runs tasks, does not host data
  - Nodes with software components that only run tasks and do not store data on HDFS.
  - Optional, but a good use of Spot instances: you can add them and take them out of the cluster as needed without impacting the storage on your cluster, since they don't host any HDFS data.
  - No permanent files that your cluster needs are stored on a task node, so they're only used for computation.
  - If you have a sudden task that needs to run on a massive amount of data and you only need it once, you can add task nodes to your cluster and remove them when you're done; the cluster will happily run along without them.
  - No risk of data loss when removing
  - Good use of Spot instances
EMR Usage
- Transient vs long-running clusters
  - A transient cluster is configured to terminate automatically once all the steps you've defined for it have completed.
    - Can save money this way
  - A long-running cluster is one you create, interact with directly through its applications, and then terminate manually when you're done with it.
    - This is more appropriate for, say, ad-hoc queries, or experimenting with data sets where you don't know up front what you want to do and have no repeatable sequence to run over and over again.
    - In that case you would want a long-running cluster, and just terminate it by hand when you're done.
  - Can spin up task nodes using Spot instances for temporary capacity
  - Can use reserved instances on long-running clusters to save $
- Connect directly to master to run jobs
  - Once you have a cluster spun up either way, you can connect directly to the master node through EC2 and run jobs from the terminal there.
- Submit ordered steps via the console
  - Alternatively, if you can predefine your steps, you can submit them in order through the AWS console.
- EMR Serverless lets AWS scale your nodes automatically
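To make the node types and the transient vs long-running choice more concrete, here is a minimal sketch that only builds a request dictionary and never calls AWS. The field names (`InstanceGroups`, `InstanceRole`, `Market`, `KeepJobFlowAliveWhenNoSteps`) follow the shape of boto3's EMR `run_job_flow` call as I understand it, so check them against the current API reference. The cluster name, instance types, and counts are hypothetical, and a real request needs more fields (release label, IAM roles, steps) that are omitted here.

```python
def build_cluster_request(name, transient, task_nodes=0):
    """Return a partial request shaped like boto3's EMR run_job_flow arguments."""
    groups = [
        {"Name": "master", "InstanceRole": "MASTER", "Market": "ON_DEMAND",
         "InstanceType": "m4.large", "InstanceCount": 1},
        {"Name": "core", "InstanceRole": "CORE", "Market": "ON_DEMAND",
         "InstanceType": "m4.large", "InstanceCount": 2},
    ]
    if task_nodes:
        # Task nodes hold no HDFS data, so Spot capacity is a reasonable fit.
        groups.append({"Name": "task", "InstanceRole": "TASK", "Market": "SPOT",
                       "InstanceType": "m4.large", "InstanceCount": task_nodes})
    return {
        "Name": name,
        "Instances": {
            "InstanceGroups": groups,
            # False: terminate once all steps finish (transient cluster).
            # True: keep running until terminated by hand (long-running).
            "KeepJobFlowAliveWhenNoSteps": not transient,
        },
    }

request = build_cluster_request("nightly-etl", transient=True, task_nodes=4)
print(request["Instances"]["KeepJobFlowAliveWhenNoSteps"])
```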
EMR / AWS Integration
- Amazon EC2 for the instances that comprise the nodes in the cluster
- Amazon VPC to configure the virtual network in which you launch your instances
- Amazon S3 to store input and output data
  - Instead of HDFS if you wish
- Amazon CloudWatch to monitor cluster performance and configure alarms
- AWS IAM to configure permissions
- AWS CloudTrail to audit requests made to the service
- AWS Data Pipeline to schedule and start your clusters if you're running a predefined set of steps
EMR Storage
- The default storage solution on Hadoop is HDFS
  - It's a distributed, scalable file system for Hadoop.
  - It distributes the data it stores across the instances in your cluster, and it stores multiple copies of the data on different instances to ensure that no data is lost if an individual instance fails.
  - Every file in HDFS is stored as blocks, which are distributed across the Hadoop cluster.
  - By default, the size of a block in HDFS is 128 megabytes.
  - This is ephemeral storage
    - Once you terminate your cluster, the data stored locally on those nodes goes with it.
    - That's the reason not to use HDFS.
  - However, it's a lot faster
    - We don't have to go across the internet to access the data.
    - It's all local on the actual nodes that are processing your data, and Hadoop has a lot of optimizations built in so that the code processing a piece of data runs, where possible, on the same node where that data is stored.
  - So HDFS is very good from a performance standpoint, but it has the downside that when you shut down your cluster, that data goes away.
- EMRFS: access S3 as if it were HDFS
  - Still pretty fast
  - EMRFS Consistent View - optional for S3 consistency
    - Uses DynamoDB to track consistency
- Local file system if you want. Not distributed, so only useful on the master node
- EBS for HDFS
  - HDFS can also be backed by Elastic Block Store (EBS) volumes.
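The block size mentioned above lends itself to a quick back-of-the-envelope calculation. The sketch below estimates how many blocks a file is split into and how much raw storage it consumes once copies are counted. The replication factor of 3 is an assumption used for illustration; the value actually in effect depends on how the cluster is configured.

```python
import math

BLOCK_SIZE_MB = 128  # HDFS default block size from the notes above

def hdfs_footprint(file_size_mb, replication=3):
    """Return (number of blocks, total MB stored across the cluster)."""
    blocks = math.ceil(file_size_mb / BLOCK_SIZE_MB)
    # The last block only occupies as much space as the data it holds,
    # so raw usage is the file size times the number of copies.
    return blocks, file_size_mb * replication

blocks, stored_mb = hdfs_footprint(1000)
print(blocks, stored_mb)
```

Each block can be processed by a separate task, so the block count also gives a rough upper bound on how many tasks can read the file in parallel.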
EMR promises
- EMR charges by the hour
  - Plus EC2 charges
- Provisions new nodes if a core node fails
- Can add and remove task nodes on the fly
  - Can use Spot instances to add or remove capacity without impacting the underlying storage on HDFS.
- Can resize a running cluster's core nodes
Hadoop
When we talk about Hadoop itself, we're usually talking about the HDFS file system, YARN, and MapReduce.
And underlying all of these is something called Hadoop Core, or Hadoop Common: the libraries and utilities that all of these modules run on top of. It provides file-system and operating-system-level abstractions, plus the Java archive (JAR) files and scripts needed to start up Hadoop itself.
At the lowest level above that is HDFS, the Hadoop Distributed File System: a distributed, scalable file system for Hadoop. Again, it distributes the data it stores across the instances in the cluster, and multiple copies of the data are stored on different instances to ensure that no data is lost if an individual instance fails. The data is lost when the cluster is terminated, however. HDFS is still useful for caching intermediate results during MapReduce processing, or for workloads that have significant random I/O.
On top of HDFS we have Hadoop YARN. YARN stands for Yet Another Resource Negotiator. It's the component introduced in Hadoop 2.0 to centrally manage cluster resources for multiple data processing frameworks; that enables us to use things other than MapReduce, as we'll see shortly.
MapReduce is a software framework for easily writing applications that process vast amounts of data in parallel on large clusters of commodity hardware in a reliable, fault-tolerant manner. A MapReduce program consists of map functions and reduce functions that you write in code:
- Map function
  - Maps data to sets of key-value pairs called the intermediate results.
  - Map functions generally do things like transform your data, reformat it, or extract the data that you need.
    - That's why it's relevant to the world of exploratory data analysis.
- Reduce function
  - Combines those intermediate results, applies additional algorithms, and produces the final output.
Generally, the mappers transform and prepare your data, and the reducers aggregate that data and distill it down to the final answer that you want.
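The classic word count shows the map and reduce roles in miniature. This is a single-process sketch in plain Python, not Hadoop code: the `shuffle` step stands in for the grouping that the framework performs between the two phases, and the input lines are made up.

```python
from collections import defaultdict

def mapper(line):
    """Map: turn one input record into (key, value) pairs."""
    return [(word.lower(), 1) for word in line.split()]

def shuffle(pairs):
    """Group intermediate values by key (the framework does this for you)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reducer(key, values):
    """Reduce: aggregate all values for one key into a final result."""
    return key, sum(values)

lines = ["the quick brown fox", "the lazy dog", "The end"]
intermediate = [pair for line in lines for pair in mapper(line)]
counts = dict(reducer(k, v) for k, v in shuffle(intermediate).items())
print(counts["the"])  # 3
```

On a real cluster the mapper calls run on the nodes that hold each input block, and each reducer receives all values for the keys assigned to it.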
Apache Spark
However, these days Apache Spark has largely taken the place of MapReduce. Thanks to the existence of YARN, Spark can sit on top of your Hadoop cluster and use the underlying resource negotiator and file system that Hadoop offers, while offering a faster alternative to MapReduce.
Spark, which can optionally be installed on your EMR cluster, is an open-source distributed processing system commonly used for big data workloads; it's really the hotness right now (instead of MapReduce). It uses in-memory caching and optimized query execution for fast analytic queries against data of any size.
It uses a directed acyclic graph (DAG) to plan its work. That's the main trick for its speed compared to MapReduce: it can be smarter about the dependencies between processing steps and how to schedule them effectively. Spark has APIs for Java, Scala, Python, and R, and it supports code reuse across multiple workloads, like batch processing, interactive queries, real-time analytics, machine learning, and graph processing.
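To see why knowing the dependency graph helps with scheduling, the sketch below groups a small job into waves of tasks whose inputs are already complete, so everything within a wave could run in parallel. It uses Python's standard `graphlib` module and is only an analogy for the idea; it is not how Spark's scheduler is implemented, and the task names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical job: each task maps to the set of tasks it depends on.
dag = {
    "load_users": set(),
    "load_orders": set(),
    "clean_orders": {"load_orders"},
    "join": {"load_users", "clean_orders"},
    "aggregate": {"join"},
}

def plan_stages(graph):
    """Group tasks into waves; tasks in the same wave can run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose inputs are done
        stages.append(ready)
        sorter.done(*ready)
    return stages

stages = plan_stages(dag)
print(stages)
```

Here the two load tasks land in the same wave, while `join` has to wait for `clean_orders`; a planner that sees the whole graph can make that call up front.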
It has a bunch of different use cases that we'll talk about, such as stream processing, machine learning, and interactive SQL. However, Spark is generally NOT used for OLTP (Online Transaction Processing, a type of data processing that consists of executing a large number of small transactions concurrently); it's built for analyzing and transforming large data sets rather than serving individual transactions.
How Spark Works
Spark applications run as independent sets of processes on a cluster. They're all coordinated by the SparkContext object of the main program, i.e. the driver program. That's the actual code that you write to make your Spark job run.
The SparkContext connects to a cluster manager, which allocates resources across applications: for example YARN, or Spark's own built-in cluster manager if you're not on a Hadoop cluster.
Upon connecting, Spark acquires executors on nodes in the cluster. The executors are processes that run computations and store data for your applications. The application code is sent to the executors, and in the final step the SparkContext sends tasks to the executors to run.
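The driver and executor split can be mimicked on one machine. In this sketch a thread pool plays the part of the executors, and `run_job` plays the part of the driver: it partitions the data, hands one task per partition to the pool, and combines the results. It is an analogy written in plain Python with made-up function names, not Spark's API.

```python
from concurrent.futures import ThreadPoolExecutor

def split(data, num_partitions):
    """Driver side: cut the input into roughly equal partitions."""
    return [data[i::num_partitions] for i in range(num_partitions)]

def task(partition):
    """Executor side: the computation shipped to each worker."""
    return sum(x * x for x in partition)

def run_job(data, num_executors=3):
    partitions = split(data, num_executors)
    # The pool stands in for executors acquired from the cluster manager.
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        partial_results = list(executors.map(task, partitions))
    return sum(partial_results)  # driver combines the per-task results

total = run_job(list(range(10)))
print(total)
```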
Spark Components
Spark itself has many different components, just like Hadoop does. Underlying everything is Spark Core, which acts as the foundation for the platform. It's responsible for things like memory management, fault recovery, scheduling, distributing and monitoring your jobs, and interacting with storage systems.
It has APIs for Java, Scala, Python, and R, and at the lowest level it uses something called a resilient distributed dataset, or RDD, which represents a logical collection of data partitioned across different compute nodes.
There's a layer above that with Spark SQL, a distributed query engine that provides low-latency interactive queries up to 100 times faster than MapReduce. It includes a cost-based optimizer, columnar storage, and code generation for fast queries, and it supports various data sources, including JDBC, ODBC, JSON, HDFS, Hive, ORC, and Parquet files. It also supports querying Hive tables using HiveQL if you want.
The really important thing about Spark SQL is that it exposes something called a DataFrame in Python, or a Dataset in Scala, and this is largely taking the place of the lower-level resilient distributed datasets in Spark these days. Modern Spark code tends to interact with data in much the same way as you would with a data frame in pandas or a table in a relational database.
You can actually issue SQL commands to your Spark cluster, and under the hood it will figure out how to transform them into a distributed query that executes across your entire cluster, so very useful stuff.
We also have Spark Streaming. That's a real-time solution that leverages Spark Core's fast scheduling capabilities to do streaming analytics. Data gets ingested in mini-batches, and the same application code written for batch analytics can be applied to those mini-batches. That's pretty cool, because you can reuse the code you wrote for batch processing for your real-time streaming. It supports ingestion from Twitter, Kafka, Flume, HDFS, and ZeroMQ, and as we'll see, it can also integrate with AWS Kinesis.
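The point about reusing batch code on mini-batches can be shown without Spark at all. In the sketch below, `count_actions` is ordinary batch logic, and the streaming path simply applies it to each small batch as it arrives. One simplification to note: batches here are cut by record count, whereas Spark Streaming cuts them by time interval. The event data is made up.

```python
from collections import Counter
from itertools import islice

def count_actions(records):
    """Batch logic: count events per action for any collection of records."""
    return Counter(r["action"] for r in records)

def mini_batches(stream, batch_size):
    """Cut an incoming stream of records into small fixed-size batches."""
    iterator = iter(stream)
    while batch := list(islice(iterator, batch_size)):
        yield batch

events = [{"action": a} for a in ["click", "view", "click", "buy", "view"]]

# The same function serves a full batch job and each streaming mini-batch.
batch_result = count_actions(events)
running = Counter()
for batch in mini_batches(events, batch_size=2):
    running.update(count_actions(batch))
print(running == batch_result)
```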
We also have MLlib, the machine learning library for Spark.
Finally, we have GraphX, which is a distributed graph processing framework built on top of Spark. We're not talking about charts and graphs like in QuickSight here; we're talking about computer science graphs, e.g. a graph of people in a social network. It's more of a data structures thing. It provides ETL, exploratory analysis, and iterative graph computation to enable users to interactively build and transform a graph data structure at scale.
Spark MLlib
MLlib offers several different machine learning algorithms, but what's special is that they're all implemented in a way that is distributed and scalable. Not all machine learning algorithms lend themselves well to parallel processing; a lot of them need to be reimagined in order to spread the load across an entire cluster of computers.
- Classification
  - Logistic regression
  - Naïve Bayes
- Regression
- Decision trees
- Recommendation engine (ALS)
- Clustering (K-Means)
- Latent Dirichlet Allocation (LDA)
  - An example of a topic model, used to assign the text in a document to a particular topic.
  - Unsupervised
- ML workflow utilities (pipelines, feature transformation, persistence)
- SVD, PCA, statistics functions
The special thing here is that all of this can run on a cluster. A lot of these algorithms will not run on a cluster in their default state, for example if you were to run them in scikit-learn. Spark MLlib allows you to process massive data sets and train machine learning models on them across an entire cluster in parallel. You can even include Spark within SageMaker.
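As a small illustration of what "reimagining" an algorithm for a cluster can look like, here is one K-Means update step written so that each partition only produces partial sums and counts, which a driver then merges. This is a toy one-dimensional sketch in plain Python under my own assumptions, not MLlib's implementation, and the data and starting centroids are made up.

```python
def assign_and_summarize(points, centroids):
    """Per-partition work: partial (sum, count) for each nearest centroid."""
    sums = [0.0] * len(centroids)
    counts = [0] * len(centroids)
    for p in points:
        nearest = min(range(len(centroids)), key=lambda i: abs(p - centroids[i]))
        sums[nearest] += p
        counts[nearest] += 1
    return sums, counts

def kmeans_step(partitions, centroids):
    """Driver-side work: merge partial sums and recompute the centroids."""
    total_sums = [0.0] * len(centroids)
    total_counts = [0] * len(centroids)
    for sums, counts in (assign_and_summarize(p, centroids) for p in partitions):
        for i in range(len(centroids)):
            total_sums[i] += sums[i]
            total_counts[i] += counts[i]
    # Keep the old centroid if no point was assigned to it.
    return [s / c if c else old
            for s, c, old in zip(total_sums, total_counts, centroids)]

partitions = [[1.0, 2.0, 9.0], [3.0, 10.0, 11.0]]  # hypothetical 1-D data
new_centroids = kmeans_step(partitions, centroids=[0.0, 8.0])
print(new_centroids)
```

The raw points never leave their partition; only a pair of small lists per partition is sent back, which is the property that lets the step scale out.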
Spark Structured Streaming
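import org.apache.spark.sql.functions.window
import spark.implicits._  // enables the $"column" syntax
val inputDF = spark.readStream.json("s3://logs")  // unbounded input table
inputDF.groupBy($"action", window($"time", "1 hour")).count()  // counts per action per hour
  .writeStream.format("jdbc").start("jdbc:mysql://...")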
Generally, Spark applications refer to their data through a Dataset in code, and with Structured Streaming that Dataset is treated a lot like a database table. The table just keeps growing as new chunks of data are received in real time, and you can query it using windows of time. For example, you could look at the past hour of data in your incoming stream and query it like a database.
At a high level, that's how Structured Streaming works in Spark. It models inbound streaming data as an unbounded database table that you can query whenever you want to.
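The windowed count in the query above can be mimicked in plain Python to show what grouping by action and one-hour window means. This sketch assigns each row to the hour that contains its timestamp (a tumbling window) and counts rows per (action, window) pair. The rows are hypothetical, and a real stream would keep appending to the table.

```python
from collections import Counter
from datetime import datetime

def hour_window(ts):
    """Return the start of the one-hour window that contains timestamp ts."""
    return ts.replace(minute=0, second=0, microsecond=0)

def windowed_counts(rows):
    """Similar in spirit to groupBy(action, window(time, "1 hour")).count()."""
    return Counter((row["action"], hour_window(row["time"])) for row in rows)

# Hypothetical log records; in a real stream, rows keep arriving.
table = [
    {"action": "login", "time": datetime(2024, 1, 1, 9, 5)},
    {"action": "login", "time": datetime(2024, 1, 1, 9, 50)},
    {"action": "login", "time": datetime(2024, 1, 1, 10, 1)},
    {"action": "logout", "time": datetime(2024, 1, 1, 9, 59)},
]
window_counts = windowed_counts(table)
print(window_counts[("login", datetime(2024, 1, 1, 9))])
```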
Spark Streaming + Kinesis
Spark Streaming integrates with Kinesis. You could have a Kinesis producer publishing data to a Kinesis data stream, and the Kinesis Client Library (KCL) provides a way to build a Spark Dataset on top of that data stream, which you can query like any other Spark Dataset.
Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
Zeppelin + Spark
- You can run Spark code interactively within a notebook environment, in a browser (like you can in the Spark shell).
  - This speeds up your development cycle
  - And allows easy experimentation and exploration of your big data
- Can execute SQL queries directly against your Spark data using Spark SQL
- Query results may be visualized in charts and graphs
  - Can use Matplotlib and Seaborn
- Makes Spark feel more like a data science tool!
EMR Notebook
EMR Notebooks are a similar concept to Zeppelin, with more AWS integration.
- Notebooks are backed up to S3
- You can provision entire clusters from within the notebook
  - You can spin up an EMR notebook, spin up an EMR cluster, and start feeding tasks to that cluster, all from an EMR notebook.
- Hosted inside a VPC for security
- Accessed only via the AWS console
Some things you can do with EMR notebooks are build Apache Spark applications and run interactive queries on your cluster. They support Python, PySpark, Spark SQL, SparkR, and Scala, and they come prepackaged with popular open-source graphical libraries from Anaconda to help you prototype your code, visualize your results, and perform exploratory data analysis with Spark DataFrames.
They're hosted outside of the EMR cluster itself, and the notebook files are backed up to S3. That's how you can spin up a cluster from an EMR notebook: the notebook itself is not on your cluster, it turns out.
EMR notebooks also allow multiple users from your organization to create their own notebooks and attach them to shared multi-tenant EMR clusters and just start experimenting with Apache Spark, collaboratively. Notebooks are provided at no additional charge to EMR customers.
EMR Security
- IAM policies
  - Grant or deny permissions and determine what actions a user can perform within Amazon EMR and on other AWS resources.
  - You can also combine IAM policies with tagging to control access on a cluster-by-cluster basis.
  - IAM roles for EMRFS requests to Amazon S3 allow you to control whether cluster users can access files from within Amazon EMR based on the user, the group, or the location of the EMRFS data in Amazon S3.
- Integrates with Kerberos
  - Provides strong authentication through secret-key cryptography, which ensures that passwords and other credentials aren't sent over the network in unencrypted form.
- Integrates with SSH
  - Provides a secure way for your users to connect to the command line on cluster instances.
  - SSH can also be used for tunneling to view the various web interfaces that exist on your EMR cluster.
- IAM roles
  - The Amazon EMR service role, the instance profile, and the service-linked role control how Amazon EMR is able to access other AWS services.
  - Each cluster in Amazon EMR must have a service role and a role for the Amazon EC2 instance profile.
  - IAM policies attached to these roles provide permissions for the cluster to interoperate with other AWS services on behalf of a user.
  - There is also an auto scaling role, which you need if your cluster uses automatic scaling.
  - The service-linked role is used if the service role for Amazon EMR has lost the ability to clean up Amazon EC2 resources.
EMR: Choosing Instance Types
- Master node:
  - m4.large if < 50 nodes
  - m4.xlarge if > 50 nodes
- Core & task nodes:
  - m4.large is usually good
  - If the cluster waits a lot on external dependencies (e.g. a web crawler), t2.medium
  - Improved performance needed? m4.xlarge
  - Computation-intensive applications: high-CPU instances
  - Database, memory-caching applications: high-memory instances
    - Use these if you know ahead of time that you have a very database-intensive or memory-caching-intensive application.
  - Network / CPU-intensive (NLP, ML): cluster compute instances
- Spot instances
  - Good choice for task nodes
  - Only use on core & master if you're testing or very cost-sensitive
    - You're risking partial data loss by using a Spot instance on a core or master node.
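The rules of thumb above are simple enough to write down as a couple of helper functions, which can be handy when scripting cluster creation. This is only a sketch of the guidance in these notes, with hypothetical function names. The notes do not say what to do at exactly 50 nodes, so treating 50 as the larger size is my assumption.

```python
def master_instance_type(node_count):
    """Rule of thumb from the notes: size the master by cluster size."""
    # Assumption: exactly 50 nodes is treated like "more than 50".
    return "m4.large" if node_count < 50 else "m4.xlarge"

def market_for(node_role, testing=False):
    """Spot suits task nodes; core/master on Spot risks partial data loss."""
    if node_role == "task" or testing:
        return "SPOT"
    return "ON_DEMAND"

print(master_instance_type(10), market_for("task"), market_for("core"))
```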
Deep Learning on EC2 / EMR
- EMR supports Apache MXNet out of the box, as well as GPU instance types
- Appropriate instance types for deep learning:
  - P3: up to 8 Tesla V100 GPUs
  - P2: up to 16 K80 GPUs
  - G3: up to 4 M60 GPUs (all Nvidia chips)
- Deep Learning AMIs
  - May have TensorFlow already deployed