Dans l'analyse des données, il existe souvent des options à implémenter en combinaison avec Apache Spark + Cassandra.
Apache Spark est un outil d'analyse de données très connu.

Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.
RDD (Resilient Distributed Dataset), DataFrame et DataSet. .. ..
Source: https://spark.apache.org/
Cassandra est une base de données de colonnes NoSQL.

Manage massive amounts of data, fast, without losing sleep
Source: http://cassandra.apache.org/
En particulier, nous avons considéré l'évolutivité depuis le début, de sorte que le clustering est facile.
Spark a diverses fonctions, mais créons un exemple pour enregistrer le CSV dans Cassandra.

build.gradle
dependencies {
	// https://mvnrepository.com/artifact/org.scala-lang/scala-library
	compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
	// https://mvnrepository.com/artifact/org.apache.spark/spark-core
	compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'
	// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
	compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
	
	// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
	compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'
	// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
	compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'
}
CsvReader.java
package com.test.spark;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class CsvReader {
	private static final Logger logger = Logger.getLogger(CsvReader.class);
	public static void main(String[] args) {
		//Paramètres Spark
		SparkConf conf = new SparkConf();
		conf.setAppName("CSVReader");
		conf.setMaster("local[*]");
		conf.set("spark.cassandra.connection.host", "192.168.10.248");
		conf.set("spark.cassandra.connection.port", "9042");
		//Espace de clés et nom de table Cassandra
		String keyspace = "sample";
		String tableUser = "user";
		String userCsv = "C:\\data\\spark\\users.csv";
		JavaSparkContext sc = new JavaSparkContext(conf);
		try {
			SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
					.config("spark.sql.warehouse.dir", "file:////C:/data/spark").getOrCreate();
			//Connexion Cassandra
			CassandraConnector connector = CassandraConnector.apply(sc.getConf());
			
			try (Session session = connector.openSession()) {
				//Supprimer l'espace de clés s'il existe
				session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
				
				//Créer un espace de clés
				session.execute("CREATE KEYSPACE " + keyspace
						+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
				
				//Créer une table
				session.execute("CREATE TABLE " + keyspace + "." + tableUser
						+ "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
			}
			//Obtenir des données à partir de CSV
			//La colonne AS est également importante pour correspondre à la définition de la table
			Dataset<Row> csv = sparkSession.read().format("com.databricks.spark.csv").option("header", "true")
					.option("encoding", "UTF-8").load(userCsv).select(new Column("Identifiant d'utilisateur").as("user_id"),
							new Column("Nom complet").as("user_name"), 
							new Column("adresse mail").as("email_address"),
							new Column("Remarques").as("memo"));
			//Sauvegarder à Cassandra
			csv.write().format("org.apache.spark.sql.cassandra")
					.option("header", "true")
					.option("keyspace", keyspace)
					.option("table", tableUser)
					.option("column", "user_id")
					.option("column", "user_name")
					.option("column", "email_address")
					.option("column", "memo")
					.mode(SaveMode.Append)
					.save();
			//Lire les données de Cassandra
			Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
					.option("keyspace", keyspace)
					.option("table", tableUser).load();
			
			//Obtenir un tableau à partir d'un ensemble de données
			List<Row> asList = dataset.collectAsList();
			for (Row r : asList) {
				logger.info(r);
			}
		} catch (Exception e) {
			logger.error(e);
		} finally {
			sc.stop();
			sc.close();
		}
	}
}

19/10/11 23:18:27 INFO CsvReader: [A000002,[email protected],10 ans après avoir rejoint l'entreprise,Saburo Yamada]
19/10/11 23:18:27 INFO CsvReader: [A000004,[email protected],3ème année après avoir rejoint l'entreprise,Jiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000003,[email protected],5e année après avoir rejoint l'entreprise,Ichiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000001,[email protected],1ère année après avoir rejoint l'entreprise,Yamada Taro]
Des informations détaillées telles que les opérations de base peuvent être trouvées dans le guide. Spark Programming Guide: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
c'est tout
Recommended Posts