Implémenter MapReduce avec TPL (Task Parallel Library)

 

Dans cet article je vous propose une implémentation de MapReduce avec TPL (Task Parallel Library) du framework .NET 4.0. Si vous ne connaissez pas MapReduce, vous avez toujours la possibilité de consulter mon article (partie 1 et partie 2) dédié à sa présentation et à son mode opératoire.

Introduction :

J’ai voulu dans cet article proposer une implémentation possible du modèle de traitement parallèle MapReduce. Bien sûr il ne s’agit pas d’implémenter toutes les fonctionnalités du modèle créé par Google mais plutôt la fonction Map qui effectue des traitements en parallèle et la fonction Reduce qui, quant à elle, aggrège les résultats avant de nous les retourner.

Concevoir une application qui gère et synchronise plusieurs dizaines de threads reste une tâche difficile. Faire de la programmation parallèle est aussi une tâche difficile et les API de développement sont généralement complexes et demandent des efforts d’architecture/programmation/débug afin de rendre le tout utilisable.

Partant de ce principe j’ai fait le choix d’utiliser TPL (Task Parallel Library) du framework .NET 4.0. Cette librairie regroupe des classes permettant de paralléliser des traitements. Ces classes permettant une exécution des traitements en parallèle, gèrent également la présence de coeurs multiples de la machine exécutant le code de l’application.

TPL est composé de 8 classes et de 3 namespaces principaux (.NET 4) :

  1. System.Threading.Tasks : Ce namespace contient la base de Task Parallel Library (TPL), il contient la classe principale de la technologie : la classe Tasks (une sorte de “super Thread”)
  2. System.Collections.Concurrent : Ce namespace contient les collections pouvant être utilisé dans des contextes multithreads (thread safe).
  3. System.Linq : LINQ a été amélioré pour intégrer des aides au multithreading. Connu sous le nom de PLINQ (Parallel LINQ), il permet de paralléliser certaines étapes des requêtes LINQ afin de les rendre plus rapide.

Le namespace System.Threading.Task contient une classe Task. Cette classe représente une opération asynchrone. Ainsi on défini notre opération (tâche) et, lorsqu’on la démarre, celle-ci est assigné par le TaskScheduler à un des coeurs disponibles de la machine. Le fonctionnement de ce système est basé sur une  file de type FIFO (first in / first out) prenant en compte un niveau de priorité. Le Taskscheduler intègre un mécanisme de repartition de charge et, est capable de distribuer un grand nombre de tâches aux différentes cœurs et de “balancer” la charge si un cœur est plus occupé qu’un autre. L’utilisation de TPL reste simple,  la création et l’utilisation de la classe Task est également simple.

L’implémentation que je propose se base sur la classe Task et utilise ses mécanismes de synchronisations pour synchroniser les différentes opérations. Pour sa mise en oeuvre l’application, très simplissime, TPLMapReduce  implémente une classe qui contient les méthodes nécessaires à l’éxecution du code.

 

L’application TPLMapReduce :

Pour illustrer le fonctionnement de MapReduce, l’application TPLMapReduce aura comme objectif de compter les mots trouvés dans 4 documents de type texte. Deux implémentations sont proposées pour effectuer cette tâche ; la première implémentation consiste à créér plusieurs opérations de type Map puis aggréger le tous avec une seule opération de type Reduce. La deuxième implémentation consiste à attribuer à chaque docuement un ensemble de tâches Map puis une fonction Reduce dédiée afin d’aggréger les données de celui-ci. Au final le résultat global est le même sauf que je voulais garder les portes de l’imagination et de la créativité ouvrtes.

Pour commencer, la classe MapReduce est une classe absrtaite et ne peut pas être instanciée directement, ceci dans le but de permettre son extension indépendemment des classes qui pourront l’utiliser. Elle implémente deux propriétés Map et Reduce. Ces propriétés définissent les fonctions Map et Reduce respectivement. Ces fonctions seront définies dans les classes filles de la classe MapReduce.

La fonction Map accèpte un type de données en entrée (par exemple string) et retourne un type de données intermédiaire qui peut être du même type que les données en entrées ou un type différent. Ceci est la signature de notre fonction Map.

/// <summary>
/// Obtient ou définit la fonction Map
/// </summary>
public virtual Func<TInput, TPartialResult> Map { get; protected set; }

La fonction Reduce quant à elle doit prendre en entrée l’ensemble des résultats intermédiaires des tâches Map et nous retourner  un résultat. Pour cela, sa signature est la suivante :

/// <summary>
/// Obtient ou définit la fonction Reduce
/// </summary>
public virtual Func<TPartialResult[], TOutput> Reduce { get; protected set; }

Remarquez que la modification des deux propriétés Map et Reduce ont été restrintes aux classes filles uniquement.

Les opérations Map :

Les création des tâches Map est de la responsabilité de la méthode CreateMapTasks. Cette méthode accèpte un tableau contenant les donnés en entrées auxquelles le traitement doit être appliqué.

/// <summary>
/// Créé les différentes taches Map
/// </summary>
/// <param name="map">La fonction Map</param>
/// <param name="inputs">Tableau des données en entrée du type TInput</param>
/// <returns>Retourne un tableau contenant le résultats intermédiaires du traitement</returns>

protected virtual Task<TPartialResult>[] CreateMapTasks(TInput[] inputs)
{
   var tasks = new Task<TPartialResult>[inputs.Length];

       for (int i = 0; i < inputs.Length; ++i)
       {
           tasks[i] = Task.Factory.StartNew(() => Map(inputs[i]));
       }

   return tasks;
}

Le code de cette méthode est relativement simple. Ici je parcours le tableau des données en entrées et pour chacune des lignes du tableau je créé une nouvelle instance de la classe Task en appelant la méthode Factory de cette classe.  Une opération de type Task est un conteneur pour une opération qui sera exécutée de façon asynchrone. Il suffit donc de créer une instance de la classe Task, de lui assigner un traitement à réaliser et de l’exécuter. Le runtime .Net détermine automatiquement combien d’instances Task (ou tâches) il est possible d’exécuter simultanément et ceci en fonction du nombre de coeurs disponibles, puis les gère au travers du ThreadPool qui est le « scheduler » par défaut de le la TPL. Il est possible d’exécuter les Tasks dans un ordre donné, d’attendre la fin d’une Task pour en exécuter une autre, une Task peut retourner ou non un résultat, etc.

Dans l’extrait de code ci-dessus, j’ai fait appel à la méthode StartNew() afin de démarrer immédiatement la tâche sans être obligé par la suite de boucler sur l’ensemble des tâches et d’appeller la méthode Start() sur chacune des tâches.

A la fin du traitement de chacune des tâches, la propriété Result contiendra le résultat du traitement appliqué et son type sera celui définit par la fonction Map.

 

Les opérations Reduce :

Les opérations Reduce quant à elles doivent être exécutées une fois toutes les opérations Map ont terminé leur travail. Pour assurer cette synchronisation, j’ai utilisé la méthode ContinueWhenAll() de la Classe Factory imbriquée à la classe Task. En effet cette méthode va attendre que la totalité des taches du tableau mapTasks en entrée aient terminé leurs traitements avant d’appeler l’opération Reduce.

/// <summary>
/// Créé les différentes taches Reduce 
/// </summary>
/// <param name="mapTasks">Tableau des taches Map ayant été exécutées</param>
/// <returns>Retourne le résultat de l'application de la fonction Reduce typé selon TOutput</returns>

protected virtual Task<TOutput> CreateReduceTask(Task<TPartialResult>[] mapTasks)
{
    return Task.Factory.ContinueWhenAll(mapTasks, tasks => PerformReduce(tasks));
}

Exemple d’utilisation :

Les fichiers textes (format UTF-8) utilisés dans cet exemple sont disponibles sur le site du Projet Gutenberg.

Afin de compter tous les mots qui apparaisent dans ces documents, ceux-ci nécessitent une petite phase de prétraitement en vue de leur préparation à cette tâche. Pour cela, j’ai choisi de les lire en parallèle puis de les découper en lignes. A la fin de ce prétraitement, chaque ligne du tableau contiendra une ligne extraite du fichier texte. Bien sûr qu’il s’agit d’un traitement sommaire pour les besoins de l’illustration et non d’un traitement en profondeur.

         // Définition des fichiers dont les mots devront être comptés
            string[] files = { 
                                "pg1041.txt",
                                "pg1105.txt",
                                "pg1745.txt", 
                                "pg25979.txt"
                             };

            char[] delimiters = { ' ', ',', ';', '.' };
            string[][] inputs = new string[files.Length][];

            // Lecture en parallèle des fichiers, chaque fichiers est découpé en un tableau de lignes
            for (int i = 0; i < files.Length; i++)
            {
                inputs[i] = File.ReadLines(files[i]).AsParallel().SelectMany(line => line.Split(delimiters)).ToArray();
            }

La classe Exemple1 hérite de la classe MapReduce et permet de la concrétiser avec la définition des deux fonctions Map et Reduce. L’implémentation de la classe MapReduce implique la définition de trois types génériques :

  • Type en entrée, string dans notre cas.
  • Type Intermédiaire, un tableau de Clé,Valeur dans notre cas.
  • Type de retour, un tableau de Clé,Valeur dans notre cas.
    /// <summary>
    /// Exemple d'utilisation de l'implémentation PLinq MapReduce
    /// </summary>
    class Exemple1 : MapReduce<string, KeyValuePair<string, int>[], KeyValuePair<string, int>[]>
    {
        public Exemple1()
        {
            //Méthode Map : On découpe la chaine en entrée en mot entier et on génère une paire Clé,Valeur (valeur = 1)
            // pour chaque mot trouvé
            base.Map = (s => s.Split(" ".ToCharArray(), StringSplitOptions.RemoveEmptyEntries)
            .SelectMany(g => new[] { new KeyValuePair<string, int>(g, 1) })
            .ToArray());

            //Méthode Reduce : On regrouppe l'ensemble des Clés générées par la méthode Map et on effectue la somme
            //(ou le count car la valeur est égale à 1). Le tout est stocké dans un nouvel ensemble de Clés,Valeurs
            base.Reduce = (kv => kv.SelectMany(k => k)
            .GroupBy(g => g.Key)
            .SelectMany(g => new[] { new KeyValuePair<string, int>(g.Key, g.Count()) })
            .ToArray());
        }
    }

La fonction Map découpe chaque ligne de texte en entrée en plusieurs mots (deux mots sont séparés généralement par un espace), puis pour chaque mot trouvé créé une instance de la classe KeyValuePair en indiquant comme clé le mot trouvé et 1 comme valeur, le tout est retourné à l’appelant sous forme d’un tableau.

La fonction Reduce “applatit” le tableau en entrée contenant toutes les paires Clé,Valeur en le transformant en un seul tableau à deux dimenssions puis effectue une opération de regroupement. A nouveau une nouvelle paire de Clé,Valeur est créée afin de stocker chaque nouveau résultat.

Il ne reste plus qu’à instancier la classe Exemple1 et d’éxecuter sa méthode Run() pour récupérer le résultat.

// Exécution de l'exemple 1

Exemple1 ex = new Exemple1();
var results = ex.Run(inputs);

La classe Exemple 2 illustre la deuxième proposition d’implémentation en redéfinissant la méthode Run() de la classe MapReduce.

        /// <summary>
        /// Méthode Run de la classe de base redéfinie. 
        /// Pour chaque sous-tableau, on crée un ensemble de tâches Map et Une tâche Reduce
        /// </summary>
        /// <param name="inputs">Tableau contenant les différents tableaux des données en entrée ayant le type TInput</param>
        /// <returns>Retourne le résultats du type TOutput du traitement</returns>
        public override Task<KeyValuePair<string, int>[]> Run(string[][] inputs)
        {
            //Création du tableau qui contiendra les resultats des différents traitements
            var tasks = new Task<KeyValuePair<string, int>[]>[inputs.Length];

            for (int i = 0; i < inputs.Length; i++)
            {
                tasks[i] = Run(inputs[i]);
            }

           return CreateReduceTask(tasks);
        }
Console TPLMapReduce

Console TPLMapReduce

Conclusion :

Dans cet article j’ai proposé une implémentation simplissime de MapRéduce avec TPL (Task Parallel Library) bénéficiant ainsi de toutes ses fonctionnalités de création, de gestion et de synchronisation de threads. ous avons vu comment il est était de mettre en oeuvre cette librarie fournie par le framework .NET 4.0.

Dans cet article il ne s’agit que d’une introduction à cette implémentation et ne prétend pas être exhaustif. Cependant plusieurs améliorations peuvent être apportées en vue de son amélioration.

Download :

TPLMapReduce

42 Responses to “Implémenter MapReduce avec TPL (Task Parallel Library)”

  1. Bonjour,
    C’est moi qui suit nul ou le second exemple ne marche pas :-(
    Il y a un IndexOutOfRangeException dans CreateMapTasks.
    Pourriez vous jeter un œil ?
    Sinon Super démo.
    Merci d’avance…

Leave a Reply