{ "cells": [ { "cell_type": "markdown", "id": "988e4c4b", "metadata": {}, "source": [ "# PageRank using MapReduce" ] }, { "cell_type": "markdown", "id": "40d3e93f", "metadata": {}, "source": [ "In this section we illustrate the computation of Taxed PageRank in a distributed way using MapReduce in **pyspark**. Note, however, that this only illustrates the case where the PageRank vector $v$ fits in memory. When $v$ does not fit in memory, techniques like *striping* and *blocking* should be employed, as discussed in the previous section." ] }, { "cell_type": "code", "execution_count": 1, "id": "e254efe3", "metadata": {}, "outputs": [], "source": [ "from pyspark import SparkContext\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import networkx as nx\n", "import csv\n", "\n", "from alis import link_analysis # our Link Analysis module" ] }, { "cell_type": "markdown", "id": "4b6ba960", "metadata": {}, "source": [ "## Illustration of Matrix-Vector Multiplication via MapReduce" ] }, { "cell_type": "markdown", "id": "c9d985b9", "metadata": {}, "source": [ "First, let's demonstrate the implementation of a matrix-vector multiplication in MapReduce." ] }, { "cell_type": "code", "execution_count": 2, "id": "ffd6e516", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING: An illegal reflective access operation has occurred\n", "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)\n", "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n", "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n", "WARNING: All illegal access operations will be denied in a future release\n", "22/03/29 13:29:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n", "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "22/03/29 13:29:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n" ] } ], "source": [ "sc = SparkContext('local[*]')" ] }, { "cell_type": "markdown", "id": "bd8aaee2", "metadata": {}, "source": [ "Let's load an illustrative example from Leskovec et al. [*Mining of massive datasets*](http://www.mmds.org/) {cite:ps}`leskovec2020mining` and call it Graph 4." ] }, { "cell_type": "code", "execution_count": 3, "id": "3902a19e", "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "G4 = nx.DiGraph()\n", "G4.add_nodes_from([\"A\",\"B\",\"C\",\"D\"])\n", "G4.add_edges_from([\n", "    (\"A\",\"B\"), (\"A\",\"C\"), (\"A\",\"D\"),\n", "    (\"B\",\"A\"), (\"B\",\"D\"),\n", "    (\"C\",\"A\"),\n", "    (\"D\",\"B\"), (\"D\",\"C\")\n", "])\n", "\n", "plt.figure()\n", "plt.title(\"Graph 4. A graph as a hypothetical representation of the web\")\n", "nx.draw(G4, node_size=500, node_color='orange', with_labels=True, font_weight='bold', arrowsize=20)\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "0c59298b", "metadata": {}, "source": [ "Define the transition matrix $M$ using our `link_analysis` module." ] }, { "cell_type": "code", "execution_count": 4, "id": "95689ae5", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[0. , 0.5 , 1. , 0. ],\n", " [0.33333333, 0. , 0. , 0.5 ],\n", " [0.33333333, 0. , 0. , 0.5 ],\n", " [0.33333333, 0.5 , 0. , 0. ]])" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "M = link_analysis.transition_matrix(G4)\n", "M" ] }, { "cell_type": "markdown", "id": "03c13dbb", "metadata": {}, "source": [ "Define the initial vector $v$." ] }, { "cell_type": "code", "execution_count": 5, "id": "a05dd78b", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(4, array([1., 1., 1., 1.]))" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "n = M.shape[0]\n", "v = np.ones(n)\n", "n, v" ] }, { "cell_type": "markdown", "id": "6f542ed8", "metadata": {}, "source": [ "Define the list of triples ($i$, $j$, $m_{ij}$) to be used by the Map function. The first two elements of each triple are the row and column indices into the transition matrix, and the third element is the value at that position. 
" ] }, { "cell_type": "code", "execution_count": 6, "id": "93693f7e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 0, 0.0),\n", " (0, 1, 0.5),\n", " (0, 2, 1.0),\n", " (0, 3, 0.0),\n", " (1, 0, 0.3333333333333333),\n", " (1, 1, 0.0),\n", " (1, 2, 0.0),\n", " (1, 3, 0.5),\n", " (2, 0, 0.3333333333333333),\n", " (2, 1, 0.0),\n", " (2, 2, 0.0),\n", " (2, 3, 0.5),\n", " (3, 0, 0.3333333333333333),\n", " (3, 1, 0.5),\n", " (3, 2, 0.0),\n", " (3, 3, 0.0)]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "M_elements = []\n", "for i in range(n):\n", "    for j in range(n):\n", "        M_elements.append((i, j, M[i,j]))\n", "M_elements" ] }, { "cell_type": "markdown", "id": "06f784e5", "metadata": {}, "source": [ "Let's save this list to a file; we will use it later when we run the MapReduce steps on data read from a CSV file." ] }, { "cell_type": "code", "execution_count": 7, "id": "26850f6b", "metadata": {}, "outputs": [], "source": [ "with open(\"graph4.csv\", \"w\") as f:\n", "    csv_writer = csv.writer(f)\n", "    for row in M_elements:\n", "        csv_writer.writerow(row)" ] }, { "cell_type": "markdown", "id": "e8aa9717", "metadata": {}, "source": [ "Parallelize the list of triples into a Spark RDD." ] }, { "cell_type": "code", "execution_count": 8, "id": "80002df8", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "M_elements = sc.parallelize(M_elements)\n", "M_elements" ] }, { "cell_type": "code", "execution_count": 9, "id": "aef1ce58", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "pyspark.rdd.RDD" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "type(M_elements)" ] }, { "cell_type": "code", "execution_count": 10, "id": "6735094d", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" 
] }, { "data": { "text/plain": [ "[(0, 0, 0.0), (0, 1, 0.5), (0, 2, 1.0), (0, 3, 0.0)]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "M_elements.take(4)" ] }, { "cell_type": "markdown", "id": "683ad9ec", "metadata": {}, "source": [ "**Map** " ] }, { "cell_type": "markdown", "id": "f63653a0", "metadata": {}, "source": [ "Apply Map function calculating ($i$, $m_{ij}v_j$)" ] }, { "cell_type": "code", "execution_count": 11, "id": "08e016e5", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 0.0),\n", " (0, 0.5),\n", " (0, 1.0),\n", " (0, 0.0),\n", " (1, 0.3333333333333333),\n", " (1, 0.0),\n", " (1, 0.0),\n", " (1, 0.5),\n", " (2, 0.3333333333333333),\n", " (2, 0.0),\n", " (2, 0.0),\n", " (2, 0.5),\n", " (3, 0.3333333333333333),\n", " (3, 0.5),\n", " (3, 0.0),\n", " (3, 0.0)]" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Note that:\n", "# i = x[0]\n", "# j = x[1]\n", "# mij = x[2]\n", "M_elements.map(lambda x: (x[0], x[2]*v[x[1]])).collect()" ] }, { "cell_type": "markdown", "id": "3fe6d59e", "metadata": {}, "source": [ "**Reduce**" ] }, { "cell_type": "markdown", "id": "9f78d898", "metadata": {}, "source": [ "The Reduce function adds the values in the pairs produced by the Map function. We can chain the map function above with `reduceByKey`." 
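, "\n", "\n", "As a small worked check (arithmetic only, taking the pairs emitted by the Map step above and $v = (1, 1, 1, 1)$), the sum formed for key $i$, written out for key $0$, is:\n", "\n", "$$v'_i = \\sum_{j} m_{ij}\\, v_j, \\qquad v'_0 = 0 \\cdot 1 + 0.5 \\cdot 1 + 1.0 \\cdot 1 + 0 \\cdot 1 = 1.5$$\n", "\n", "Keys $1$, $2$ and $3$ each collect one value of $1/3$ and one value of $0.5$ (plus two zeros), so each of their sums is $5/6 \\approx 0.8333$."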
] }, { "cell_type": "code", "execution_count": 12, "id": "4472cca7", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "data": { "text/plain": [ "[(0, 1.5),\n", " (1, 0.8333333333333333),\n", " (2, 0.8333333333333333),\n", " (3, 0.8333333333333333)]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "M_elements.map(lambda x: (x[0], x[2]*v[x[1]])\n", " ).reduceByKey(lambda x, y: x+y\n", " ).collect()" ] }, { "cell_type": "markdown", "id": "6b31d8ee", "metadata": {}, "source": [ "The key-value pairs above are ($i$, $v'_i$) defining $v'=Mv$" ] }, { "cell_type": "markdown", "id": "eab91933", "metadata": {}, "source": [ "## Reading Graph Information from CSV file" ] }, { "cell_type": "markdown", "id": "5a5ecb1e", "metadata": {}, "source": [ "Here we implement the above steps while reading from a CSV file. Note that this still just represents Matrix-vector multiplication. Let's have a quick look at our CSV file." 
] }, { "cell_type": "code", "execution_count": 13, "id": "f1ac4abe", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0,0,0.0\r", "\r\n", "0,1,0.5\r", "\r\n", "0,2,1.0\r", "\r\n", "0,3,0.0\r", "\r\n", "1,0,0.3333333333333333\r", "\r\n", "1,1,0.0\r", "\r\n", "1,2,0.0\r", "\r\n", "1,3,0.5\r", "\r\n", "2,0,0.3333333333333333\r", "\r\n", "2,1,0.0\r", "\r\n", "2,2,0.0\r", "\r\n", "2,3,0.5\r", "\r\n", "3,0,0.3333333333333333\r", "\r\n", "3,1,0.5\r", "\r\n", "3,2,0.0\r", "\r\n", "3,3,0.0\r", "\r\n" ] } ], "source": [ "!cat graph4.csv" ] }, { "cell_type": "code", "execution_count": 14, "id": "e6cd3149", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 1.5),\n", " (2, 0.8333333333333333),\n", " (1, 0.8333333333333333),\n", " (3, 0.8333333333333333)]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# load data from a CSV file\n", "M_tuples = sc.textFile(\"graph4.csv\")\n", "\n", "# the file holds all n*n entries of M, so n is the square root of the row count\n", "n = round(np.sqrt(M_tuples.count()))\n", "\n", "# initialize our vector\n", "v = np.ones(n)\n", "\n", "# calculate M*v\n", "(M_tuples.map(lambda x: x.split(',')) # each row is a string \"i,j,m_ij\"; split it on commas\n", " .map(lambda x: (int(x[0]), int(x[1]), float(x[2]))) # convert the strings to an (int, int, float) triple\n", " .map(lambda x: (x[0], x[2]*v[x[1]])) # emit (i, m_ij*v_j), one term of the i-th entry of Mv\n", " .reduceByKey(lambda x, y: x+y) # sum all values for a given key i\n", " .collect())" ] }, { "cell_type": "markdown", "id": "cab9e530", "metadata": {}, "source": [ "We can quickly check this against NumPy. Note that `collect` does not guarantee that the pairs come back ordered by key." ] }, { "cell_type": "code", "execution_count": 15, "id": "1fb2c814", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([1.5 , 0.83333333, 0.83333333, 0.83333333])" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "M.dot(v)" ] }, { "cell_type": "markdown", "id": "d8eb3586", "metadata": {}, "source": [ 
"## Taxed PageRank Update" ] }, { "cell_type": "markdown", "id": "8f33e7d6", "metadata": {}, "source": [ "Here we illustrate one update of the Taxed PageRank algorithm, $v' = \\beta M v + (1-\\beta)e/n$, where $e$ is the vector of all ones and $n$ is the number of nodes." ] }, { "cell_type": "code", "execution_count": 16, "id": "e2e40a73", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 1.2500000000000002),\n", " (2, 0.7166666666666666),\n", " (1, 0.7166666666666666),\n", " (3, 0.7166666666666666)]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "beta = 0.8\n", "M_tuples = sc.textFile(\"graph4.csv\")\n", "n = round(np.sqrt(M_tuples.count()))\n", "v = np.ones(n)\n", "Mv = (M_tuples.map(lambda x: x.split(','))\n", " .map(lambda x: (int(x[0]), int(x[1]), float(x[2])))\n", " .map(lambda x: (x[0], x[2]*v[x[1]]))\n", " .reduceByKey(lambda x, y: x+y)\n", " .map(lambda x: (x[0], beta*x[1]+(1-beta)/n)) # for each entry (Mv)_i, compute beta*(Mv)_i + (1-beta)/n\n", " .collect())\n", "Mv" ] }, { "cell_type": "markdown", "id": "f9504049", "metadata": {}, "source": [ "We can check this in the following way." ] }, { "cell_type": "code", "execution_count": 17, "id": "c44f48d3", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "array([1.25 , 0.71666667, 0.71666667, 0.71666667])" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "beta * M.dot(v) + (1-beta)/n" ] }, { "cell_type": "markdown", "id": "8fcbbbef", "metadata": {}, "source": [ "Or, we can call the non-distributed `link_analysis.taxed_page_rank` function with one iteration." 
] }, { "cell_type": "code", "execution_count": 18, "id": "814758c9", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "array([1.25 , 0.71666667, 0.71666667, 0.71666667])" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "link_analysis.taxed_page_rank(link_analysis.transition_matrix(G4), max_iter=1)" ] }, { "cell_type": "markdown", "id": "a27d76ff", "metadata": {}, "source": [ "## Function for Computing PageRank using MapReduce" ] }, { "cell_type": "markdown", "id": "2da48dd7", "metadata": {}, "source": [ "Finally, we define a function that computes PageRank in a distributed way, given a CSV file. Note that this assumes that the vector of PageRanks fits in memory." ] }, { "cell_type": "code", "execution_count": 19, "id": "7d509f9f", "metadata": {}, "outputs": [], "source": [ "def distributed_taxed_page_rank(filename, beta=0.8, tol=10**-6, max_iter=100):\n", "    \"\"\" Compute the Taxed PageRank of a given transition matrix in a distributed way using pyspark\n", "    Note that this assumes that the vector of PageRanks fits in memory\n", "\n", "    Parameters\n", "    ----------\n", "    filename : string\n", "        CSV file containing triples (i, j, M_ij), where (i, j) indexes the transition matrix M\n", "    beta : float\n", "        Weight applied to M*v in each update; (1-beta)/n is added to every component\n", "    tol : float\n", "        Tolerance: iteration stops if the L1 distance between the previous and updated\n", "        PageRank vectors goes below this value\n", "    max_iter : integer\n", "        Maximum number of iterations\n", "    Returns\n", "    -------\n", "    v : numpy array\n", "        Vector of size n containing the Taxed PageRank values\n", "    \"\"\"\n", "\n", "    M_tuples = sc.textFile(filename)\n", "    n = round(np.sqrt(M_tuples.count()))\n", "    v = np.ones(n)\n", "    delta = 1/tol # initialize vector difference to a large number\n", "    i = 0\n", "    while delta > tol:\n", "        i += 1\n", "        prev_v = v\n", "        v_map = (M_tuples.map(lambda x: x.split(','))\n", "                 .map(lambda x: (int(x[0]), int(x[1]), float(x[2])))\n", "                 .map(lambda x: (x[0], x[2]*v[x[1]]))\n", "                 .reduceByKey(lambda x, y: x+y)\n", "                 .map(lambda x: (x[0], beta*x[1]+(1-beta)/n))\n", "                 .collectAsMap()) # dict {i: v_i}; the order of collected pairs is not guaranteed\n", "        v = np.array([v_map[j] for j in range(n)]) # rebuild the vector in index order\n", "        delta = np.sum(np.abs(v-prev_v)) # compute L1 norm\n", "        if i >= max_iter:\n", "            break\n", "    return v" ] }, { "cell_type": "markdown", "id": "593d44d6", "metadata": {}, "source": [ "Let's run it for one iteration to check if we are getting the correct answer." ] }, { "cell_type": "code", "execution_count": 20, "id": "c8a904fa", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([1.25 , 0.71666667, 0.71666667, 0.71666667])" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "distributed_taxed_page_rank(\"graph4.csv\", max_iter=1)" ] }, { "cell_type": "markdown", "id": "cb3c6041", "metadata": {}, "source": [ "Let's now run it to convergence." ] }, { "cell_type": "code", "execution_count": 21, "id": "f966f4e3", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "array([0.3214298 , 0.22619129, 0.22619129, 0.22619129])" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "distributed_taxed_page_rank(\"graph4.csv\")" ] }, { "cell_type": "markdown", "id": "861f7169", "metadata": {}, "source": [ "Again, we can compare this to the non-distributed procedure." ] }, { "cell_type": "code", "execution_count": 22, "id": "255ea220", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "array([0.3214298 , 0.22619129, 0.22619129, 0.22619129])" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "link_analysis.taxed_page_rank(link_analysis.transition_matrix(G4))" ] }, { "cell_type": "markdown", "id": "85666368", "metadata": {}, "source": [ "**EXERCISES**\n", "\n", "1. 
Given Graph 1 in the previous section, create a CSV file containing the triples that can be fed to a distributed computation of PageRank.\n", "2. Given the CSV file from item 1 above, read it and perform one round of the matrix-vector multiplication $Mv$ using MapReduce.\n", "3. Using the CSV file from item 1 again, compute the PageRank using `distributed_taxed_page_rank`, then compare the answer to that of `link_analysis.taxed_page_rank`.\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": { "height": "calc(100% - 180px)", "left": "10px", "top": "150px", "width": "318.188px" }, "toc_section_display": true, "toc_window_display": true } }, "nbformat": 4, "nbformat_minor": 5 }