Exploring Natural Language Processing (NLP) Using Spark and PySpark
Apache Spark is a powerful, open-source distributed computing framework designed for efficient processing and analysis of large datasets, particularly in the Big Data sphere. Its speed and flexibility allow for in-memory data handling, making it ideal for real-time analytics on extensive data volumes. With user-friendly APIs, Spark supports various programming languages including Java, Scala, Python, and R, catering to a diverse group of developers. The applications of Spark in the realm of Big Data are extensive, encompassing data cleansing, transformation, advanced analytics, and machine learning. Its capability to manage various workloads effectively has established Spark as a critical element in contemporary Big Data infrastructures, significantly boosting data processing speed, scalability, and development simplicity.
PySpark serves as the Python API for Spark. Whereas Pandas, Python's standard data analysis library, processes data in memory on a single machine, PySpark distributes both the data and the computation across a cluster. Consequently, PySpark proves to be far more efficient than single-machine libraries when dealing with large datasets.
The scalability feature of Spark is particularly beneficial for machine learning tasks that require model training on extensive datasets. The advantages include:
- Scalability: Capable of processing large datasets in parallel
- Modeling: Enables quicker and more accurate model training and execution
- User-friendly: Offers seamless integration with other tools and technologies
- Data ingestion: Supports both batch processing and real-time streaming
Machine learning capabilities in Spark are provided by the MLlib library, which is exposed in Python through the pyspark.ml module. This project relies on that module for all modeling steps.
Environment:
Being open-source, Spark can be deployed on local servers or in the cloud. Databricks is a well-known platform that manages Spark clusters and provides a web UI, allowing users to host their clusters on their preferred cloud provider. A free Community Edition of Databricks is available, offering up to 15 GB of memory, although clusters terminate after about two hours of inactivity and must then be recreated. Databricks also grants access to the Databricks File System (DBFS) for data storage, with options for individual and shared notebooks. Users can schedule jobs and develop workflows as needed.
For this project, a cluster was set up in Databricks Community Edition using Apache Spark 3.3.2 and 15 GB of memory, configured in the us-west availability zone. During cluster creation, the necessary libraries were installed, including nltk, gensim, and imblearn. The pyspark.ml module, although essential, ships pre-installed with the cluster's runtime. The dataset was then uploaded to DBFS, and a notebook was created within the cluster.
Exploratory Data Analysis (EDA) and Labeling:
Initially, the data was loaded into a Spark DataFrame.
The spark.read function can handle various formats such as parquet, avro, json, delta (from Delta Lake), kafka, and hive. In this instance, the data was in csv format. Different parameters were then defined using the option() method. Setting inferSchema to true allows the DataFrame to automatically deduce column data types based on the input data. The header parameter indicates whether the first row contains column names, and sep specifies the file delimiter. Ultimately, load() is called to load the data into a variable from the specified file location. The DataFrame can be viewed with df.show(), its size can be checked with df.count(), and its schema can be displayed using df.printSchema().
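A minimal sketch of this loading step is shown below; the DBFS path and the column layout of the file are assumptions and should be adapted to the actual upload location.

```python
# Read the uploaded CSV from DBFS (the path is a placeholder).
df = (
    spark.read.format("csv")
    .option("inferSchema", "true")   # deduce column data types from the data
    .option("header", "true")        # the first row holds column names
    .option("sep", ",")              # field delimiter
    .load("/FileStore/tables/reviews.csv")
)

df.show(5)          # preview the first rows
print(df.count())   # total number of rows
df.printSchema()    # inferred schema
```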
Next, some preliminary data cleaning was conducted, which included the removal of null values and outlier ratings, resulting in the elimination of nearly 900 reviews from the dataset.
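The cleaning step could look roughly like the following sketch; the column names `Review` and `Rating` and the 1–10 validity range are assumptions made for illustration.

```python
from pyspark.sql import functions as F

# Drop rows with missing values, then keep only ratings in the assumed valid range.
cleaned_df = (
    df.dropna(subset=["Review", "Rating"])                        # remove null reviews/ratings
      .withColumn("cleaned_rating", F.col("Rating").cast("int"))  # numeric rating column
      .filter(F.col("cleaned_rating").between(1, 10))             # discard outlier ratings
)

print(df.count() - cleaned_df.count())   # number of reviews removed
```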
The Ratings column was utilized as labels, and its distribution was subsequently plotted.
Here, the groupBy() method was applied to the cleaned_rating column, while count() was used for aggregation. The RDD conversion was achieved using .rdd to facilitate further transformations. The map() function was applied to extract relevant columns from these RDDs, followed by collect() to compile all RDD elements into a single output.
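Under these assumptions, the aggregation and collection could be sketched as follows, with matplotlib used for the plot in the notebook:

```python
import matplotlib.pyplot as plt

# Count reviews per rating on the cluster, then collect the small result to the driver.
rating_counts = (
    cleaned_df.groupBy("cleaned_rating")
              .count()
              .rdd                                                  # switch to the RDD API
              .map(lambda row: (row["cleaned_rating"], row["count"]))
              .collect()
)

ratings, counts = zip(*sorted(rating_counts))
plt.bar(ratings, counts)
plt.xlabel("Rating")
plt.ylabel("Number of reviews")
plt.show()
```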
The plot indicated a highly uneven distribution, which could hinder effective outcomes. Additionally, a 10-class classification was deemed challenging given the small dataset of approximately 1200 rows. Therefore, a new label column was created by assigning sentiment scores to the reviews.
To achieve this, further preprocessing of the review text was performed, involving the conversion to lowercase and the removal of special characters.
A user-defined function (UDF) was established using the @udf decorator, which is defined in pyspark.sql.functions. The UDF lower_clean_str() converts each string to lowercase and strips punctuation and special characters. It was applied via withColumn(), which created a new column, cleaned_review, populated by the function's output.
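A sketch of this UDF, again assuming the raw text column is named `Review`:

```python
import string
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def lower_clean_str(text):
    # Lowercase the review and strip punctuation / special characters.
    text = text.lower()
    return "".join(ch for ch in text if ch not in string.punctuation)

df_clean = cleaned_df.withColumn("cleaned_review", lower_clean_str("Review"))
```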
Following this, the SentimentIntensityAnalyzer from the nltk.sentiment.vader module was employed to compute sentiment scores for each review. Any ratings that did not align with sentiment scores were flagged as fake (i.e., fake_review = 1).
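One way to express this labeling rule is sketched below. The compound-score thresholds and the exact mismatch rule are assumptions, since the article does not spell out the cut-offs it used.

```python
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

nltk.download("vader_lexicon")
sia = SentimentIntensityAnalyzer()

@udf(returnType=IntegerType())
def flag_fake(review, rating):
    # VADER's compound score ranges from -1 (very negative) to +1 (very positive).
    score = sia.polarity_scores(review)["compound"]
    # Assumed rule: flag reviews whose sentiment contradicts their rating.
    if score >= 0.05 and rating <= 4:    # positive text, low rating
        return 1
    if score <= -0.05 and rating >= 7:   # negative text, high rating
        return 1
    return 0

labeled_df = df_clean.withColumn(
    "fake_review", flag_fake("cleaned_review", "cleaned_rating")
)
```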
A distribution of these labels was plotted, revealing a significant skew towards legitimate reviews.
This outcome was anticipated, as fake reviews are generally infrequent; however, a balanced dataset is necessary for effective model training. Therefore, data augmentation techniques were applied to the embedding vectors.
Text Preprocessing and Embedding
The review texts underwent tokenization, lemmatization, and removal of stop words.
The Tokenizer() function breaks the input into a list of tokens. This class is included in the pyspark.ml.feature module of Spark-MLlib. The tokenizer takes the input column name as a parameter during initialization and requires calling the transform() method while passing the DataFrame to create tokens.
Lemmatization helps reduce words to their root forms or lemmas, executed using the WordNetLemmatizer() from the nltk.stem module. The lemmatizer calls lemmatize() on each token in the text and combines the results.
The StopWordsRemover() class from pyspark.ml.feature was utilized to eliminate stop words. Initially, the default language for stop words was set using the loadDefaultStopWords() function. An instance of the class was then created with specified input and output column names, followed by applying the operation using transform() on the DataFrame.
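Taken together, these three preprocessing steps could be sketched as follows; all column names other than cleaned_review are illustrative.

```python
import nltk
from nltk.stem import WordNetLemmatizer
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

nltk.download("wordnet")

# 1. Tokenize the cleaned review text.
tokenizer = Tokenizer(inputCol="cleaned_review", outputCol="tokens")
tokenized_df = tokenizer.transform(labeled_df)

# 2. Lemmatize each token with NLTK's WordNetLemmatizer.
lemmatizer = WordNetLemmatizer()

@udf(returnType=ArrayType(StringType()))
def lemmatize_tokens(tokens):
    return [lemmatizer.lemmatize(tok) for tok in tokens]

lemmatized_df = tokenized_df.withColumn("lemmas", lemmatize_tokens("tokens"))

# 3. Remove English stop words.
stop_words = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol="lemmas", outputCol="clean_reviews", stopWords=stop_words)
preprocessed_df = remover.transform(lemmatized_df)
```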
Subsequently, the words were transformed into embeddings. Several embedding models were considered, and the Word2Vec implementation from the gensim library was selected for this project.
Initially, the clean_reviews column was extracted as a list of token lists. The model was then trained on this list with a vector_size of 100, meaning that 100 features are learned for each word. The sg parameter is also important: setting it to 1 selects the Skip Gram variant of Word2Vec, while the default (0) uses the Continuous Bag of Words (CBOW) variant. After training the embedding model, each review was embedded by summing the vector representations (retrieved via model.wv[word]) of its words, so every embedding vector contains 100 values.
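A sketch of this embedding step with gensim, under the settings above; min_count and workers are assumptions.

```python
import numpy as np
from gensim.models import Word2Vec

# Collect the token lists from the clean_reviews column to the driver.
sentences = [row["clean_reviews"] for row in preprocessed_df.select("clean_reviews").collect()]

# sg=1 selects Skip Gram; sg=0 (the default) selects CBOW.
w2v = Word2Vec(sentences, vector_size=100, sg=1, min_count=1, workers=4)

def embed(tokens):
    # Sum the 100-dimensional vectors of the in-vocabulary words of a review.
    vecs = [w2v.wv[tok] for tok in tokens if tok in w2v.wv]
    return np.sum(vecs, axis=0) if vecs else np.zeros(100)

embeddings = [embed(tokens).tolist() for tokens in sentences]
```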
After embedding, an oversampling technique known as SMOTE (Synthetic Minority Over-sampling Technique) was applied to balance the dataset by generating synthetic data points.
In this method, the embedding vectors and the fake_review labels were converted to arrays and passed to the fit_resample() method of the SMOTE class, which generated the balanced dataset. Since features and labels are returned separately, they were merged back into a DataFrame using spark.createDataFrame(). The embedded reviews were then converted to MLlib vectors using the array_to_vector() function from the pyspark.ml.functions module.
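A rough sketch of this balancing and re-assembly step, continuing from the embedding sketch above (the random_state is an assumption):

```python
import numpy as np
from imblearn.over_sampling import SMOTE
from pyspark.ml.functions import array_to_vector
from pyspark.sql.functions import col

# Labels collected in the same row order as the embeddings above.
labels = [row["fake_review"] for row in preprocessed_df.select("fake_review").collect()]

X_res, y_res = SMOTE(random_state=42).fit_resample(np.array(embeddings), np.array(labels))

# Re-assemble features and labels into a single Spark DataFrame.
balanced_df = spark.createDataFrame(
    [(vec.tolist(), int(label)) for vec, label in zip(X_res, y_res)],
    ["embedding", "fake_review"],
)

# Convert the array column into an MLlib vector column for the classifiers.
balanced_df = balanced_df.withColumn("features", array_to_vector(col("embedding")))
```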
The resulting balanced dataset contained 1888 rows. It was shuffled and then split into training and testing sets in an 80-20 ratio. The randomSplit() function divides the data into parts according to the specified weights, while shuffling was achieved with an orderBy(rand()) call, where rand() is imported from the pyspark.sql.functions module.
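For example, the shuffle-and-split could look like this (the seed is an assumption):

```python
from pyspark.sql.functions import rand

# Shuffle the rows, then split 80/20 into training and test sets.
shuffled_df = balanced_df.orderBy(rand())
train_df, test_df = shuffled_df.randomSplit([0.8, 0.2], seed=42)
```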
Modeling
The dataset is now ready for model training and prediction. For training, several models from the pyspark.ml.classification module were employed: LogisticRegression, LinearSVC, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, and MultilayerPerceptronClassifier.
The LogisticRegression() function initializes an object of the LogisticRegression class, taking the feature and label column names as parameters. The fit() function is then called on this object with the training dataset. The transform() function predicts outputs for the provided features. In this instance, transform returns the predicted fake_review labels for both training and testing datasets, which are then compared with actual labels to assess model accuracy. The logistic regression model achieved an accuracy of 69.33% on training data and 71.69% on test data. Other models were similarly evaluated and compared.
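A sketch of this train/predict/evaluate cycle for logistic regression; the MulticlassClassificationEvaluator shown here is one convenient way to compute accuracy and is an assumption, not necessarily the author's exact code.

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(featuresCol="features", labelCol="fake_review")
lr_model = lr.fit(train_df)

train_preds = lr_model.transform(train_df)   # adds a `prediction` column
test_preds = lr_model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(
    labelCol="fake_review", predictionCol="prediction", metricName="accuracy"
)
print("Train accuracy:", evaluator.evaluate(train_preds))
print("Test accuracy:", evaluator.evaluate(test_preds))
```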
The multilayer perceptron model was structured with three hidden layers containing 64, 32, and 8 nodes, resulting in a final configuration of [100, 64, 32, 8, 2], where the input size is 100 and the output size is 2. This model yielded an accuracy of 87.02% on the training dataset and 77.25% on test data, indicating superior performance compared to other models.
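The corresponding configuration, sketched with the layer sizes described above (maxIter and seed are assumptions):

```python
from pyspark.ml.classification import MultilayerPerceptronClassifier

# 100 input features, three hidden layers (64, 32, 8), two output classes.
mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="fake_review",
    layers=[100, 64, 32, 8, 2],
    maxIter=100,
    seed=42,
)
mlp_model = mlp.fit(train_df)
mlp_test_preds = mlp_model.transform(test_df)
```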
In addition to Word2Vec, the FastText embedding model from the Gensim library was also utilized, and results were compared. Both Word2Vec and FastText can be applied using either CBOW or Skip Gram versions, and the performance was analyzed accordingly.
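Swapping in FastText requires only a small change, since gensim exposes it with the same interface as Word2Vec; the parameters below simply mirror the earlier Word2Vec sketch.

```python
from gensim.models import FastText

# Same interface as Word2Vec: sg=1 for Skip Gram, sg=0 (the default) for CBOW.
ft = FastText(sentences, vector_size=100, sg=1, min_count=1, workers=4)

# FastText builds vectors from character n-grams, so it can also embed
# words that did not appear in the training corpus ("service" is just an example word).
vector = ft.wv["service"]
```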
The results indicate that Word2Vec embeddings trained with the Skip Gram approach yielded the best outcomes on test data, and the multilayer perceptron classifier performed best on these embeddings.