How do you parse a 150 GB CSV file? Elixir? Shell? SQL database?

Problem – I have a 150 GB CSV file with a header row and the same number of columns in every row. I only need the first column, minus the header, and only the unique values. The CSV can't live on my local machine because I don't have the space; it sits on an Apple AirPort, though I could try connecting to it over a USB cable instead.

I have been searching the internet for a solution for about 3 days. I have heard of several approaches, but I'm not sure which one is best. Which would you pick, and why?

  • Shell: I've heard this can be done with a shell one-liner, but I don't have much shell-scripting experience in this area.

  • Python script: I did write a script, but gave up after it had been running for 4 hours. That is probably because I was accessing the file over WiFi.

  • Elixir: I'm currently learning Elixir, and I've been told that Flow is a great option for spreading the work across CPU cores while new data is still being read, as opposed to plain Stream. On a test file of 1 million similar rows, it took 8 seconds with Stream and 2 seconds with Flow to get all the unique items (a rough timing sketch follows this list).

    def stream_parse(file_path, chunk_size) do
      file_path
        |> File.stream!()
        |> Stream.drop(1)                            # skip the header row
        |> Stream.map(&(String.split(&1, ",") |> List.first))
        |> Stream.chunk(chunk_size, chunk_size, [])  # group lines into chunks
        |> Stream.map(&MapSet.new(&1))               # dedupe within each chunk
        |> Enum.to_list
    end

    def flow_parse(file_path, chunk_size) do
      file_path
        |> File.stream!(read_ahead: chunk_size)
        |> Stream.drop(1)                            # skip the header row
        |> Flow.from_enumerable
        |> Flow.map(&(String.split(&1, ",") |> List.first))
        |> Flow.partition
        |> Flow.uniq
        |> Enum.to_list
    end

I don't have a particular problem with either Elixir solution, but here are the trade-offs. The Stream version has high memory usage, uses one thread, and runs on one core. The Flow version is multithreaded and uses multiple cores, but it eventually collects everything into a single Enum.to_list, which may end up taking who knows how long.

  • SQL server: Finally, I was told to just spin up a Linux server, load the data into a SQL database, and run a query to get the data.
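
For reference, the 8-second versus 2-second comparison above can be reproduced with :timer.tc/1. This is only a sketch: the test file name and chunk size are assumptions, and it assumes the two functions above are compiled inside a module that is in scope.

    # Rough timing sketch (file name and chunk size are assumptions).
    # :timer.tc/1 returns {elapsed_microseconds, result}.
    test_file = "one_million_rows.csv"
    chunk_size = 100_000

    {stream_us, _} = :timer.tc(fn -> stream_parse(test_file, chunk_size) end)
    {flow_us, _} = :timer.tc(fn -> flow_parse(test_file, chunk_size) end)

    IO.puts("Stream: #{stream_us / 1_000_000}s, Flow: #{flow_us / 1_000_000}s")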

Which of these is the best way, or is there a better solution altogether, short of writing it in C?

Edit 1: 12/6/2017 (day/month/year)

I managed to finish both the Stream and the Flow examples in Elixir, and I also got a shell script that accomplishes the desired result. At this point the shell script runs at roughly the same speed as the Flow version, with Flow winning. However, because the file is not local to my machine, it hardly matters which one I use: I'm I/O bound.

def stream_parse(file_path, chunk_size, output_file) do
  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.drop(1)
    |> Stream.map(&String.split(&1, ",") |> List.first)
    |> Stream.uniq
    |> Stream.map(&"#{&1}\n")
    |> Stream.into(File.stream!(output_file, [:write, :utf8]))
    |> Stream.run
end

However, this version keeps the set of unique items for the entire 150 GB file in memory via Stream.uniq (not an option), and it has no way to write a result file per chunk instead.

The shell script, which also ends up holding all the unique items in memory:

tail -n +2 my.csv | cut -d , -f 1 | sort -u > OUTPUT.csv

Solution

After a lot of searching through forums and the Elixir Slack channel, I finally came up with a solution. The first step is to split the file; since there is already a shell command for this, there is no need to overcomplicate the Elixir code. I broke everything down into methods to better explain what's going on.

Divide the file into sub-files of 10 million lines each:

$ mkdir split-files 
$ split -a 8 -l 10000000 big_file.csv ./split-files/
$ cd split-files 
$ for f in *; do mv "$f" "$f.csv"; done
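
If you would rather drive the splitting from Elixir instead of a terminal, the same steps can be shelled out with System.cmd/3. This is just a sketch using the same file names as above, and it assumes split is available on the machine.

# Sketch: run the same split/rename steps from Elixir.
File.mkdir_p!("split-files")

# split -a 8 -l 10000000 big_file.csv ./split-files/
{_, 0} = System.cmd("split", ["-a", "8", "-l", "10000000", "big_file.csv", "./split-files/"])

# give every chunk a .csv extension, mirroring the for loop above
"split-files"
|> File.ls!()
|> Enum.each(fn f ->
  File.rename(Path.join("split-files", f), Path.join("split-files", f <> ".csv"))
end)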

Next we need to get the unique items from each file and write a "unique" output file for each. Here I was able to actually use Flow.uniq, because with a chunk_size of 10 million lines each chunk fits in memory.

def flow_parse_dir(path, chunk_size) do
  Path.wildcard(path <> "/*.csv")
    |> Flow.from_enumerable
    |> Flow.map(fn filename ->
        [dir, file] = String.split(filename, "/")
        flow_parse(filename, chunk_size, dir <> "/unique_" <> file)
      end)
    |> Flow.run
end
def flow_parse(file_path, chunk_size, output_file) do
  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.drop(1)
    |> Flow.from_enumerable
    |> Flow.map(&String.split(&1, ",") |> List.first)
    |> Flow.partition
    |> Flow.uniq
    |> Flow.map(&"#{&1}\n")
    |> Stream.into(File.stream!(output_file, [:write, :utf8]))
    |> Stream.run
end
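
A hypothetical invocation, assuming the split files from the previous step live in split-files and reusing the 10-million-line chunk size mentioned above:

# Hypothetical call: writes a unique_<name>.csv next to every split file.
# Note: flow_parse_dir expects a path with a single directory segment,
# because of the [dir, file] = String.split(filename, "/") match above.
flow_parse_dir("split-files", 10_000_000)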

After creating all the unique files, we need to concatenate them into one combined file.

def concat_files(path, total_unique_file_name) do
  sum_file = File.open!(path <> "/" <> total_unique_file_name, [:read, :utf8, :write])

  Path.wildcard(path <> "/*.csv")
    |> Stream.map(fn filename ->
        [_, file] = String.split(filename, "/")
        # only pick up the unique_*.csv files written in the previous step
        if String.contains?(file, "unique") do
          write_concat_of_unique_files(file, path, sum_file)
        end
      end)
    |> Stream.run

  File.close(sum_file)
end
def write_concat_of_unique_files(file, path, sum_file) do
  # read the file contents line by line and append them to the combined file
  path <> "/" <> file
    |> File.stream!()
    |> Stream.map(&String.trim(&1, "\n"))
    |> Stream.map(fn line ->
        IO.puts(sum_file, line)
      end)
    |> Stream.run
end
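
A hypothetical call for the concatenation step. The combined file name is an assumption; it deliberately has no .csv extension so the *.csv wildcard above will not pick it up.

# Hypothetical call: appends every unique_*.csv in split-files into
# one combined file, split-files/all_uniques.
concat_files("split-files", "all_uniques")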

And the last method, which should get the job done:

def unique_column(file_path, chunk_size, output) do
  total_file = File.open!(output, [:read, :utf8, :write])

  file_path
    |> File.stream!(read_ahead: chunk_size)
    |> Stream.map(&String.trim(&1,"\n"))
    |> Stream.chunk(chunk_size, chunk_size, [])
    |> Flow.from_enumerable
    |> Flow.map(fn chunk ->
        chunk
          |> MapSet.new
          |> MapSet.to_list
          |> List.flatten
      end)
    |> Flow.partition
    |> Flow.map(fn line ->
        Enum.map(line, fn item ->
            IO.puts(total_file, item)
          end)
        end)
    |> Flow.run

  File.close(total_file)
end
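
And a hypothetical call for this final step, feeding the combined file from the previous step back in; the output file name is again an assumption:

# Hypothetical call: dedupes split-files/all_uniques into final_unique.csv.
unique_column("split-files/all_uniques", 10_000_000, "final_unique.csv")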

Finally, check whether the final file contains only unique items. We established earlier that the number of unique items is not too large and fits comfortably in memory. If the contents are all unique, you get the list back as the return value; if you get a MatchError, the items are not all unique.

def check_unique(file_path) do
  original_length = file_path
    |> File.stream!
    |> Enum.to_list

  unique_length = file_path
    |> File.stream!
    |> Stream.uniq
    |> Enum.to_list

  # pin match: raises MatchError if the file contains any duplicate lines
  ^unique_length = original_length
end
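
For completeness, here is a hypothetical check of the final file, plus a count-based variant (my own assumption, not part of the original solution) that returns a boolean instead of raising a MatchError:

# Hypothetical check of the final output; raises MatchError on duplicates.
# check_unique("final_unique.csv")

# Count-based variant (an assumption): true if every line is unique.
def unique?(file_path) do
  lines = file_path |> File.stream!() |> Enum.to_list()
  length(lines) == length(Enum.uniq(lines))
end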
