MapReduce — Paradigme de traitement parallèle
Phases Map/Shuffle/Reduce · Combiner · Partitioner · Simulateur interactif · Travaux Pratiques · Quiz
Concept Phases Simulateur TP Quiz1 Concept et Historique
Qu'est-ce que MapReduce ?
MapReduce est un modèle de programmation pour traiter de très grands ensembles de données en parallèle sur un cluster. Il décompose tout traitement en deux opérations fondamentales :
MAP
Transforme chaque enregistrement en paires (clé, valeur)REDUCE
Agrège les valeurs ayant la même cléIdée clé : Plutôt que de déplacer les données vers le code, on déplace le code vers les données. Chaque nœud traite ses données locales → data locality.
Analogie simple — Compter des mots dans 1000 livres
Sans MapReduce : 1 personne lit 1000 livres et compte → très lentAvec MapReduce : 100 personnes (Mappers) lisent chacune 10 livres et notent les mots → 10 personnes (Reducers) comptent par lettre → résultat en parallèle ✓
Historique
2 Les Phases de MapReduce en Détail
Flux complet d'un job MapReduce
HDFS Input
┌──────────────────────────────────────────────────────────────────────────────┐
│ InputFormat → InputSplit → RecordReader → (key, value) pairs │
└──────────────────────────────────────────────────────────────────────────────┘
│
┌─────────▼──────────┐
│ MAP (Mapper.java) │ × N tâches parallèles
│ map(key, value) │
│ context.write(k, v) │
└─────────┬────────────┘
│ (intermediate key, value) pairs
┌─────────▼────────────────────────────┐
│ Buffer circulaire (100 MB défaut) │
│ → Spill sur disque si 80% plein │
│ → Tri + Combiner (local pre-reduce) │
└─────────┬────────────────────────────┘
│
┌─────────▼──────────────┐
│ SHUFFLE & SORT │
│ Partitioner → réseau │
│ Merge sort côté Reducer │
└─────────┬────────────────┘
│ (key, [v1, v2, v3, ...])
┌─────────▼────────────────┐
│ REDUCE (Reducer.java) │ × M tâches parallèles
│ reduce(key, Iterable<V>) │
│ context.write(k, result) │
└─────────┬────────────────┘
│
HDFS Output
┌──────────────────────────────────────────────────────────────────────────────┐
│ OutputFormat → part-r-00000, part-r-00001, ... (un fichier par Reducer) │
└──────────────────────────────────────────────────────────────────────────────┘
Phase MAP — Transformation
Le Mapper reçoit des paires (clé, valeur) en entrée et produit des paires (clé intermédiaire, valeur) en sortie. Chaque Mapper traite un seul InputSplit (≈ 1 bloc HDFS).
Cycle de vie d'un Mapper
setup()
Initialisation (connexion DB, chargement config)
map()
Traitement ligne par ligne — émission (k,v)
cleanup()
Libération ressources
InputFormats courants
| InputFormat | Clé | Valeur | Usage |
|---|---|---|---|
TextInputFormat | Offset | Ligne texte | Fichiers texte (défaut) |
KeyValueTextInputFormat | 1er champ | Reste | CSV/TSV |
SequenceFileInputFormat | Custom | Custom | Données binaires |
NLineInputFormat | Offset | N lignes | Contrôle split size |
Exemple — WordCount Mapper
private final Text word = new Text();
private final IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context ctx)
throws IOException, InterruptedException {
// value = une ligne du fichier texte
String[] words = value.toString().toLowerCase()
.split("\\s+");
for (String w : words) {
if (!w.isEmpty()) {
word.set(w);
ctx.write(word, one); // ("hello", 1)
}
}
}
}
Buffer circulaire (mapreduce.task.io.sort.mb)
Les sorties du Mapper sont d'abord écrites dans un buffer circulaire en RAM (100 MB défaut). Quand il atteint 80%, un spill sur disque est déclenché. À la fin du Mapper, tous les spills sont fusionnés et triés.Shuffle & Sort — La phase cachée
Invisible pour le développeur, mais la plus coûteuse. Les données intermédiaires sont triées, partitionnées et transférées sur le réseau depuis les Mappers vers les Reducers.
Côté Mapper
Côté Reducer
partition = Math.abs(key.hashCode() % numReducers)
// Exemple avec 3 Reducers :
"hadoop" hash=231 → 231 % 3 = 0 → Reducer 0
"spark" hash=114 → 114 % 3 = 0 → Reducer 0
"hive" hash=310 → 310 % 3 = 1 → Reducer 1
"yarn" hash=422 → 422 % 3 = 2 → Reducer 2
Data Skew — Le problème numéro 1
Si une clé est très fréquente (ex: "the" dans WordCount), 1 Reducer reçoit des millions de valeurs pendant que les autres sont libres → goulot d'étranglement. Solutions : Combiner, clé composite, Partitioner custom.Configs clés Shuffle
mapreduce.map.sort.spill.percent = 0.8
mapreduce.reduce.shuffle.parallelcopies = 5
Phase REDUCE — Agrégation
Le Reducer reçoit (clé, Iterable<valeurs>) — toutes les valeurs ayant la même clé — et produit le résultat final écrit dans HDFS.
Exemple — WordCount Reducer
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context ctx) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get(); // additionne tous les 1
}
result.set(sum);
ctx.write(key, result); // ("hello", 42)
}
}
Nombre de Reducers — Quel impact ?
| Reducers | Cas d'usage | Comportement |
|---|---|---|
| 0 | Map-Only job | Pas de shuffle, sortie directe en HDFS. Très rapide ! |
| 1 | Petit volume | Sortie triée globalement. 1 seul fichier part-r-00000 |
| N | Production | Parallélisme max. N fichiers en sortie. |
Règle empirique — Nombre de Reducers
numReducers ≈ 0.95 × (nodes × mapred.tasktracker.reduce.tasks.maximum)Ou plus simplement : visez que chaque Reducer traite entre 1 et 5 minutes et produit un fichier de 128 MB à 1 GB.
OutputFormats courants
| OutputFormat | Format sortie |
|---|---|
TextOutputFormat | clé TAB valeur (défaut) |
SequenceFileOutputFormat | Binaire compact (réutilisable) |
NullOutputFormat | Aucune sortie (compteurs uniquement) |
MultipleOutputs | Plusieurs fichiers de sortie |
Combiner — Mini-Reducer local
Le Combiner s'exécute sur chaque Mapper avant le transfert réseau. Il réduit considérablement la quantité de données à shuffler.
Avec Combiner : Mapper envoie → ("hello",3) = 1 seule paire ✓
Partitioner custom
Par défaut, les clés sont distribuées par hash. Si les données sont déséquilibrées, un Partitioner custom évite le data skew.
public int getPartition(Text key, IntWritable val, int numReducers) {
char first = key.toString().charAt(0);
if (first < 'h') return 0 % numReducers;
if (first < 'p') return 1 % numReducers;
return 2 % numReducers;
}
}
Paramètres de performance clés
| Paramètre | Défaut | Impact | Conseil |
|---|---|---|---|
mapreduce.task.io.sort.mb | 100 MB | Taille buffer Mapper | Augmenter si RAM disponible (256-512) |
mapreduce.map.sort.spill.percent | 0.80 | Seuil de spill | 0.9 pour moins de spills |
mapreduce.reduce.shuffle.parallelcopies | 5 | Copies réseau parallèles | 10-20 pour gros clusters |
mapreduce.map.memory.mb | 1024 MB | RAM par Mapper | Adapter à la taille des données |
mapreduce.reduce.memory.mb | 1024 MB | RAM par Reducer | 2048+ pour shuffles lourds |
3 Simulateur MapReduce Interactif
Simulez WordCount sur votre texte
4 Travaux Pratiques
TP 1 — WordCount complet avec Combiner
DébutantImplémenter le job MapReduce classique WordCount complet avec Driver, Mapper, Reducer et Combiner. Filtrer les mots courts (< 3 chars).
TP 2 — Température maximale par ville
IntermédiaireFichier CSV : "Paris,18.5\nLyon,22.1\nParis,21.3\nLyon,19.8". Calculer la température maximale par ville avec un Reducer.
TP 3 — Filtrage logs Apache (Map-Only)
IntermédiaireFiltrer les lignes de logs Apache contenant "ERROR" ou "WARN" depuis un fichier HDFS. Utiliser 0 Reducer pour maximiser la performance.
TP 4 — Index inversé (mot → documents)
AvancéConstruire un index inversé : pour chaque mot, lister les fichiers HDFS qui le contiennent. Utilisé par les moteurs de recherche.