summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ot_fullscrape.c133
1 files changed, 67 insertions, 66 deletions
diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index d9c872e..36249fb 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -25,10 +25,18 @@
25 Full scrapes usually are huge and one does not want to 25 Full scrapes usually are huge and one does not want to
26 allocate more memory. So lets get them in 512k units 26 allocate more memory. So lets get them in 512k units
27*/ 27*/
28#define OT_SCRAPE_CHUNK_SIZE (512*1024) 28#define OT_SCRAPE_CHUNK_SIZE (1024)
29 29
30/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */ 30/* "d8:completei%zde10:downloadedi%zde10:incompletei%zdee" */
31#define OT_FULLSCRAPE_MAXENTRYLEN 256 31#define OT_SCRAPE_MAXENTRYLEN 256
32
33#ifdef WANT_COMPRESSION_GZIP
34#define IF_COMPRESSION( TASK ) if( mode & TASK_FLAG_GZIP ) TASK
35#define WANT_COMPRESSION_GZIP_PARAM( param1, param2 ) , param1, param2
36#else
37#define IF_COMPRESSION( TASK )
38#define WANT_COMPRESSION_GZIP_PARAM( param1, param2 )
39#endif
32 40
33/* Forward declaration */ 41/* Forward declaration */
34static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ); 42static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode );
@@ -69,11 +77,38 @@ void fullscrape_deliver( int64 socket, ot_tasktype tasktype ) {
69 mutex_workqueue_pushtask( socket, tasktype ); 77 mutex_workqueue_pushtask( socket, tasktype );
70} 78}
71 79
80static int fullscrape_increase( int *iovec_entries, struct iovec **iovector,
81 char **r, char **re WANT_COMPRESSION_GZIP_PARAM( z_stream *strm, ot_tasktype mode ) ) {
82 /* Allocate a fresh output buffer at the end of our buffers list */
83 if( !( *r = iovec_fix_increase_or_free( iovec_entries, iovector, *r, OT_SCRAPE_CHUNK_SIZE ) ) ) {
84
85 /* Deallocate gzip buffers */
86 IF_COMPRESSION( deflateEnd(strm); )
87
88 /* Release lock on current bucket and return */
89 return -1;
90 }
91
92 /* Adjust new end of output buffer */
93 *re = *r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
94
95 /* When compressing, we have all the bytes in output buffer */
96 IF_COMPRESSION( { \
97 *re -= OT_SCRAPE_MAXENTRYLEN; \
98 strm->next_out = (ot_byte*)*r; \
99 strm->avail_out = OT_SCRAPE_CHUNK_SIZE; \
100 deflate( strm, Z_NO_FLUSH ); \
101 *r = (char*)strm->next_out; \
102 } )
103
104 return 0;
105}
106
72static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) { 107static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tasktype mode ) {
73 int bucket; 108 int bucket;
74 char *r, *re; 109 char *r, *re;
75#ifdef WANT_COMPRESSION_GZIP 110#ifdef WANT_COMPRESSION_GZIP
76 char compress_buffer[OT_FULLSCRAPE_MAXENTRYLEN]; 111 char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
77 z_stream strm; 112 z_stream strm;
78#endif 113#endif
79 114
@@ -83,28 +118,24 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
83 if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) 118 if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) )
84 return; 119 return;
85 120
86 /* ... and pointer to end of current output buffer. 121 /* re points to low watermark */
87 This works as a low watermark */ 122 re = r + OT_SCRAPE_CHUNK_SIZE - OT_SCRAPE_MAXENTRYLEN;
88 re = r + OT_SCRAPE_CHUNK_SIZE;
89 123
90#ifdef WANT_COMPRESSION_GZIP 124#ifdef WANT_COMPRESSION_GZIP
91 if( mode & TASK_FLAG_GZIP ) { 125 if( mode & TASK_FLAG_GZIP ) {
126 re += OT_SCRAPE_MAXENTRYLEN;
92 byte_zero( &strm, sizeof(strm) ); 127 byte_zero( &strm, sizeof(strm) );
93 strm.next_in = (ot_byte*)r; 128 strm.next_in = (ot_byte*)compress_buffer;
129 strm.next_out = (ot_byte*)r;
130 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
94 if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK ) 131 if( deflateInit2(&strm,9,Z_DEFLATED,31,8,Z_DEFAULT_STRATEGY) != Z_OK )
95 fprintf( stderr, "not ok.\n" ); 132 fprintf( stderr, "not ok.\n" );
96
97 strm.next_out = (unsigned char*)r;
98 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
99 r = compress_buffer; 133 r = compress_buffer;
100 } 134 }
101#endif 135#endif
102 136
103 /* Reply dictionary only needed for bencoded fullscrape */ 137 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
104 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { 138 r += sprintf( r, "d5:filesd" );
105 memmove( r, "d5:filesd", 9 );
106 r += 9;
107 }
108 139
109 /* For each bucket... */ 140 /* For each bucket... */
110 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { 141 for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) {
@@ -121,13 +152,15 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
121 switch( mode & TASK_TASK_MASK ) { 152 switch( mode & TASK_TASK_MASK ) {
122 case TASK_FULLSCRAPE: 153 case TASK_FULLSCRAPE:
123 default: 154 default:
155
124 /* push hash as bencoded string */ 156 /* push hash as bencoded string */
125 *r++='2'; *r++='0'; *r++=':'; 157 *r++='2'; *r++='0'; *r++=':';
126 memmove( r, hash, 20 ); r+=20; 158 memmove( r, hash, 20 ); r+=20;
127 159
128 /* push rest of the scrape string */ 160 /* push rest of the scrape string */
129 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count ); 161 r += sprintf( r, "d8:completei%zde10:downloadedi%zde10:incompletei%zdee", peer_list->seed_count, peer_list->down_count, peer_list->peer_count-peer_list->seed_count );
130 break; 162
163 break;
131 case TASK_FULLSCRAPE_TPB_ASCII: 164 case TASK_FULLSCRAPE_TPB_ASCII:
132 to_hex( r, *hash ); r+=40; 165 to_hex( r, *hash ); r+=40;
133 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count ); 166 r += sprintf( r, ":%zd:%zd\n", peer_list->seed_count, peer_list->peer_count-peer_list->seed_count );
@@ -144,73 +177,41 @@ static void fullscrape_make( int *iovec_entries, struct iovec **iovector, ot_tas
144 } 177 }
145 178
146#ifdef WANT_COMPRESSION_GZIP 179#ifdef WANT_COMPRESSION_GZIP
147 if( mode & TASK_FLAG_GZIP ) { 180 if( mode & TASK_FLAG_GZIP ) {
148 strm.next_in = (ot_byte*)compress_buffer; 181 strm.next_in = (ot_byte*)compress_buffer;
149 strm.avail_in = r - compress_buffer; 182 strm.avail_in = r - compress_buffer;
150 if( deflate( &strm, Z_NO_FLUSH ) != Z_OK ) 183 deflate( &strm, Z_NO_FLUSH );
151 fprintf( stderr, "Not ok.\n" );
152 r = (char*)strm.next_out; 184 r = (char*)strm.next_out;
153 } 185 }
154#endif 186#endif
155 187
156 /* If we reached our low watermark in buffer... */ 188 /* Check if there still is enough buffer left */
157 if( re - r <= OT_FULLSCRAPE_MAXENTRYLEN ) { 189 while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) )
158 190 return mutex_bucket_unlock( bucket );
159 /* crop current output buffer to the amount really used */
160 iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) );
161
162 /* And allocate a fresh output buffer at the end of our buffers list */
163 if( !( r = iovec_increase( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE ) ) ) {
164
165 /* If this fails: free buffers */
166 iovec_free( iovec_entries, iovector );
167
168#ifdef WANT_COMPRESSION_GZIP
169 deflateEnd(&strm);
170#endif
171
172 /* Release lock on current bucket and return */
173 mutex_bucket_unlock( bucket );
174 return;
175 }
176
177 /* Adjust new end of output buffer */
178 re = r + OT_SCRAPE_CHUNK_SIZE;
179 191
180#ifdef WANT_COMPRESSION_GZIP 192 IF_COMPRESSION( r = compress_buffer; )
181 if( mode & TASK_FLAG_GZIP ) {
182 strm.next_out = (ot_byte*)r;
183 strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
184 }
185#endif
186 }
187#ifdef WANT_COMPRESSION_GZIP
188 if( mode & TASK_FLAG_GZIP ) {
189 r = compress_buffer;
190 }
191#endif
192 } 193 }
193 194
194 /* All torrents done: release lock on currenct bucket */ 195 /* All torrents done: release lock on currenct bucket */
195 mutex_bucket_unlock( bucket ); 196 mutex_bucket_unlock( bucket );
196 } 197 }
197 198
198 /* Close bencoded scrape dictionary if necessary */ 199 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE )
199 if( ( mode & TASK_TASK_MASK ) == TASK_FULLSCRAPE ) { 200 r += sprintf( r, "ee" );
200 *r++='e'; *r++='e';
201 }
202 201
203#ifdef WANT_COMPRESSION_GZIP 202#ifdef WANT_COMPRESSION_GZIP
204 if( mode & TASK_FLAG_GZIP ) { 203 if( mode & TASK_FLAG_GZIP ) {
205 strm.next_in = (ot_byte*) compress_buffer; 204 strm.next_in = (ot_byte*)compress_buffer;
206 strm.avail_in = r - compress_buffer; 205 strm.avail_in = r - compress_buffer;
207 if( deflate( &strm, Z_FINISH ) != Z_STREAM_END ) 206 deflate( &strm, Z_FINISH );
208 fprintf( stderr, "Not ok.\n" );
209 r = (char*)strm.next_out; 207 r = (char*)strm.next_out;
210 deflateEnd(&strm); 208
209 while( ( r > re ) && fullscrape_increase( iovec_entries, iovector, &r, &re WANT_COMPRESSION_GZIP_PARAM( &strm, mode ) ) )
210 return mutex_bucket_unlock( bucket );
211 deflateEnd(&strm);
211 } 212 }
212#endif 213#endif
213 214
214 /* Release unused memory in current output buffer */ 215 /* Release unused memory in current output buffer */
215 iovec_fixlast( iovec_entries, iovector, OT_SCRAPE_CHUNK_SIZE - ( re - r ) ); 216 iovec_fixlast( iovec_entries, iovector, r );
216} 217}