  1. Enable the MongoDB service in Ambari

  2. Upload the MovieLens data to HDFS (a scripted upload is sketched below)
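
    On the sandbox this is typically done through the Ambari Files view or with hadoop fs -put from a shell. As a minimal sketch (not from the original notes), the same upload can be scripted in Python, assuming the hadoop CLI is on the PATH and u.user sits in the current directory:

      import subprocess

      # Create the target HDFS directory (no error if it already exists)
      subprocess.run(["hadoop", "fs", "-mkdir", "-p", "/user/maria_dev/ml-100k"], check=True)
      # Upload u.user, overwriting any existing copy (-f)
      subprocess.run(["hadoop", "fs", "-put", "-f", "u.user", "/user/maria_dev/ml-100k/"], check=True)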

  3. Write a Python Spark script that integrates Spark with MongoDB

    1. Install pymongo (pip install pymongo)

    2. Write the script

      from pyspark.sql import SparkSession
      from pyspark.sql import Row
      from pyspark.sql import functions

      def parseInput(line):
          fields = line.split('|')
          return Row(user_id = int(fields[0]), age = int(fields[1]), gender = fields[2], occupation = fields[3], zip = fields[4])

      if __name__ == "__main__":
          # Create a SparkSession
          spark = SparkSession.builder.appName("MongoDBIntegration").getOrCreate()

          # Get the raw data
          lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
          # Convert it to an RDD of Row objects with (user_id, age, gender, occupation, zip)
          users = lines.map(parseInput)
          # Convert that to a DataFrame
          usersDataset = spark.createDataFrame(users)

          # Write it into MongoDB
          usersDataset.write\
              .format("com.mongodb.spark.sql.DefaultSource")\
              .option("uri","mongodb://127.0.0.1/movielens.users")\
              .mode('append')\
              .save()

          # Read it back from MongoDB into a new DataFrame
          readUsers = spark.read\
              .format("com.mongodb.spark.sql.DefaultSource")\
              .option("uri","mongodb://127.0.0.1/movielens.users")\
              .load()

          readUsers.createOrReplaceTempView("users")

          sqlDF = spark.sql("SELECT * FROM users WHERE age < 20")
          sqlDF.show()

          # Stop the session
          spark.stop()
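
    3. Run the script with the MongoDB Spark connector on the classpath. com.mongodb.spark.sql.DefaultSource is not bundled with Spark, so it has to be pulled in at submit time, e.g. spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 MongoSpark.py (the connector version must match the sandbox's Spark/Scala build, and the filename MongoSpark.py is just a placeholder).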
      
  4. Code walkthrough

    1. First, all of the user data is loaded like this → it is parsed into Rows, converted to a Spark DataFrame, and that DataFrame is then written out to MongoDB (the parsing step is sketched in item 2 below).
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
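
    2. parseInput turns each pipe-delimited line of u.user into a Row. A quick sketch of that mapping, using the first record of the real u.user file as the sample line:

      from pyspark.sql import Row

      def parseInput(line):
          fields = line.split('|')
          return Row(user_id = int(fields[0]), age = int(fields[1]), gender = fields[2], occupation = fields[3], zip = fields[4])

      # u.user fields: user id | age | gender | occupation | zip code
      print(parseInput("1|24|M|technician|85711"))
      # -> a Row with user_id=1, age=24, gender='M', occupation='technician',
      #    zip='85711' (printed field order varies by Spark version)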
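
    3. Once the job has run, the write can be double-checked from the MongoDB side with pymongo (installed in step 3.1). A minimal sketch, assuming MongoDB is listening on 127.0.0.1 as in the script's URI:

      from pymongo import MongoClient

      # Connect to the same database/collection the Spark job wrote to
      client = MongoClient("mongodb://127.0.0.1")
      users = client.movielens.users

      # ml-100k has 943 users, so this should print 943
      # (count_documents requires pymongo 3.7+)
      print(users.count_documents({}))
      # Same filter as the Spark SQL example, run directly against MongoDB
      print(users.find_one({"age": {"$lt": 20}}))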