Friday, November 17, 2017

Complete Spark 2.2.0 installation guide on Windows 10.

Goal
----
A step-by-step Apache Spark 2.2.0 installation guide for Windows 10.


Steps
----
Download winutils.exe from GitHub, https://github.com/steveloughran/winutils
E.g.: https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe
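
The file can also be fetched from a Python script; a minimal sketch, assuming the raw-file counterpart of the blob URL above:

import urllib.request
# Download winutils.exe into the current folder (raw URL assumed from the blob link above)
url = "https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-2.7.1/bin/winutils.exe"
urllib.request.urlretrieve(url, "winutils.exe")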

Create the Hive scratch directory and give it full permissions with winutils.exe (a path like /tmp/hive resolves against the current drive, so on drive E: it is E:\tmp\hive):

mkdir /tmp/hive

winutils.exe chmod -R 777 E:\tmp\hive
or
winutils.exe chmod -R 777 /tmp/hive

set HADOOP_HOME=E:\movie\spark\hadoop
mkdir %HADOOP_HOME%\bin
Copy the downloaded winutils.exe into %HADOOP_HOME%\bin.
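
If setting HADOOP_HOME system-wide is inconvenient, it can also be set from Python before the SparkSession is created. A minimal sketch (the folder is just the example path used above):

import os
# Point Spark at the folder that contains bin\winutils.exe (example path only)
os.environ["HADOOP_HOME"] = r"E:\movie\spark\hadoop"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]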

Download Spark: spark-2.2.0-bin-hadoop2.7.tgz from http://spark.apache.org/downloads.html
cd E:\movie\spark\
# in MINGW64 (Git Bash / Cygwin)
tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz
# or use 7-Zip
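
If neither MINGW64 nor 7-Zip is at hand, the archive can also be unpacked with Python's standard library; a small sketch using the same example paths:

import tarfile
# Extract the downloaded Spark archive into E:\movie\spark (example paths)
with tarfile.open(r"E:\movie\spark\spark-2.2.0-bin-hadoop2.7.tgz") as archive:
    archive.extractall(r"E:\movie\spark")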

cd E:\movie\spark\spark-2.2.0-bin-hadoop2.7
bin\pyspark.cmd



Notes
----
Spark must be able to find winutils.exe at %HADOOP_HOME%\bin\winutils.exe, so set HADOOP_HOME before starting pyspark.

Folder "E:\movie\spark\hadoop" is just an example, it can be any folder.

Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+.
http://spark.apache.org/docs/latest/
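
A quick way to confirm which Python and Java versions Spark will pick up (assuming java is on the PATH):

import sys, subprocess
# Print the Python version, then let 'java -version' print to the console
print(sys.version)
subprocess.call(["java", "-version"])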

Reference
----
https://wiki.apache.org/hadoop/WindowsProblems


Here is example output when pyspark starts successfully:

E:\movie\spark\spark-2.2.0-bin-hadoop2.7>bin\pyspark.cmd

Python 3.6.1 (v3.6.1:69c0db5, Mar 21 2017, 18:41:36) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/11/17 19:07:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/17 19:07:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 3.6.1 (v3.6.1:69c0db5, Mar 21 2017 18:41:36)
SparkSession available as 'spark'.
>>>
>>> textFile = spark.read.text("README.md")
17/11/17 19:08:03 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
>>> textFile.count()
103

>>> textFile.select(explode(split(textFile.value, "\s+")).name("word")).groupBy("word").count().show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'explode' is not defined

>>> from pyspark.sql.functions import *
>>> textFile.select(explode(split(textFile.value, "\s+")).name("word")).groupBy("word").count().show()
+--------------------+-----+
|                word|count|
+--------------------+-----+
|              online|    1|
|              graphs|    1|
|          ["Parallel|    1|
|          ["Building|    1|
|              thread|    1|
|       documentation|    3|
|            command,|    2|
|         abbreviated|    1|
|            overview|    1|
|                rich|    1|
|                 set|    2|
|         -DskipTests|    1|
|                name|    1|
|page](http://spar...|    1|
|        ["Specifying|    1|
|              stream|    1|
|                run:|    1|
|                 not|    1|
|            programs|    2|
|               tests|    2|
+--------------------+-----+
only showing top 20 rows

>>>
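
The interactive word count above can also be saved as a standalone script and run with bin\spark-submit.cmd; a minimal sketch (assumes README.md sits in the Spark folder the script is run from):

# wordcount.py - standalone version of the word count run in the shell above
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.master("local").appName("WordCount").getOrCreate()
textFile = spark.read.text("README.md")
words = textFile.select(explode(split(textFile.value, "\s+")).name("word"))
words.groupBy("word").count().orderBy("count", ascending=False).show()
spark.stop()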


# Window function example for pyspark, runnable as a standalone module
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Row template for the classic EMP table
Employee = Row("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")

sparkSession = SparkSession.builder.master("local").appName("Window Function").getOrCreate()

# A small two-row DataFrame built from Row objects
emp1 = Employee(7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10)
emp2 = Employee(7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
df1 = sparkSession.createDataFrame([emp1, emp2])

# The full sample table used for the window functions below
empDF = sparkSession.createDataFrame([
      Employee(7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      Employee(7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      Employee(7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      Employee(7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      Employee(7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      Employee(7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      Employee(7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      Employee(7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      Employee(7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      Employee(7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      Employee(7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
])

# Running sum of sal per department, ordered by empno descending
partitionWindow = Window.partitionBy("deptno").orderBy(desc("empno"))
sumTest = sum("sal").over(partitionWindow)
empDF.select("*", sumTest.name("PartSum")).show()

# Sliding sum over the previous, current and next row within each department
partitionWindowRow = Window.partitionBy("deptno").orderBy(desc("sal")).rowsBetween(-1, 1)
sumTest = sum("sal").over(partitionWindowRow)
empDF.select("*", sumTest.name("PartSum")).orderBy("deptno").show()
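
rowsBetween(-1, 1) restricts the second sum to a sliding frame of the previous, current and next row inside each department. Other window functions follow the same pattern; for example, ranking employees by salary within each department (rank comes from the same pyspark.sql.functions import):

# Rank employees by salary (highest first) within each department
rankWindow = Window.partitionBy("deptno").orderBy(desc("sal"))
empDF.select("*", rank().over(rankWindow).name("salRank")).orderBy("deptno", "salRank").show()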
