I started playing with PySpark to do some data processing. It was interesting to me that I could do something like:
rdd.map(lambda x: (x['somekey'], 1)).reduceByKey(lambda x, y: x + y).count()
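For context, here is a minimal, runnable version of that chain; the sample records and the local master are my own additions for illustration, not part of the original snippet:

from pyspark import SparkContext

sc = SparkContext("local[*]", "map-reduceByKey-count")

# A few example records; each is a dict with a 'somekey' field.
data = [{"somekey": "a"}, {"somekey": "b"}, {"somekey": "a"}]
rdd = sc.parallelize(data)

# Map each record to (key, 1), sum the 1s per key, then count the distinct keys.
distinct_keys = rdd.map(lambda x: (x["somekey"], 1)) \
                   .reduceByKey(lambda x, y: x + y) \
                   .count()
print(distinct_keys)  # 2 for the sample data above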
Edit 2: This is the problem in Java, in case of confusion:
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;

public class Playground {

    // The interface must be Serializable, otherwise writeObject would already
    // fail locally with a NotSerializableException.
    private interface DoesThings extends Serializable {
        void doThing();
    }

    public void func() throws Exception {
        Socket s = new Socket("addr", 1234);
        ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());

        oos.writeObject("Hello!"); // Works just fine, you're just sending a String

        // Sends the object, but errors on the other machine
        oos.writeObject((DoesThings) () -> System.out.println("Hey, im doing a thing!!"));

        DoesThings dt = (DoesThings) () -> System.out.println("Hey, im doing a thing!!");
        System.out.println(dt.getClass());
    }
}
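For comparison, PySpark sidesteps the equivalent classpath problem on the Python side by serializing the function body itself rather than a reference to a named class; it bundles cloudpickle for this. A rough illustration (this sketch is mine; cloudpickle here is the standalone pip package, which PySpark ships its own copy of):

import pickle
import cloudpickle  # pip install cloudpickle

f = lambda x: x + 1

# The standard pickle module serializes functions by reference
# (module + qualified name), so a bare lambda cannot be pickled:
try:
    pickle.dumps(f)
except Exception as e:
    print("pickle failed:", e)

# cloudpickle serializes the code object itself, so the receiving
# process does not need the defining module on its path.
payload = cloudpickle.dumps(f)
g = pickle.loads(payload)
print(g(41))  # 42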
This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python.

Suppose we wish to write a self-contained application using the Spark API. We will walk through a simple application in Scala (with sbt), Java (with Maven), and Python (pip). If you have PySpark pip-installed into your environment (e.g., pip install pyspark), you can run your application with the regular Python interpreter or use the provided spark-submit as you prefer.

Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:
./bin/spark-shell
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
./bin/pyspark
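The same session in the PySpark shell looks roughly like this (a sketch based on the DataFrame API; the outputs will likewise depend on the README.md you have):

>>> textFile = spark.read.text("README.md")
>>> textFile.count()  # Number of rows in this DataFrame
126
>>> textFile.first()  # First row in this DataFrame
Row(value='# Apache Spark')
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
>>> linesWithSpark.count()  # How many lines contain "Spark"?
15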
A Spark application is composed of a driver process (also called the driver program) and a set of executor processes. The driver is responsible for maintaining application state, running user code, handling user input, and scheduling and distributing work to the executors. The executors run the code assigned to them by the driver and report the state and final results of the computation back to the driver.

A Spark cluster is composed of master and worker nodes. The master mediates access to the workers in the cluster; the workers host the driver or executor processes of Spark applications.

Spark supports batch processing as well as stream processing, and specialised Spark libraries exist for machine learning and graph analytics.

In cluster mode, a Spark application's driver and its executors all run inside the cluster, hosted by the workers.
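To make the driver/executor split concrete, here is a small sketch of my own (not from the original text): the code outside the transformation runs in the driver's Python process, while the lambda passed to map is shipped to executor-side Python workers, which even in local mode are separate processes.

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("driver-vs-executor") \
    .master("local[2]") \
    .getOrCreate()
sc = spark.sparkContext

print("driver pid:", os.getpid())  # runs in the driver process

# The lambda below runs inside executor-side Python workers;
# each partition reports the pid of the process that did the work.
worker_pids = sc.parallelize(range(4), 4) \
    .map(lambda _: os.getpid()) \
    .distinct() \
    .collect()
print("worker pids:", worker_pids)

spark.stop()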
from pyspark import SparkContext
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("My beautiful app")\
        .master("local[*]")\
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # "Word count" computation (input_file is the path to the input text file)
    rdd = sc.textFile(input_file)\
        .flatMap(lambda line: [(word, 1) for word in line.split()])\
        .reduceByKey(lambda x, y: x + y)
    # Action that triggers the execution of the job
    results = rdd.collect()

    movies = ...  # read file
    # Transformations form a Spark SQL query
    StarWarMovies = movies\
        .filter(movies.title.contains('Star Wars'))\
        .orderBy(movies.title)
    # Action
    results = StarWarMovies.collect()
This article provides an introduction to Spark, including use cases and examples. It contains information from the Apache Spark website as well as the book Learning Spark: Lightning-Fast Big Data Analysis.

Spark is an Apache project advertised as “lightning fast cluster computing.” It has a thriving open-source community and is the most active Apache project at the moment. Spark helps to simplify the challenging and computationally intensive task of processing high volumes of real-time or archived data. Spark Core is the base engine for large-scale parallel and distributed data processing.
Spark also makes it possible to write code more quickly, as you have over 80 high-level operators at your disposal. To demonstrate this, let’s have a look at the “Hello World!” of big data: the Word Count example. Written in Java for MapReduce, it has around 50 lines of code, whereas in Spark (and Scala) you can do it as simply as this:
sparkContext.textFile("hdfs://...")
.flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://...")
SparkSQL is a Spark component that supports querying data either via SQL or via the Hive Query Language. It originated as the Apache Hive port to run on top of Spark (in place of MapReduce) and is now integrated with the Spark stack. In addition to providing support for various data sources, it makes it possible to weave SQL queries with code transformations, which results in a very powerful tool. Below is an example of a Hive-compatible query:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
First, we would have to filter tweets which seem relevant, such as those containing “earthquake” or “shaking”. We could easily use Spark Streaming for that purpose, as follows:
TwitterUtils.createStream(...)
            .filter(status => status.getText.contains("earthquake") || status.getText.contains("shaking"))
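The Python streaming API has no TwitterUtils, but the same filtering step can be sketched against any text DStream; in the sketch below a socket source stands in for the tweet stream, and the host and port are placeholders of my own:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "earthquake-filter")
ssc = StreamingContext(sc, batchDuration=1)

# Stand-in for the tweet stream: one tweet's text per line on a TCP socket.
tweets = ssc.socketTextStream("localhost", 9999)

relevant = tweets.filter(lambda text: "earthquake" in text or "shaking" in text)
relevant.pprint()

ssc.start()
ssc.awaitTermination()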
If we are happy with the prediction rate of the model, we could move on to the next stage and react whenever we discover an earthquake. To detect one, we need a certain number (i.e., a certain density) of positive tweets in a defined time window (as described in the article). Note that, for tweets with Twitter location services enabled, we would also extract the location of the earthquake. Armed with this knowledge, we could use SparkSQL to query an existing Hive table (storing users interested in receiving earthquake notifications), retrieve their email addresses, and send them a personalized warning email, as follows:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// sendEmail is a custom function
sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email")
.collect().foreach(sendEmail)
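For completeness, the same step in PySpark would look roughly like this; send_email is a hypothetical helper standing in for the custom sendEmail function above:

# spark is an existing SparkSession with Hive support enabled.
rows = spark.sql(
    "SELECT firstName, lastName, city, email FROM earthquake_warning_users"
).collect()

for row in rows:
    # send_email is a hypothetical helper, mirroring the custom sendEmail above.
    send_email(row)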