Thursday, November 23, 2017

JavaScript Closures: setTimeout and loop

Problem:

for (var i = 0; i < 3; i++) {
    setTimeout(function() {
        console.log("My value: " + i);
    }, 1);
}

Output:

My value: 3
My value: 3
My value: 3

Why:

The variable i is declared with var, so all three callbacks share one function-scoped (here, global) variable. And because JavaScript runs callbacks on a single-threaded event loop, the setTimeout callbacks fire only after the loop has finished, when i is already 3.

Solution:

// Block scope with let, ES2015 (ES6)
for (let i = 0; i < 3; i++) {
    setTimeout(function() {
        console.log("My value: " + i);
    }, 1000);
}

// An immediately invoked function returns the callback, capturing the current i in its own function scope.
for (var i = 0; i < 3; i++) {
    setTimeout((function(pNum) {
        return function() {
            console.log('i: ', pNum)
        }
    })(i), 1000);
}

//  IIFE (Immediately Invoked Function Expression) wrapping the setTimeout call
for (var i = 0; i < 3; i++) {
    (function(pNum) {
        setTimeout(
            function() {
                console.log('i : ', pNum);
            }, 1000);
    })(i);
}


Enjoy the sunshine,


Tuesday, November 21, 2017

Running jupyter notebook with pyspark shell on Windows 10

# Running jupyter notebook with pyspark shell.

A PySpark notebook will not run in an ordinary Jupyter notebook server, so here is how to launch Jupyter with pyspark as its driver. A similar approach works on Mac and Linux; the steps below were verified on Windows 10 as of 2017-Nov-21.

# Install Anaconda

https://www.anaconda.com/download/

## Run "Anaconda Prompt" to open command line:

%windir%\System32\cmd.exe "/K" E:\Users\datab_000\Anaconda3\Scripts\activate.bat E:\Users\datab_000\Anaconda3

Note: the above command is a single line.

## Then set system environment variables:

rem set PYSPARK_PYTHON=python3
set PYSPARK_DRIVER_PYTHON=jupyter
set PYSPARK_DRIVER_PYTHON_OPTS=notebook
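
Alternatively, you can skip these variables and load PySpark into an ordinary notebook with the third-party findspark package (pip install findspark). A minimal sketch, assuming the Spark path below matches your install:

# Alternative: load pyspark into a regular notebook via findspark.
import findspark
findspark.init("E:\\movie\\spark\\spark-2.2.0-bin-hadoop2.7")

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("notebook").getOrCreate()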

## start pyspark

E:\movie\spark\spark-2.2.0-bin-hadoop2.7\bin\pyspark.cmd
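
Once the notebook opens, a quick sanity check for the first cell (pyspark pre-creates a SparkSession named spark, just as in the plain pyspark shell):

# First notebook cell: confirm the pyspark-provided SparkSession works.
print(spark.version)             # expect 2.2.0
print(spark.range(100).count())  # expect 100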



Friday, November 17, 2017

Complete Spark 2.2.0 installation guide on Windows 10

Goal
----
A step-by-step Apache Spark 2.2.0 installation guide on Windows 10.


Steps
----
Download winutils.exe from GitHub: https://github.com/steveloughran/winutils
E.g.: https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe

mkdir E:\tmp\hive

winutils.exe chmod -R 777 E:\tmp\hive
or, relative to the current drive:
winutils.exe chmod -R 777 /tmp/hive

set HADOOP_HOME=E:\movie\spark\hadoop
mkdir %HADOOP_HOME%\bin
copy winutils.exe to %HADOOP_HOME%\bin
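
A quick plain-Python check that Spark will be able to find winutils.exe (assuming HADOOP_HOME is set in the current environment):

# Sanity check: winutils.exe must sit under %HADOOP_HOME%\bin.
import os
path = os.path.join(os.environ.get("HADOOP_HOME", ""), "bin", "winutils.exe")
print(path, "->", os.path.exists(path))  # expect True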

Download Spark: spark-2.2.0-bin-hadoop2.7.tgz from http://spark.apache.org/downloads.html
cd E:\movie\spark\
# in MINGW64 (Git Bash / Cygwin)
tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz
# or use 7-Zip

cd E:\movie\spark\spark-2.2.0-bin-hadoop2.7
bin\pyspark.cmd



Notes
----
%HADOOP_HOME%\bin\winutils.exe must be locatable; Hadoop resolves it through the HADOOP_HOME environment variable.

Folder "E:\movie\spark\hadoop" is just an example, it can be any folder.

Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+.
http://spark.apache.org/docs/latest/

Reference
----
https://wiki.apache.org/hadoop/WindowsProblems


Here is the example output when pyspark starts successfully:

E:\movie\spark\spark-2.2.0-bin-hadoop2.7>bin\pyspark.cmd

Python 3.6.1 (v3.6.1:69c0db5, Mar 21 2017, 18:41:36) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/11/17 19:07:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/17 19:07:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 3.6.1 (v3.6.1:69c0db5, Mar 21 2017 18:41:36)
SparkSession available as 'spark'.
>>>
>>> textFile = spark.read.text("README.md")
17/11/17 19:08:03 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
>>> textFile.count()
103

>>> textFile.select(explode(split(textFile.value, "\s+")).name("word")).groupBy("word").count().show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'explode' is not defined

>>> from pyspark.sql.functions import *
>>> textFile.select(explode(split(textFile.value, "\s+")).name("word")).groupBy("word").count().show()
+--------------------+-----+
|                word|count|
+--------------------+-----+
|              online|    1|
|              graphs|    1|
|          ["Parallel|    1|
|          ["Building|    1|
|              thread|    1|
|       documentation|    3|
|            command,|    2|
|         abbreviated|    1|
|            overview|    1|
|                rich|    1|
|                 set|    2|
|         -DskipTests|    1|
|                name|    1|
|page](http://spar...|    1|
|        ["Specifying|    1|
|              stream|    1|
|                run:|    1|
|                 not|    1|
|            programs|    2|
|               tests|    2|
+--------------------+-----+
only showing top 20 rows

>>>



# modules for the pyspark window-function example below
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql import *

Employee = Row("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")

emp1 = Employee(7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10)
emp2 = Employee(7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)

# sqlContext is predefined in the pyspark shell; in a standalone script,
# build a SparkSession first (as below) and use its createDataFrame instead.
df1 = sqlContext.createDataFrame([emp1, emp2])

sparkSession = SparkSession.builder.master("local").appName("Window Function").getOrCreate()

empDF = sparkSession.createDataFrame([
      Employee(7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      Employee(7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      Employee(7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      Employee(7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      Employee(7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      Employee(7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      Employee(7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      Employee(7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      Employee(7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      Employee(7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      Employee(7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
])


# Running sum of sal within each deptno: with an orderBy and no explicit
# frame, the default frame runs from unbounded preceding to the current row.
partitionWindow = Window.partitionBy("deptno").orderBy(desc("empno"))
sumTest = sum("sal").over(partitionWindow)
empDF.select("*", sumTest.name("PartSum")).show()

# Sliding sum over the previous, current, and next row within each deptno.
partitionWindowRow = Window.partitionBy("deptno").orderBy(desc("sal")).rowsBetween(-1, 1)
sumTest = sum("sal").over(partitionWindowRow)
empDF.select("*", sumTest.name("PartSum")).orderBy("deptno").show()