-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmost_obscure_superheroes.py
More file actions
38 lines (27 loc) · 1.32 KB
/
Copy pathmost_obscure_superheroes.py
File metadata and controls
38 lines (27 loc) · 1.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import codecs
# Report the superhero(es) with the fewest connections in the Marvel graph.
#
# Pipeline: load the id -> name lookup, count connections per hero id from the
# graph file, find the minimum count, and print the names of every hero whose
# count equals that minimum.
spark = SparkSession.builder.appName('MostObscureSuperheroes').getOrCreate()

# try/finally guarantees the session is stopped even if a read, aggregation
# or join below raises.
try:
    # Lookup table: one (id, name) pair per row, space-separated.
    schema = StructType([
        StructField('id', IntegerType(), True),
        StructField('name', StringType(), True)])

    names = spark.read.schema(schema).option('sep', ' ').csv(
        'file:///Users/brian/code/from_courses/SparkCourse/Marvel-Names')

    # Graph file is read as raw text, one string column named 'value'.
    # NOTE(review): assumes each line is "<hero id> <connected id> ..." with
    # single-space separators -- confirm against the data file.
    lines = spark.read.text(
        'file:///Users/brian/code/from_courses/SparkCourse/Marvel-Graph')

    # Drop blank lines: they would otherwise produce an empty-string id with
    # 0 connections, which drags the minimum to 0 and matches no name.
    trimmed = func.trim(func.col('value'))
    non_blank = lines.filter(trimmed != '')

    # Tokenize once and reuse: first token is the hero id, every remaining
    # token counts as one connection.
    tokens = func.split(trimmed, ' ')
    connections = (
        non_blank
        .withColumn('id', tokens[0])
        .withColumn('connections', func.size(tokens) - 1)
        # The same id can appear on several lines, so sum the per-line counts.
        .groupBy('id')
        .agg(func.sum('connections').alias('connections'))
    )

    # Smallest connection count across all heroes (None if the graph is empty).
    min_connection = connections.agg(func.min('connections')).first()[0]

    # Keep every hero tied at the minimum. No sort is needed: all remaining
    # rows share the same count and the join below does not preserve order.
    most_obscures = connections.filter(
        func.col('connections') == min_connection)

    # Resolve hero ids to names by joining against the lookup table.
    most_obscures_names = most_obscures.join(names, 'id').select('name')

    print('The most obscure superheroes of all time with ' +
          str(min_connection) + ' connection(s) are:')
    most_obscures_names.show()
finally:
    # Stop the session
    spark.stop()