Social Network Analysis using GraphFrames#

Graph analytics has a wide range of applications, such as network-flow optimization, information propagation, and fraud and anomaly detection. With the advent of social networks and the Internet of Things, we now have massive web-scale graphs with millions to billions of nodes and edges, and we need tools to analyze such large graphs efficiently.

Databricks launched GraphFrames, which implements graph queries and pattern matching on top of Spark SQL to ease graph analytics. GraphFrames is a graph library built on DataFrames. It benefits from the scalability and high performance of DataFrames, and it provides high-level APIs for graph processing in Scala, Java, and Python.

Creating GraphFrames#

We can create GraphFrames from vertex and edge DataFrames. A vertex DataFrame must contain a special column named “id”, which holds a unique ID for each node in the graph. An edge DataFrame must contain two special columns: “src” (the source node ID of the edge) and “dst” (the destination node ID of the edge). Both DataFrames can have arbitrary additional columns representing node and edge attributes; in our example, these are the name and age of each node and the relationship between nodes for each edge.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
conf = (SparkConf()
       .setMaster('local[*]')
       .set('spark.jars.packages',
           'graphframes:graphframes:0.8.2-spark3.1-s_2.12'))
sc = SparkContext(conf=conf)
:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/phd/rroxasvillanueva/.ivy2/cache
The jars for the packages stored in: /home/phd/rroxasvillanueva/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7c5d398e-cdef-40e9-b390-2e134244e177;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.1-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 179ms :: artifacts dl 7ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.1-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-7c5d398e-cdef-40e9-b390-2e134244e177
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/7ms)
22/03/30 21:41:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
spark = SparkSession(sc)

Example 1#

To illustrate, let us consider the sample social network in Fig. 24.

../_images/socialnet.png

Fig. 24 Sample directed social network#

You may also import data from a CSV or Parquet file into a DataFrame.
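As a sketch, suppose the node table were saved as a CSV file with an `id,name,age` header (the file names below are hypothetical). The stdlib `csv` module shows the expected data shape, and the commented lines show how the same table could be read into a DataFrame with the SparkSession created above:

```python
import csv
import io

# Hypothetical contents of a nodes.csv file (id,name,age header).
text = "id,name,age\na,Amihan,19\nb,Bagwis,25\n"
rows = list(csv.DictReader(io.StringIO(text)))  # parse rows into dicts

# With the SparkSession created above, the same tables could be read
# straight from disk into vertex and edge DataFrames:
#   v = spark.read.csv("nodes.csv", header=True, inferSchema=True)
#   e = spark.read.csv("edges.csv", header=True, inferSchema=True)
#   g = GraphFrame(v, e)
```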

First, we create the node and edge DataFrames.

# Vertex DataFrame
v = spark.createDataFrame([
  ("a", "Amihan", 19),
  ("b", "Bagwis", 25),
  ("k", "Kidlat", 39),
  ("d", "Danaya", 18),
  ("e", "Elias", 50),
  ("g", "Gloria", 38),
  ("h", "Hiraya", 35)], ["id", "name", "age"])
# Edge DataFrame
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("a", "d", "friend"),
  ("k", "a", "family"),
  ("k", "h", "friend"),
  ("d", "g", "family"),
  ("d", "h", "family"),
  ("d", "g", "family"),
  ("e", "d", "family"),
  ("e", "h", "friend"),
  ("g", "h", "friend"),
  ("h", "e", "friend")], ["src", "dst", "relationship"])
from graphframes import GraphFrame

# Create a GraphFrame
g = GraphFrame(v, e)

We show here the nodes of our graph and their attributes.

g.vertices.show()
+---+------+---+
| id|  name|age|
+---+------+---+
|  a|Amihan| 19|
|  b|Bagwis| 25|
|  k|Kidlat| 39|
|  d|Danaya| 18|
|  e| Elias| 50|
|  g|Gloria| 38|
|  h|Hiraya| 35|
+---+------+---+

The edges signify the relationship between the nodes.

g.edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  a|  d|      friend|
|  k|  a|      family|
|  k|  h|      friend|
|  d|  g|      family|
|  d|  h|      family|
|  d|  g|      family|
|  e|  d|      family|
|  e|  h|      friend|
|  g|  h|      friend|
|  h|  e|      friend|
+---+---+------------+

We can also compute different network metrics, such as the degree, in-degree, and out-degree of each node.

g.degrees.show()
+---+------+
| id|degree|
+---+------+
|  g|     3|
|  k|     2|
|  e|     3|
|  h|     5|
|  d|     5|
|  b|     1|
|  a|     3|
+---+------+
g.inDegrees.show()
+---+--------+
| id|inDegree|
+---+--------+
|  g|       2|
|  e|       1|
|  h|       4|
|  d|       2|
|  b|       1|
|  a|       1|
+---+--------+
g.outDegrees.show()
+---+---------+
| id|outDegree|
+---+---------+
|  g|        1|
|  k|        2|
|  e|        2|
|  h|        1|
|  d|        3|
|  a|        2|
+---+---------+
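To see exactly what these metrics count, the same numbers can be recovered from the edge list in plain Python (a sketch using the example data above; note that the duplicate d → g edge is counted twice, just as GraphFrames does):

```python
from collections import Counter

# Edge list of the example graph (src, dst), including the duplicate
# d -> g edge.
edges = [("a", "b"), ("a", "d"), ("k", "a"), ("k", "h"), ("d", "g"),
         ("d", "h"), ("d", "g"), ("e", "d"), ("e", "h"), ("g", "h"),
         ("h", "e")]

out_deg = Counter(src for src, dst in edges)  # edges leaving each node
in_deg = Counter(dst for src, dst in edges)   # edges entering each node
deg = in_deg + out_deg                        # total degree
```

These agree with the tables above, e.g. `deg["h"] == 5` and `in_deg["h"] == 4`.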

Page Rank#

PageRank is a metric for determining the centrality of nodes in a network: it ranks nodes according to their position in the network. The method rests on a recursive definition of significance: a node is significant if other significant nodes point to it. PageRank was first formulated for directed networks, since it was designed to rank websites based on their hyperlinks; however, it naturally generalizes to undirected and even weighted networks via a random-walk formulation.
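This recursive definition can be made concrete with a small power-iteration sketch in plain Python on the example graph's edge list. This is only an illustration of the idea, not how GraphFrames computes PageRank internally; like GraphFrames, it normalizes the ranks so they sum to the number of nodes.

```python
# Power-iteration sketch of PageRank on the example graph.
edges = [("a", "b"), ("a", "d"), ("k", "a"), ("k", "h"), ("d", "g"),
         ("d", "h"), ("d", "g"), ("e", "d"), ("e", "h"), ("g", "h"),
         ("h", "e")]
nodes = sorted({n for edge in edges for n in edge})
out_links = {n: [dst for src, dst in edges if src == n] for n in nodes}

d = 0.85                        # damping factor = 1 - resetProbability
rank = {n: 1.0 for n in nodes}  # start with rank 1 per node (sums to N)
for _ in range(50):
    new = {n: 1 - d for n in nodes}           # teleport (reset) term
    for n in nodes:
        if out_links[n]:                      # spread rank along out-edges
            share = d * rank[n] / len(out_links[n])
            for t in out_links[n]:
                new[t] += share
        else:                                 # dangling node: spread evenly
            for t in nodes:
                new[t] += d * rank[n] / len(nodes)
    rank = new
```

Hiraya (h), the node with the most in-links from well-connected nodes, comes out on top, and Kidlat (k), which nothing points to, comes out at the bottom.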

PageRank was discussed in the Link Analysis Chapter. Here we just show how to implement it in GraphFrames.

Example 2#


pr = g.pageRank(resetProbability=0.15, tol=0.01)
## look at the pagerank score for every vertex
pr.vertices.show()
## look at the weight of every edge
pr.edges.show()
+---+------+---+------------------+
| id|  name|age|          pagerank|
+---+------+---+------------------+
|  g|Gloria| 38| 0.854458432231672|
|  h|Hiraya| 35|2.1591976714911048|
|  b|Bagwis| 25|0.3167021972103069|
|  e| Elias| 50| 2.017276721802774|
|  a|Amihan| 19|0.2810747410040871|
|  k|Kidlat| 39|0.1972454322835699|
|  d|Danaya| 18|1.1740448039764855|
+---+------+---+------------------+
+---+---+------------+------------------+
|src|dst|relationship|            weight|
+---+---+------------+------------------+
|  a|  b|      friend|               0.5|
|  g|  h|      friend|               1.0|
|  d|  g|      family|0.3333333333333333|
|  d|  g|      family|0.3333333333333333|
|  d|  g|      family|0.3333333333333333|
|  d|  g|      family|0.3333333333333333|
|  k|  a|      family|               0.5|
|  e|  h|      friend|               0.5|
|  e|  d|      family|               0.5|
|  a|  d|      friend|               0.5|
|  d|  h|      family|0.3333333333333333|
|  k|  h|      friend|               0.5|
|  h|  e|      friend|               1.0|
+---+---+------------+------------------+

Triangle Counting#

Triangle counting is a key task in graph mining. It is needed to calculate two frequently used metrics in complex network analysis: the graph’s transitivity ratio and its clustering coefficient. Triangles have been used effectively in a variety of real-world applications, such as community detection in social networks, detecting spamming behavior, revealing the web’s hidden thematic structure, and recommending links in online social networks. Additionally, the triangle count is a frequently used network statistic in exponential random graph models. In this section, we demonstrate how to count triangles in GraphFrames.

g.triangleCount().show()
+-----+---+------+---+
|count| id|  name|age|
+-----+---+------+---+
|    1|  g|Gloria| 38|
|    0|  k|Kidlat| 39|
|    1|  e| Elias| 50|
|    2|  h|Hiraya| 35|
|    2|  d|Danaya| 18|
|    0|  b|Bagwis| 25|
|    0|  a|Amihan| 19|
+-----+---+------+---+
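As noted above, triangle counts feed directly into the local clustering coefficient. A plain-Python sketch, reusing the example edge list (`triangleCount` ignores edge direction and duplicate edges, so we deduplicate first):

```python
# Local clustering coefficient: the fraction of a node's neighbour pairs
# that are themselves connected,
#   C(v) = 2 * T(v) / (k(v) * (k(v) - 1)),
# where T(v) is v's triangle count and k(v) its undirected degree.
edges = [("a", "b"), ("a", "d"), ("k", "a"), ("k", "h"), ("d", "g"),
         ("d", "h"), ("d", "g"), ("e", "d"), ("e", "h"), ("g", "h"),
         ("h", "e")]
und = {frozenset(e) for e in edges}  # undirected, deduplicated edges
deg = {}
for e in und:
    for n in e:
        deg[n] = deg.get(n, 0) + 1

# Triangle counts as reported by triangleCount() above.
tri = {"g": 1, "k": 0, "e": 1, "h": 2, "d": 2, "b": 0, "a": 0}
cc = {n: (2 * t / (deg[n] * (deg[n] - 1)) if deg[n] > 1 else 0.0)
      for n, t in tri.items()}
```

Gloria’s single pair of neighbours (Danaya and Hiraya) are connected, giving her a coefficient of 1, while Amihan’s neighbours form no triangles at all.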

Exercise#

Apply the code discussed in this section to analyze real-world data.