zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
Tests multithreading behaviour for reading and
parsing files for each parser defined in parsers.py
"""
from contextlib import ExitStack
from io import BytesIO
from multiprocessing.pool import ThreadPool
 
import numpy as np
import pytest
 
import pandas as pd
from pandas import DataFrame
import pandas._testing as tm
 
# We'll probably always skip these for pyarrow
# Maybe we'll add our own tests for pyarrow too
pytestmark = pytest.mark.usefixtures("pyarrow_skip")
 
 
def _construct_dataframe(num_rows):
    """
    Construct a DataFrame for testing.
 
    Parameters
    ----------
    num_rows : int
        The number of rows for our DataFrame.
 
    Returns
    -------
    df : DataFrame
    """
    df = DataFrame(np.random.rand(num_rows, 5), columns=list("abcde"))
    df["foo"] = "foo"
    df["bar"] = "bar"
    df["baz"] = "baz"
    df["date"] = pd.date_range("20000101 09:00:00", periods=num_rows, freq="s")
    df["int"] = np.arange(num_rows, dtype="int64")
    return df
 
 
@pytest.mark.slow
def test_multi_thread_string_io_read_csv(all_parsers):
    # see gh-11786
    parser = all_parsers
    max_row_range = 10000
    num_files = 100
 
    bytes_to_df = [
        "\n".join([f"{i:d},{i:d},{i:d}" for i in range(max_row_range)]).encode()
        for _ in range(num_files)
    ]
 
    # Read all files in many threads.
    with ExitStack() as stack:
        files = [stack.enter_context(BytesIO(b)) for b in bytes_to_df]
 
        pool = stack.enter_context(ThreadPool(8))
 
        results = pool.map(parser.read_csv, files)
        first_result = results[0]
 
        for result in results:
            tm.assert_frame_equal(first_result, result)
 
 
def _generate_multi_thread_dataframe(parser, path, num_rows, num_tasks):
    """
    Generate a DataFrame via multi-thread.
 
    Parameters
    ----------
    parser : BaseParser
        The parser object to use for reading the data.
    path : str
        The location of the CSV file to read.
    num_rows : int
        The number of rows to read per task.
    num_tasks : int
        The number of tasks to use for reading this DataFrame.
 
    Returns
    -------
    df : DataFrame
    """
 
    def reader(arg):
        """
        Create a reader for part of the CSV.
 
        Parameters
        ----------
        arg : tuple
            A tuple of the following:
 
            * start : int
                The starting row to start for parsing CSV
            * nrows : int
                The number of rows to read.
 
        Returns
        -------
        df : DataFrame
        """
        start, nrows = arg
 
        if not start:
            return parser.read_csv(
                path, index_col=0, header=0, nrows=nrows, parse_dates=["date"]
            )
 
        return parser.read_csv(
            path,
            index_col=0,
            header=None,
            skiprows=int(start) + 1,
            nrows=nrows,
            parse_dates=[9],
        )
 
    tasks = [
        (num_rows * i // num_tasks, num_rows // num_tasks) for i in range(num_tasks)
    ]
 
    with ThreadPool(processes=num_tasks) as pool:
        results = pool.map(reader, tasks)
 
    header = results[0].columns
 
    for r in results[1:]:
        r.columns = header
 
    final_dataframe = pd.concat(results)
    return final_dataframe
 
 
@pytest.mark.slow
def test_multi_thread_path_multipart_read_csv(all_parsers):
    # see gh-11786
    num_tasks = 4
    num_rows = 100000
 
    parser = all_parsers
    file_name = "__thread_pool_reader__.csv"
    df = _construct_dataframe(num_rows)
 
    with tm.ensure_clean(file_name) as path:
        df.to_csv(path)
 
        final_dataframe = _generate_multi_thread_dataframe(
            parser, path, num_rows, num_tasks
        )
        tm.assert_frame_equal(df, final_dataframe)