@@ -4,7 +4,8 @@ defmodule Stream.Reducers do
4
4
5
5
def chunk_every ( chunk_by , enumerable , count , step , leftover ) do
6
6
limit = :erlang . max ( count , step )
7
- chunk_by . ( enumerable , { [ ] , 0 } , fn entry , { acc_buffer , acc_count } ->
7
+
8
+ chunk_fun = fn entry , { acc_buffer , acc_count } ->
8
9
acc_buffer = [ entry | acc_buffer ]
9
10
acc_count = acc_count + 1
10
11
@@ -21,28 +22,37 @@ defmodule Stream.Reducers do
21
22
else
22
23
{ :cont , new_state }
23
24
end
24
- end , fn { acc_buffer , acc_count } ->
25
+ end
26
+
27
+ after_fun = fn { acc_buffer , acc_count } ->
25
28
if leftover == :discard or acc_count == 0 do
26
29
{ :cont , [ ] }
27
30
else
28
31
{ :cont , :lists . reverse ( acc_buffer , Enum . take ( leftover , count - acc_count ) ) , [ ] }
29
32
end
30
- end )
33
+ end
34
+
35
+ chunk_by . ( enumerable , { [ ] , 0 } , chunk_fun , after_fun )
31
36
end
32
37
33
38
def chunk_by ( chunk_by , enumerable , fun ) do
34
- chunk_by . ( enumerable , nil , fn
39
+ chunk_fun = fn
35
40
entry , nil ->
36
41
{ :cont , { [ entry ] , fun . ( entry ) } }
42
+
37
43
entry , { acc , value } ->
38
44
case fun . ( entry ) do
39
45
^ value -> { :cont , { [ entry | acc ] , value } }
40
46
new_value -> { :cont , :lists . reverse ( acc ) , { [ entry ] , new_value } }
41
47
end
42
- end , fn
48
+ end
49
+
50
+ after_fun = fn
43
51
nil -> { :cont , :done }
44
52
{ acc , _value } -> { :cont , :lists . reverse ( acc ) , :done }
45
- end )
53
+ end
54
+
55
+ chunk_by . ( enumerable , nil , chunk_fun , after_fun )
46
56
end
47
57
48
58
defmacro chunk_while ( callback , fun \\ nil ) do
@@ -61,6 +71,7 @@ defmodule Stream.Reducers do
61
71
quote do
62
72
fn entry , acc ( head , prev , tail ) = acc ->
63
73
value = unquote ( callback ) . ( entry )
74
+
64
75
case prev do
65
76
{ :value , ^ value } -> skip ( acc )
66
77
_ -> next_with_acc ( unquote ( fun ) , entry , head , { :value , value } , tail )
@@ -74,6 +85,7 @@ defmodule Stream.Reducers do
74
85
fn
75
86
_entry , acc ( head , amount , tail ) when amount > 0 ->
76
87
skip ( acc ( head , amount - 1 , tail ) )
88
+
77
89
entry , acc ( head , amount , tail ) ->
78
90
next_with_acc ( unquote ( fun ) , entry , head , amount , tail )
79
91
end
@@ -85,6 +97,7 @@ defmodule Stream.Reducers do
85
97
fn
86
98
entry , acc ( head , curr , tail ) when curr in [ unquote ( nth ) , :first ] ->
87
99
skip ( acc ( head , 1 , tail ) )
100
+
88
101
entry , acc ( head , curr , tail ) ->
89
102
next_with_acc ( unquote ( fun ) , entry , head , curr + 1 , tail )
90
103
end
@@ -140,6 +153,7 @@ defmodule Stream.Reducers do
140
153
fn
141
154
entry , acc ( head , curr , tail ) when curr in [ unquote ( nth ) , :first ] ->
142
155
next_with_acc ( unquote ( fun ) , unquote ( mapper ) . ( entry ) , head , 1 , tail )
156
+
143
157
entry , acc ( head , curr , tail ) ->
144
158
next_with_acc ( unquote ( fun ) , entry , head , curr + 1 , tail )
145
159
end
@@ -163,6 +177,7 @@ defmodule Stream.Reducers do
163
177
fn
164
178
entry , acc ( head , :first , tail ) ->
165
179
next_with_acc ( unquote ( fun ) , entry , head , { :ok , entry } , tail )
180
+
166
181
entry , acc ( head , { :ok , acc } , tail ) ->
167
182
value = unquote ( callback ) . ( entry , acc )
168
183
next_with_acc ( unquote ( fun ) , value , head , { :ok , value } , tail )
@@ -185,9 +200,11 @@ defmodule Stream.Reducers do
185
200
case curr do
186
201
0 ->
187
202
{ :halt , original }
203
+
188
204
1 ->
189
205
{ _ , acc } = next_with_acc ( unquote ( fun ) , entry , head , 0 , tail )
190
206
{ :halt , acc }
207
+
191
208
_ ->
192
209
next_with_acc ( unquote ( fun ) , entry , head , curr - 1 , tail )
193
210
end
@@ -200,6 +217,7 @@ defmodule Stream.Reducers do
200
217
fn
201
218
entry , acc ( head , curr , tail ) when curr in [ unquote ( nth ) , :first ] ->
202
219
next_with_acc ( unquote ( fun ) , entry , head , 1 , tail )
220
+
203
221
entry , acc ( head , curr , tail ) ->
204
222
skip ( acc ( head , curr + 1 , tail ) )
205
223
end
@@ -222,6 +240,7 @@ defmodule Stream.Reducers do
222
240
quote do
223
241
fn entry , acc ( head , prev , tail ) = original ->
224
242
value = unquote ( callback ) . ( entry )
243
+
225
244
if Map . has_key? ( prev , value ) do
226
245
skip ( original )
227
246
else
0 commit comments